How to write to a Parquet file in Scala without using Apache Spark

What should you do when you want to store data in a Parquet file from a standard Scala application rather than an Apache Spark job? You can use Parquet4S, a library created by my colleague.

First, I am going to create a case class whose fields use custom value classes instead of plain Strings (the first code snippet also includes all of the imports).

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import com.github.mjakubowski84.parquet4s._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._

import scala.concurrent.duration._
import scala.concurrent.Await

// Value classes give each field its own type while wrapping a plain String.
case class Email(value: String) extends AnyVal
case class FirstName(value: String) extends AnyVal
case class LastName(value: String) extends AnyVal

case class User(firstName: FirstName, lastName: LastName, email: Email)

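Because those fields do not use built-in Scala types, I have to define a codec and a schema for each custom type. The codec converts the wrapper to and from a Parquet value, and the schema declares the column as a BINARY field with the UTF8 annotation, which is how Parquet represents strings.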

// Codec: converts FirstName to and from the Parquet string value.
implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
  new OptionalValueCodec[FirstName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): FirstName = value match {
      case StringValue(string) => FirstName(string)
    }
    override protected def encodeNonNull(data: FirstName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

// Schema: a required BINARY column annotated as UTF8, i.e. a string column.
implicit val firstNameSchema: TypedSchemaDef[FirstName] =
  typedSchemaDef[FirstName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

// Same codec and schema pattern for LastName.
implicit val lastNameTypeCodec: OptionalValueCodec[LastName] =
  new OptionalValueCodec[LastName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): LastName = value match {
      case StringValue(string) => LastName(string)
    }
    override protected def encodeNonNull(data: LastName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val lastNameSchema: TypedSchemaDef[LastName] =
  typedSchemaDef[LastName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

// Same codec and schema pattern for Email.
implicit val emailTypeCodec: OptionalValueCodec[Email] =
  new OptionalValueCodec[Email] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): Email = value match {
      case StringValue(string) => Email(string)
    }
    override protected def encodeNonNull(data: Email, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val emailSchema: TypedSchemaDef[Email] =
  typedSchemaDef[Email](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

That is a lot of boilerplate code, but it is the cost of reusing the domain model classes in the data persistence layer ;)
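
One way to shrink that boilerplate is to generate every codec from a single generic factory that only needs to know how to wrap and unwrap the underlying String. Below is a minimal, library-free sketch of the idea. `StringColumnCodec`, `forWrapper`, and `roundTrip` are hypothetical names used for illustration, not part of Parquet4S, and the encoded form is a plain `String` instead of a Parquet `Value`. Applying the same shape to `OptionalValueCodec` and `TypedSchemaDef` is an assumption to check against the Parquet4S version you use.

```scala
object CodecSketch {
  // Hypothetical, library-free stand-in for a column codec: it converts a domain
  // type to and from the raw String stored in a string column.
  // It is NOT the Parquet4S OptionalValueCodec API.
  trait StringColumnCodec[A] {
    def encode(a: A): String
    def decode(raw: String): A
  }

  object StringColumnCodec {
    // One generic factory replaces a hand-written anonymous class per type:
    // callers only supply how to wrap and unwrap the underlying String.
    def forWrapper[A](wrap: String => A, unwrap: A => String): StringColumnCodec[A] =
      new StringColumnCodec[A] {
        def encode(a: A): String = unwrap(a)
        def decode(raw: String): A = wrap(raw)
      }
  }

  // Declared without `extends AnyVal` only so the sketch compiles in any wrapper;
  // the codec logic would be the same for the value classes in the post.
  final case class FirstName(value: String)
  object FirstName {
    implicit val codec: StringColumnCodec[FirstName] =
      StringColumnCodec.forWrapper[FirstName](FirstName(_), _.value)
  }

  final case class LastName(value: String)
  object LastName {
    implicit val codec: StringColumnCodec[LastName] =
      StringColumnCodec.forWrapper[LastName](LastName(_), _.value)
  }

  final case class Email(value: String)
  object Email {
    implicit val codec: StringColumnCodec[Email] =
      StringColumnCodec.forWrapper[Email](Email(_), _.value)
  }

  // Encodes and then decodes a value using whichever codec is in implicit scope.
  def roundTrip[A](a: A)(implicit codec: StringColumnCodec[A]): A =
    codec.decode(codec.encode(a))
}
```

Placing each codec in the companion object of its type means it is found through implicit scope without any extra import, so `CodecSketch.roundTrip(CodecSketch.FirstName("Test"))` resolves `FirstName.codec` automatically.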

Now I can create an Akka Streams source containing the data to be saved and use the sink from the Parquet4S documentation to write it to Parquet files.

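val data = List(User(FirstName("Test"), LastName("Test"), Email("mail@example.com")))

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

val writeOptions = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY
)

// runWith returns a Future that completes when the stream has finished.
val writeResult = Source(data).runWith(ParquetStreams.toParquetParallelUnordered(
  path = "./users",
  parallelism = 4,
  options = writeOptions
))

// Wait for the write to finish before shutting down the actor system;
// terminating first could stop the stream before the files are complete.
Await.result(writeResult, 2.minutes)
Await.ready(actorSystem.terminate(), 2.minutes)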
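
The `parallelism = 4` argument lets several writes run at the same time, and, as the name `toParquetParallelUnordered` suggests, the output is not guaranteed to follow the order of the source. The library-free sketch below illustrates only those two ideas with a fixed thread pool. The `writeAll` name, the chunking strategy, and the in-memory `finished` queue are hypothetical stand-ins for real Parquet writers and do not describe how Parquet4S actually distributes records across files.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelWriteSketch {
  // Splits `records` into at most `parallelism` chunks and "writes" them concurrently.
  // Returns the chunks in the order the writes finished, which is not guaranteed
  // to match the input order.
  def writeAll[A](records: Seq[A], parallelism: Int): List[Seq[A]] = {
    require(parallelism > 0, "parallelism must be positive")
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val finished = new ConcurrentLinkedQueue[Seq[A]]()
    try {
      // Ceiling division so that no more than `parallelism` chunks are created.
      val chunkSize = math.max(1, (records.size + parallelism - 1) / parallelism)
      // Hypothetical stand-in for a Parquet writer: each task only records its chunk.
      val writes = records.grouped(chunkSize).toList.map { chunk =>
        Future { finished.add(chunk); () }
      }
      // Block until every write has completed before reading results and releasing the pool.
      Await.result(Future.sequence(writes), 1.minute)
      val out = List.newBuilder[Seq[A]]
      val it = finished.iterator()
      while (it.hasNext) out += it.next()
      out.result()
    } finally pool.shutdown()
  }
}
```

Blocking on the combined `Future` before using the results plays the same role as waiting for a stream's materialized `Future` before terminating an actor system.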

Bartosz Mikulski * data scientist / software engineer * conference speaker * organizer of School of A.I. meetups in Poznań * co-founder of Software Craftsmanship Poznan & Poznan Scala User Group