How to write to a Parquet file in Scala without using Apache Spark

What do you do when you want to store data in a Parquet file from a standard Scala application, not an Apache Spark job? You can use Parquet4S, a project created by my colleague.
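Parquet4S ships an Akka Streams module on top of its core. Here is a minimal sketch of the sbt setup I would start from; the artifact names follow the project's README, and the versions are placeholders, not taken from this article:

libraryDependencies ++= Seq(
  // Akka Streams integration for Parquet4S (pulls in parquet4s-core)
  "com.github.mjakubowski84" %% "parquet4s-akka" % "<current version>",
  // Parquet4S uses the Hadoop client for file I/O
  "org.apache.hadoop" % "hadoop-client" % "<current version>"
)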

First, I am going to define a case class built from custom value types (the first code snippet also includes all of the imports).

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import com.github.mjakubowski84.parquet4s._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._

import scala.concurrent.duration._
import scala.concurrent.Await

// Value classes wrapping plain strings keep the domain model type-safe
case class Email(value: String) extends AnyVal
case class FirstName(value: String) extends AnyVal
case class LastName(value: String) extends AnyVal

case class User(firstName: FirstName, lastName: LastName, email: Email)

Because those fields are not Scala built-in types, I have to define codecs and schemas for them.

// Codec telling Parquet4S how to read and write FirstName values;
// the same pattern repeats below for LastName and Email
implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
  new OptionalValueCodec[FirstName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): FirstName = value match {
      case StringValue(string) => FirstName(string)
    }
    override protected def encodeNonNull(data: FirstName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

// Schema definition mapping FirstName to a required UTF-8 binary column
implicit val firstNameSchema: TypedSchemaDef[FirstName] =
  typedSchemaDef[FirstName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val lastNameTypeCodec: OptionalValueCodec[LastName] =
  new OptionalValueCodec[LastName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): LastName = value match {
      case StringValue(string) => LastName(string)
    }
    override protected def encodeNonNull(data: LastName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val lastNameSchema: TypedSchemaDef[LastName] =
  typedSchemaDef[LastName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val emailTypeCodec: OptionalValueCodec[Email] =
  new OptionalValueCodec[Email] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): Email = value match {
      case StringValue(string) => Email(string)
    }
    override protected def encodeNonNull(data: Email, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val emailSchema: TypedSchemaDef[Email] =
  typedSchemaDef[Email](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

A lot of boilerplate code, but that is the cost of reusing the domain model classes in the data persistence layer ;)
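If the repetition bothers you, it can be factored out. Below is a hypothetical pair of helpers of my own (stringWrapperCodec and stringWrapperSchema are not part of the Parquet4S API) that derive the codec and the schema for any type wrapping a single String:

// Hypothetical helpers, not part of Parquet4S: given functions converting
// between the wrapper type and String, build the codec and schema once.
def stringWrapperCodec[T](wrap: String => T)(unwrap: T => String): OptionalValueCodec[T] =
  new OptionalValueCodec[T] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): T =
      value match { case StringValue(string) => wrap(string) }
    override protected def encodeNonNull(data: T, configuration: ValueCodecConfiguration): Value =
      StringValue(unwrap(data))
  }

def stringWrapperSchema[T]: TypedSchemaDef[T] =
  typedSchemaDef[T](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

// Each pair of hand-written implicits above then collapses to two lines, e.g.:
// implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
//   stringWrapperCodec(FirstName.apply)(_.value)
// implicit val firstNameSchema: TypedSchemaDef[FirstName] =
//   stringWrapperSchema[FirstName]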



Now I can create an Akka stream that contains the data to be saved and use the code from the Parquet4S documentation to store it in Parquet files.

// A single-element collection with the data to be written
val data = Stream(User(FirstName("Test"), LastName("Test"), Email("mail@example.com")))

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

// Overwrite existing files and compress the output with Snappy
val writeOptions = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY
)

// Write the records to the ./users directory using up to 4 parallel writers;
// runWith materializes a Future that completes when the stream finishes
val done = Source(data).runWith(ParquetStreams.toParquetParallelUnordered(
  path = "./users",
  parallelism = 4,
  options = writeOptions
))

// Wait for the write to complete before shutting down the actor system,
// otherwise the termination could interrupt an in-flight write
Await.ready(done, 2.minutes)
Await.ready(actorSystem.terminate(), 2.minutes)
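To verify the result, you can read the files back with ParquetReader from the Parquet4S core module. A minimal sketch, assuming the same implicits are still in scope and the ./users path from above:

// Read back every User record from the ./users directory
val users = ParquetReader.read[User]("./users")
try users.foreach(println)
finally users.close()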
