How to derive multiple columns from a single column in a PySpark DataFrame

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (7/100)

In this article, I will show you how to extract multiple columns from a single column in a PySpark DataFrame. I am going to use two methods. First, I will call the withColumn function twice to create the new columns one by one. In the second example, I will implement a UDF that extracts both columns at once.

In both examples, I will use the following example DataFrame:

from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType

df_schema = StructType([StructField('to_be_extracted', StringType())])

test_list = [
    ['1183 Amsterdam'],
    ['06123 Ankara'],
    ['08067 Barcelona'],
    ['3030 Bern'],
    ['75116 Paris'],
    ['1149-014 Lisbon'],
    ['00-999 Warsaw'],
    ['00199 Rome'],
    ['HR-10 040 Zagreb']
]

# spark_session is an existing SparkSession instance
df: DataFrame = spark_session.createDataFrame(test_list, schema=df_schema)
+----------------+
| to_be_extracted|
+----------------+
|  1183 Amsterdam|
|    06123 Ankara|
| 08067 Barcelona|
|       3030 Bern|
|     75116 Paris|
| 1149-014 Lisbon|
|   00-999 Warsaw|
|      00199 Rome|
|HR-10 040 Zagreb|
+----------------+

Note that it contains only one column, to_be_extracted, and that column holds both the postal code and the name of a European city. I want to create separate columns for those two values.

Using the withColumn Function

To separate the postal code from the city name, I need a regular expression that splits the data into two groups. For my example data, the following expression is sufficient: r'^(.*?)\s(\w*?)$'. It puts the last word of the text into the second group and everything else (without the space between the groups) into the first group.
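Before applying the expression in Spark, I can check it with Python's re module on a couple of values from the example DataFrame (a quick sanity check, not part of the Spark job):

import re

regex = r'^(.*?)\s(\w*?)$'

# The second group accepts only word characters up to the end of the string,
# so the \s in the pattern has to match the last space, and multi-part codes
# such as 'HR-10 040' end up in the first group.
print(re.search(regex, '1183 Amsterdam').groups())    # ('1183', 'Amsterdam')
print(re.search(regex, 'HR-10 040 Zagreb').groups())  # ('HR-10 040', 'Zagreb')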

After defining the regular expression, I can use the withColumn function together with the regexp_extract function to separate the postal code from the city name:

from pyspark.sql.functions import col, regexp_extract

regex = r'^(.*?)\s(\w*?)$'

df \
    .withColumn(
        'postal_code',
        regexp_extract(col('to_be_extracted'), regex, 1)
    ) \
    .withColumn(
        'city',
        regexp_extract(col('to_be_extracted'), regex, 2)
    ) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code|     city|
+----------------+-----------+---------+
|  1183 Amsterdam|       1183|Amsterdam|
|    06123 Ankara|      06123|   Ankara|
| 08067 Barcelona|      08067|Barcelona|
|       3030 Bern|       3030|     Bern|
|     75116 Paris|      75116|    Paris|
| 1149-014 Lisbon|   1149-014|   Lisbon|
|   00-999 Warsaw|     00-999|   Warsaw|
|      00199 Rome|      00199|     Rome|
|HR-10 040 Zagreb|  HR-10 040|   Zagreb|
+----------------+-----------+---------+

In this case, the obvious disadvantage is the need to call the regexp_extract function twice, once for every new column.
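If the repeated calls bother you stylistically, the same transformation can be written as a single select with aliases. Here is a minimal sketch that reuses the df defined above; note that the regular expression is still evaluated once per output column, so this changes readability rather than performance:

from pyspark.sql.functions import col, regexp_extract

regex = r'^(.*?)\s(\w*?)$'

df.select(
    col('to_be_extracted'),
    regexp_extract(col('to_be_extracted'), regex, 1).alias('postal_code'),
    regexp_extract(col('to_be_extracted'), regex, 2).alias('city')
).show()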



Using a UDF

If I want to use a UDF, the code gets a little bit more complicated. First, I have to define the schema of the value returned by the UDF. My return value is going to be a struct containing two text fields:

schema = StructType([
    StructField("postal_code", StringType(), False),
    StructField("city", StringType(), False)
])

After defining the schema, I have to write the function code:

import re

def extract_in_python(content):
    regex = r'^(.*?)\s(\w*?)$'

    search_result = re.search(regex, content)

    if search_result:
        postal_code = search_result.group(1)
        city = search_result.group(2)
        return postal_code, city
    else:
        return None, None
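Before wrapping it in a UDF, the function can be exercised as plain Python; the second call below shows the fallback branch for a value that does not match the pattern:

print(extract_in_python('1149-014 Lisbon'))  # ('1149-014', 'Lisbon')
print(extract_in_python('Lisbon'))           # (None, None) - no space, so no match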

Now, I can turn my Python function into a PySpark UDF and use it to extract the data from the column:

from pyspark.sql.functions import udf

extract_udf = udf(extract_in_python, schema)

df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .show()
+----------------+-------------------+
| to_be_extracted|          extracted|
+----------------+-------------------+
|  1183 Amsterdam|  [1183, Amsterdam]|
|    06123 Ankara|    [06123, Ankara]|
| 08067 Barcelona| [08067, Barcelona]|
|       3030 Bern|       [3030, Bern]|
|     75116 Paris|     [75116, Paris]|
| 1149-014 Lisbon| [1149-014, Lisbon]|
|   00-999 Warsaw|   [00-999, Warsaw]|
|      00199 Rome|      [00199, Rome]|
|HR-10 040 Zagreb|[HR-10 040, Zagreb]|
+----------------+-------------------+

That is not the result I want. Instead of separate columns, I have a single column with the struct inside. Fortunately, I can easily flatten the struct using the select function:

df \
    .withColumn('extracted', extract_udf(col('to_be_extracted'))) \
    .select(col('to_be_extracted'), col("extracted.*")) \
    .show()
+----------------+-----------+---------+
| to_be_extracted|postal_code|     city|
+----------------+-----------+---------+
|  1183 Amsterdam|       1183|Amsterdam|
|    06123 Ankara|      06123|   Ankara|
| 08067 Barcelona|      08067|Barcelona|
|       3030 Bern|       3030|     Bern|
|     75116 Paris|      75116|    Paris|
| 1149-014 Lisbon|   1149-014|   Lisbon|
|   00-999 Warsaw|     00-999|   Warsaw|
|      00199 Rome|      00199|     Rome|
|HR-10 040 Zagreb|  HR-10 040|   Zagreb|
+----------------+-----------+---------+

Are UDFs Better Than Multiple withColumn Calls?

The short answer is: no. Using a PySpark UDF requires Spark to serialize the data in the JVM, send it to a Python worker process, deserialize it there, run the function, serialize the results, and send them back to the JVM to be deserialized again. This round trip causes a considerable performance penalty, so I recommend avoiding Python UDFs in PySpark whenever a built-in function can do the job.
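One way to make the overhead visible, assuming the extract_udf and regex defined above, is to compare the physical plans: the UDF version should contain a BatchEvalPython step (rows are shipped to a Python worker), while the regexp_extract version stays entirely inside the JVM. A quick check (the exact plan output depends on your Spark version):

# UDF plan: expect a BatchEvalPython node, i.e. data is sent to a Python worker.
df.withColumn('extracted', extract_udf(col('to_be_extracted'))).explain()

# Built-in function plan: a plain projection executed inside the JVM.
df.withColumn('postal_code', regexp_extract(col('to_be_extracted'), regex, 1)).explain()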


Bartosz Mikulski * data/machine learning engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group