How to write to a Parquet file in Scala without using Apache Spark

What should you do when you want to store data in a Parquet file from a standard Scala application rather than an Apache Spark job? You can use Parquet4S, a library created by my colleague.
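Before writing any code, you need the library on the classpath. Below is a minimal sketch of the sbt setup for the Akka-based API; the artifact names should match Parquet4S, but the version numbers are placeholders, so check the current releases before copying them.

// build.sbt (a sketch; the versions below are placeholders)
libraryDependencies ++= Seq(
  "com.github.mjakubowski84" %% "parquet4s-akka" % "1.0.0",
  // Parquet4S uses the Hadoop client underneath for the low-level file I/O
  "org.apache.hadoop" % "hadoop-client" % "2.10.1"
)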

First, I am going to define the domain model: a case class whose fields use custom value types (I also included all of the imports in the first code snippet).

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import com.github.mjakubowski84.parquet4s._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._

import scala.concurrent.duration._
import scala.concurrent.Await

case class Email(value: String) extends AnyVal
case class FirstName(value: String) extends AnyVal
case class LastName(value: String) extends AnyVal

case class User(firstName: FirstName, lastName: LastName, email: Email)

Because I am not using Scala's built-in types, I have to define codecs and schemas for those custom types.

implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
  new OptionalValueCodec[FirstName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): FirstName = value match {
      case StringValue(string) => FirstName(string)
    }
    override protected def encodeNonNull(data: FirstName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val firstNameSchema: TypedSchemaDef[FirstName] =
  typedSchemaDef[FirstName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val lastNameTypeCodec: OptionalValueCodec[LastName] =
  new OptionalValueCodec[LastName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): LastName = value match {
      case StringValue(string) => LastName(string)
    }
    override protected def encodeNonNull(data: LastName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val lastNameSchema: TypedSchemaDef[LastName] =
  typedSchemaDef[LastName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val emailTypeCodec: OptionalValueCodec[Email] =
  new OptionalValueCodec[Email] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): Email = value match {
      case StringValue(string) => Email(string)
    }
    override protected def encodeNonNull(data: Email, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val emailSchema: TypedSchemaDef[Email] =
  typedSchemaDef[Email](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

A lot of boilerplate code, but that is the cost of reusing the domain model classes in the data persistence layer ;)
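If the repetition bothers you, it can be factored out. The snippet below is my own sketch, not part of Parquet4S: two generic helpers that build the codec and the schema for any case class wrapping a single String, so every new type needs only two one-line implicits.

// My own helpers (not part of Parquet4S): build a codec and a schema
// for any case class that wraps a single String value.
def stringWrapperCodec[A](wrap: String => A)(unwrap: A => String): OptionalValueCodec[A] =
  new OptionalValueCodec[A] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): A = value match {
      case StringValue(string) => wrap(string)
    }
    override protected def encodeNonNull(data: A, configuration: ValueCodecConfiguration): Value =
      StringValue(unwrap(data))
  }

def stringWrapperSchema[A]: TypedSchemaDef[A] =
  typedSchemaDef[A](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

// The implicits from the previous snippet then shrink to, for example:
// implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] = stringWrapperCodec(FirstName.apply)(_.value)
// implicit val firstNameSchema: TypedSchemaDef[FirstName] = stringWrapperSchema[FirstName]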

Now, I can create an Akka stream that contains the data to be saved and use the code from the Parquet4S documentation to store the data in Parquet files.

val data = Stream(User(FirstName("Test"), LastName("Test"), Email("mail@example.com")))

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

val writeOptions = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY
)

// Keep the materialized Future so we can wait for the write to finish
// before shutting down the actor system.
val done = Source(data).runWith(ParquetStreams.toParquetParallelUnordered(
  path = "./users",
  parallelism = 4,
  options = writeOptions
))

Await.ready(done, 2.minutes)
Await.ready(actorSystem.terminate(), 2.minutes)
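To check the result, you can read the files back with the blocking reader from the same library. A minimal sketch, assuming Parquet4S's ParquetReader API and the implicits defined above in scope:

// Read every Parquet file under ./users back into User instances.
val users = ParquetReader.read[User]("./users")
try users.foreach(println)
finally users.close()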

Did you enjoy reading this article?
Would you like to learn more about leveraging AI to drive growth and innovation, software craft in data engineering, and MLOps?

Subscribe to the newsletter or add this blog to your RSS reader (does anyone still use them?) to get a notification when I publish a new essay!
