How to derive multiple columns from a single column in a PySpark DataFrame

In this article, I will show you how to extract multiple columns from a single column in a PySpark DataFrame. I am going to use two methods. In the first example, I will call the withColumn function twice to create the new columns one at a time. In the second example, I will implement a UDF that extracts both columns at once.

In both examples, I will use the following example DataFrame:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType

# create (or reuse) a SparkSession for the examples
spark_session = SparkSession.builder.getOrCreate()

df_schema = StructType([StructField('to_be_extracted', StringType())])

test_list = [
    ['1183 Amsterdam'],
    ['06123 Ankara'],
    ['08067 Barcelona'],
    ['3030 Bern'],
    ['75116 Paris'],
    ['1149-014 Lisbon'],
    ['00-999 Warsaw'],
    ['00199 Rome'],
    ['HR-10 040 Zagreb']
]

df: DataFrame = spark_session.createDataFrame(test_list, schema=df_schema)
df.show()
+----------------+
| to_be_extracted|
+----------------+
|  1183 Amsterdam|
|    06123 Ankara|
| 08067 Barcelona|
|       3030 Bern|
|     75116 Paris|
| 1149-014 Lisbon|
|   00-999 Warsaw|
|      00199 Rome|
|HR-10 040 Zagreb|
+----------------+

Note that it contains only one column to_be_extracted, and that column contains both the postal code and the name of a European city. I want to create separate columns for those two values.

Using the withColumn Function

To separate the postal code from the city name, I need a regular expression that splits the data into two groups. For my example data, the following expression is sufficient: r'^(.*?)\s(\w*?)$'. It puts the last word of the text into the second group and everything else (without the separating space) into the first group.
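To double-check how the groups behave, I can test the expression in plain Python first. This snippet is only an illustration and is not required for the Spark code below:

import re

# one of the trickier rows: the postal code itself contains a space
match = re.search(r'^(.*?)\s(\w*?)$', 'HR-10 040 Zagreb')
print(match.group(1))  # HR-10 040
print(match.group(2))  # Zagreb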

After defining the regular expression, I can use the withColumn function and the regexp_extract function to separate the postal code from the city name:

from pyspark.sql.functions import col, regexp_extract

regex = r'^(.*?)\s(\w*?)$'
df \
    .withColumn(
        'postal_code',
        regexp_extract(col('to_be_extracted'), regex, 1)
    ) \
    .withColumn(
        'city',
        regexp_extract(col('to_be_extracted'), regex, 2)
    ) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code|     city|
+----------------+-----------+---------+
|  1183 Amsterdam|       1183|Amsterdam|
|    06123 Ankara|      06123|   Ankara|
| 08067 Barcelona|      08067|Barcelona|
|       3030 Bern|       3030|     Bern|
|     75116 Paris|      75116|    Paris|
| 1149-014 Lisbon|   1149-014|   Lisbon|
|   00-999 Warsaw|     00-999|   Warsaw|
|      00199 Rome|      00199|     Rome|
|HR-10 040 Zagreb|  HR-10 040|   Zagreb|
+----------------+-----------+---------+

In this case, the obvious disadvantage is the need to run the regexp_extract function twice.
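As a side note, the two extractions can also be expressed as a single select instead of two chained withColumn calls. This is only a stylistic variant (regexp_extract still runs twice), sketched here with the same regex and column names:

df \
    .select(
        col('to_be_extracted'),
        regexp_extract(col('to_be_extracted'), regex, 1).alias('postal_code'),
        regexp_extract(col('to_be_extracted'), regex, 2).alias('city')
    ) \
    .show()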

Using a UDF

If I want to use a UDF, the code gets a little bit more complicated. First, I have to define the schema of the value returned by the UDF. My return value is going to be a struct containing two text fields:

schema = StructType([
    StructField("postal_code", StringType(), False),
    StructField("city", StringType(), False)
])

After defining the schema, I have to write the function code:

import re

def extract_in_python(content):
    regex = r'^(.*?)\s(\w*?)$'

    search_result = re.search(regex, content)

    if search_result:
        postal_code = search_result.group(1)
        city = search_result.group(2)
        return postal_code, city
    else:
        return None, None
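
Because it is plain Python, the function is easy to sanity-check before turning it into a UDF, for example:

print(extract_in_python('1149-014 Lisbon'))   # ('1149-014', 'Lisbon')
print(extract_in_python('HR-10 040 Zagreb'))  # ('HR-10 040', 'Zagreb')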

Now, I can turn my Python function into a PySpark UDF and use it to extract the data from the column:

from pyspark.sql.functions import udf

extract_udf = udf(extract_in_python, schema)

df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .show()
+----------------+-------------------+
| to_be_extracted|          extracted|
+----------------+-------------------+
|  1183 Amsterdam|  [1183, Amsterdam]|
|    06123 Ankara|    [06123, Ankara]|
| 08067 Barcelona| [08067, Barcelona]|
|       3030 Bern|       [3030, Bern]|
|     75116 Paris|     [75116, Paris]|
| 1149-014 Lisbon| [1149-014, Lisbon]|
|   00-999 Warsaw|   [00-999, Warsaw]|
|      00199 Rome|      [00199, Rome]|
|HR-10 040 Zagreb|[HR-10 040, Zagreb]|
+----------------+-------------------+

That is not yet the result I want. Instead of separate columns, I have a single column containing a struct. Fortunately, I can easily flatten the struct using the select function:

df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .select(col('to_be_extracted'), col("extracted.*")) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code|     city|
+----------------+-----------+---------+
|  1183 Amsterdam|       1183|Amsterdam|
|    06123 Ankara|      06123|   Ankara|
| 08067 Barcelona|      08067|Barcelona|
|       3030 Bern|       3030|     Bern|
|     75116 Paris|      75116|    Paris|
| 1149-014 Lisbon|   1149-014|   Lisbon|
|   00-999 Warsaw|     00-999|   Warsaw|
|      00199 Rome|      00199|     Rome|
|HR-10 040 Zagreb|  HR-10 040|   Zagreb|
+----------------+-----------+---------+

Are UDFs Better Than Multiple withColumn Calls?

The short answer is: No. Using a PySpark UDF requires Spark to serialize the data on the JVM side, ship it to a Python worker process, deserialize it in Python, run the function, serialize the results, and deserialize them back on the JVM side. This causes a considerable performance penalty, so I recommend avoiding UDFs in PySpark whenever a built-in function can do the job.
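
If you want to see where the overhead comes from, comparing the physical plans is a quick (if rough) check. The UDF version should contain a Python evaluation step (a BatchEvalPython node in the plan), while the regexp_extract version stays entirely inside the JVM:

# native functions: no Python worker involved
df.withColumn(
    'postal_code',
    regexp_extract(col('to_be_extracted'), regex, 1)
).explain()

# Python UDF: the plan should include a BatchEvalPython step
df.withColumn('extracted', extract_udf(col('to_be_extracted'))).explain()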
