How to write to a Parquet file in Scala without using Apache Spark

What can you do when you want to store data in a Parquet file from a standard Scala application rather than an Apache Spark job? You can use Parquet4S, a project created by my colleague.

First, I am going to create a case class whose fields are custom value types (I also included all of the imports in the first code snippet).

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import com.github.mjakubowski84.parquet4s._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._

import scala.concurrent.duration._
import scala.concurrent.Await

case class Email(value: String) extends AnyVal
case class FirstName(value: String) extends AnyVal
case class LastName(value: String) extends AnyVal

case class User(firstName: FirstName, lastName: LastName, email: Email)

Because I don’t use Scala built-in types, I have to define the codecs and schemas of those custom types.

implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
  new OptionalValueCodec[FirstName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): FirstName = value match {
      case StringValue(string) => FirstName(string)
    }
    override protected def encodeNonNull(data: FirstName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val firstNameSchema: TypedSchemaDef[FirstName] =
  typedSchemaDef[FirstName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val lastNameTypeCodec: OptionalValueCodec[LastName] =
  new OptionalValueCodec[LastName] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): LastName = value match {
      case StringValue(string) => LastName(string)
    }
    override protected def encodeNonNull(data: LastName, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val lastNameSchema: TypedSchemaDef[LastName] =
  typedSchemaDef[LastName](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )

implicit val emailTypeCodec: OptionalValueCodec[Email] =
  new OptionalValueCodec[Email] {
    override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): Email = value match {
      case StringValue(string) => Email(string)
    }
    override protected def encodeNonNull(data: Email, configuration: ValueCodecConfiguration): Value =
      StringValue(data.value)
  }

implicit val emailSchema: TypedSchemaDef[Email] =
  typedSchemaDef[Email](
    PrimitiveSchemaDef(
      primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
      required = true,
      originalType = Some(OriginalType.UTF8)
    )
  )
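
Why is this extra code needed at all? The codecs above are implicit values, so the library presumably looks up an implicit instance for every field type and finds one out of the box only for the types it already knows. The following is a toy sketch of that lookup mechanism in plain Scala. `ToyEncoder` and `Nickname` are hypothetical names I made up for the illustration, and the code is not the Parquet4S implementation.

```scala
// Toy type class: NOT the Parquet4S API, only an illustration of implicit lookup.
trait ToyEncoder[T] {
  def encode(value: T): String
}

object ToyEncoder {
  // Instance for a built-in type, comparable to what a library ships by default.
  implicit val stringEncoder: ToyEncoder[String] = new ToyEncoder[String] {
    def encode(value: String): String = s"string:$value"
  }

  // Compiles only when an implicit ToyEncoder[T] can be found for T.
  def encode[T](value: T)(implicit encoder: ToyEncoder[T]): String =
    encoder.encode(value)
}

// A custom wrapper type (hypothetical, for illustration only).
final case class Nickname(value: String)

object Nickname {
  // Without this instance, ToyEncoder.encode(Nickname("neo")) does not compile.
  implicit val nicknameEncoder: ToyEncoder[Nickname] = new ToyEncoder[Nickname] {
    def encode(value: Nickname): String = ToyEncoder.encode(value.value)
  }
}

object ToyEncoderDemo extends App {
  println(ToyEncoder.encode("neo"))           // uses the built-in String instance
  println(ToyEncoder.encode(Nickname("neo"))) // uses the user-defined instance
}
```

Removing `nicknameEncoder` turns the second call into a compile-time error, which is the same kind of failure you get when a custom field type has no codec or schema in scope.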

The three codec and schema pairs above are a lot of boilerplate code, but that is the cost of reusing the domain model classes in the data persistence layer ;)
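
If the repetition bothers you, the three pairs can probably be collapsed into two generic helpers. This is a minimal sketch that reuses only the Parquet4S types already shown above. `StringWrapperSupport`, `stringCodec`, and `stringSchema` are hypothetical names of my own, not part of the library, and I have not checked the sketch against every Parquet4S version. It is meant to be used instead of the hand-written definitions, not next to them.

```scala
import com.github.mjakubowski84.parquet4s._
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver._
import org.apache.parquet.schema.{OriginalType, PrimitiveType}

case class Email(value: String) extends AnyVal
case class FirstName(value: String) extends AnyVal
case class LastName(value: String) extends AnyVal

// Hypothetical helper object, not part of Parquet4S.
object StringWrapperSupport {

  // Builds a codec for any type that wraps a single String.
  def stringCodec[T](wrap: String => T, unwrap: T => String): OptionalValueCodec[T] =
    new OptionalValueCodec[T] {
      override protected def decodeNonNull(value: Value, configuration: ValueCodecConfiguration): T =
        value match {
          case StringValue(string) => wrap(string)
          // Fail with a descriptive message instead of a MatchError.
          case other => throw new IllegalArgumentException(s"Expected a string value but got: $other")
        }
      override protected def encodeNonNull(data: T, configuration: ValueCodecConfiguration): Value =
        StringValue(unwrap(data))
    }

  // Same schema as in the hand-written versions: a required UTF8 string column.
  def stringSchema[T]: TypedSchemaDef[T] =
    typedSchemaDef[T](
      PrimitiveSchemaDef(
        primitiveType = PrimitiveType.PrimitiveTypeName.BINARY,
        required = true,
        originalType = Some(OriginalType.UTF8)
      )
    )

  implicit val firstNameTypeCodec: OptionalValueCodec[FirstName] =
    stringCodec[FirstName](FirstName(_), _.value)
  implicit val firstNameSchema: TypedSchemaDef[FirstName] = stringSchema[FirstName]

  implicit val lastNameTypeCodec: OptionalValueCodec[LastName] =
    stringCodec[LastName](LastName(_), _.value)
  implicit val lastNameSchema: TypedSchemaDef[LastName] = stringSchema[LastName]

  implicit val emailTypeCodec: OptionalValueCodec[Email] =
    stringCodec[Email](Email(_), _.value)
  implicit val emailSchema: TypedSchemaDef[Email] = stringSchema[Email]
}
```

With `import StringWrapperSupport._` in scope, the `User` case class should resolve its encoder and schema the same way as with the longer definitions. Adding another string-based wrapper type then costs two lines instead of seventeen.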

Now I can create an Akka stream that contains the data to be saved and use the code from the Parquet4S documentation to store the data in Parquet files.

val data = Stream.apply(User(FirstName("Test"), LastName("Test"), Email("mail@example.com")))

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

val writeOptions = ParquetWriter.Options(
  writeMode = ParquetFileWriter.Mode.OVERWRITE,
  compressionCodecName = CompressionCodecName.SNAPPY
)

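// runWith materializes the sink's Future, which completes when all records have been written
val writeDone = Source(data).runWith(ParquetStreams.toParquetParallelUnordered(
  path = "./users",
  parallelism = 4,
  options = writeOptions
))

// Wait for the write to finish before shutting down the actor system;
// terminating first could stop the stream before the files are complete
Await.ready(writeDone, 2.minutes)
Await.ready(actorSystem.terminate(), 2.minutes)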