Athena performance tips explained

When we google AWS Athena performance tips, we get a few hints such as

  1. using partitions,

  2. retrieving only the columns we need,

  3. using LIMIT to get only the rows we need instead of retrieving everything just to look at the first page of the results,

  4. specifying the larger table first while joining many tables,

  5. using LIMIT with ORDER BY, or

  6. being careful with WHERE combined with LIMIT.

We could blindly apply those hints and get better results immediately. Instead, I am going to look carefully at each of those tips and show you why they give us better Athena performance.

What is AWS Athena?

According to the documentation, “Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL.” From the same documentation, we know that Amazon Athena is built on top of Presto - a Distributed SQL Query Engine for Big Data.

Why does it matter? It is crucial because I often see incorrect Athena usage. Even though Athena supports SQL as the query language, it is not a SQL database! It is not even a database!

So here are the facts:

Athena is NOT a SQL database! - It is a query engine that uses SQL as the query definition language.

Athena does NOT store any data! - It reads data from S3.

Athena does NOT store table definitions! - The table definitions are in the Glue Data Catalog.

Athena does NOT optimize the SQL queries the way a relational database would! - It generates a distributed execution plan that retrieves and processes the data.

How does Athena run a query?

We know that when we run an Athena query, it reads the table definitions from the Glue Data Catalog. After that, each of the Presto worker nodes reads some part of the data from S3 using the Presto Hive Connector.

The Presto Hive Connector can read data from S3 (because S3 exposes an HDFS-compatible interface) and use the S3SelectPushdown feature to push filter predicates down to S3, minimizing the amount of data it needs to retrieve.

After that, Presto processes the data using the query execution plan we are going to discuss later. When the output is ready, it writes it to the output location.

When you look at the Athena settings, you see that there is the output bucket parameter. The name of the bucket looks like this: s3://aws-athena-query-results-some_number-aws_region, and it keeps the results of the Athena queries.

Query execution plans

There is no way to display an execution plan in Athena. Instead, I configured Presto on an EMR cluster connected to the same Glue Data Catalog used by Athena.

Let’s take a look at a query that counts the number of rows per group and its query execution plan.

SELECT col_1, col_2, count(*) FROM some_table GROUP BY col_1, col_2

- Output
  - RemoteStreamingExchange[GATHER]
    - Project
      - Aggregate(FINAL)
        - LocalExchange[HASH]
          - RemoteStreamingExchange[REPARTITION]
            - Aggregate(PARTIAL)
              - ScanProject

We see that the execution starts with the ScanProject step. In this step, the Presto workers retrieve files from S3 and extract the two columns we are going to need to perform the query. After that, each worker calculates the partial aggregate by grouping the data and counting the number of values in the files it retrieved from S3.

RemoteStreamingExchange follows that step. During the exchange, the workers transfer data between each other using the hash code of columns to decide which worker gets which part of the data.

After transferring the data, each worker repartitions the received values locally (the LocalExchange[HASH] step) so that all partial aggregates for a given group end up together, and then sums the counts in the Aggregate(FINAL) step.

In the end, the workers get rid of unnecessary data to get only the columns I requested and return the value.
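The plan above can be sketched in a few lines of Python. This is a simplified model, not Presto's actual code: the `hash(key) % n_workers` routing stands in for Presto's hash partitioning, and each "worker" is just a list of rows.

```python
from collections import Counter

def scan_project(rows):
    # ScanProject: keep only the two grouping columns from every row
    return [(row["col_1"], row["col_2"]) for row in rows]

def partial_aggregate(keys):
    # Aggregate(PARTIAL): each worker counts its own rows per group
    return Counter(keys)

def repartition(partials, n_workers):
    # RemoteStreamingExchange[REPARTITION]: the hash of the grouping key
    # decides which worker receives a given partial count
    buckets = [Counter() for _ in range(n_workers)]
    for partial in partials:
        for key, count in partial.items():
            buckets[hash(key) % n_workers][key] += count
    return buckets

def final_aggregate(buckets):
    # Aggregate(FINAL): sum the partial counts; after the exchange, all
    # partials for a given key sit in the same bucket
    result = Counter()
    for bucket in buckets:
        result += bucket
    return dict(result)
```

The key property the exchange guarantees is that two partial counts for the same group always land on the same worker, so the final sums are correct no matter how the files were split between workers.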

Athena hints

In the following examples, I am going to run the “explain” command of multiple select statements with and without following the Athena best practices, look at the execution plans, and explain the differences.

Of course, I can’t show you the table names and all of the elements of the execution plan (including columns, partitions, etc.) but it should be enough to look at the step names and the estimated amount of data that must be processed by them.

Use partitions

I have a table that has a timestamp column and a partition column that corresponds to the timestamp.

First, I am going to run the query without partitions: SELECT * FROM some_table WHERE ts >= 'date' AND ts < 'next_date'

In the query execution plan, we see that the execution starts with the ScanFilter step that is going to scan 323 GB of data. The only good part of this execution plan is the usage of S3SelectPushdown to filter the data in S3.

On the other hand, if I replace the timestamp filter with the partition that contains the data created on that day: SELECT * FROM some_table WHERE timepartition = 'the_right_partition', I get a query execution plan starting with TableScan (a full scan, but of a single partition) that processes only 357 MB of data.
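Partition pruning helps because partitions map to S3 key prefixes, so Athena can skip listing and reading everything outside the matching prefix. A minimal sketch, assuming the usual Hive-style `timepartition=.../` key layout (the key names here are made up for illustration):

```python
def objects_to_scan(all_objects, partition_filter=None):
    # Without a partition filter, Athena must list and read every object in
    # the table's S3 location; with one, only the matching prefix is read.
    if partition_filter is None:
        return all_objects
    prefix = f"timepartition={partition_filter}/"
    return [key for key in all_objects if key.startswith(prefix)]
```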

Retrieve only the columns you need

This hint looks like something obvious. After all, we intuitively know that the fewer columns we retrieve, the less data we need to scan.

On the other hand, quite often, I see “SELECT * FROM” queries because the user did not bother with specifying the columns. In this example, I show that such laziness wastes a lot of money (every TB of scanned data costs $5 in the AWS us-east-1 region).

SELECT * FROM a_large_table

TableScan(...) Estimates: {rows: ... (323.24GB)}

SELECT col_1, col_2, col_3 FROM a_large_table

TableScan(...) Estimates: {rows: ... (46.95GB)}
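The saving comes from columnar file formats such as Parquet and ORC: every column is stored separately, so a query pays only for the columns it references. A toy cost model (the column names and byte sizes are invented for illustration):

```python
def bytes_scanned(column_sizes, selected=None):
    # In a columnar format, each column can be read independently, so the
    # scan cost is the sum of the selected columns' sizes only.
    if selected is None:  # SELECT * reads every column
        selected = column_sizes
    return sum(column_sizes[col] for col in selected)
```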

Use LIMIT but be careful with WHERE + LIMIT

Instead of retrieving all rows just to look at the top 100 in Athena Web UI, we should explicitly limit the number of retrieved values.

Adding the LIMIT clause causes every Presto worker to keep only the required amount of data, which gets transferred to a single worker node during the RemoteStreamingExchange[GATHER] step.

Something surprising and counterintuitive happens when we use both LIMIT and WHERE in the same query. For example, if I run this SQL statement: SELECT * FROM the_table LIMIT 10, Athena scans 137.61 MB of data, but if I add a WHERE clause and run this query: SELECT * FROM the_table WHERE a_column='some_value' LIMIT 10, it scans 7.37 TB of data!

What happened? I want less data (after all, I used a WHERE to limit the dataset), and it needs to scan way, way more. What is wrong? The problem gets obvious and easy to explain when we look at the query execution plan:

- Output
  - Limit[10]
    - LocalExchange[SINGLE]
      - RemoteStreamingExchange[GATHER]
        - LimitPartial[10]
          - ScanFilter (...) Estimates: {rows: ... (668.05TB)}

Thankfully, the filter predicate is pushed down to the data source, so instead of retrieving almost 670 TB of data, Athena has to retrieve only about 7 TB. After that, processing continues as usual. Every worker returns 10 elements of the partitions processed by that worker. The data gets transferred to one worker node that reruns the limit step to get the final result.

The problem is that in the case of the SELECT * FROM the_table LIMIT 10 statement, Athena can return any 10 rows from the table. In the second query, it must first find the rows that match the predicate and only then pick 10 rows to return.
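We can model the LimitPartial step with a tiny sketch (illustrative only) that shows why a WHERE clause changes how much each worker scans:

```python
def limit_partial(rows, n, predicate=None):
    # LimitPartial[n]: a worker stops scanning as soon as it has n rows.
    # With a WHERE clause, it must keep reading until n rows *match*,
    # which for a selective predicate can mean scanning the whole partition.
    scanned, kept = 0, []
    for row in rows:
        scanned += 1
        if predicate is None or predicate(row):
            kept.append(row)
            if len(kept) == n:
                break
    return kept, scanned
```

Without a predicate, the worker reads exactly 10 rows and stops; with a predicate that matches 1% of rows, it has to read roughly 100x as many rows to collect the same 10.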

When joining tables, specify the largest table first

For everyone who has used relational databases, this hint seems odd. The order of tables in a SQL statement should not matter; if it does, it is the query optimizer's job to figure out the best way to execute the query. That is right, but Athena is not a SQL database.

In this example, I am going to perform an inner join, but the same thing happens during outer joins too. Assume that I have two tables that significantly differ in size. I run this SQL statement: SELECT * FROM larger_table l, smaller_table s WHERE l.col = s.col

In the query execution plan, we see that Presto has to repartition the data from the smaller table to exchange it between nodes and then repartition it again to perform the join. In comparison, the larger table is repartitioned only once.

- Output
  - RemoteStreamingExchange[GATHER]
    - InnerJoin
      - RemoteStreamingExchange[REPARTITION] (8.21 GB)
        - ScanProject (larger table)
      - LocalExchange[HASH] (563.92MB)
        - RemoteStreamingExchange[REPARTITION]
          - ScanProject (smaller table)

If I flip the order of tables in this query: SELECT * FROM smaller_table s, larger_table l WHERE l.col = s.col

I get a worse execution plan because the larger table must be processed in two steps.

- Output
  - RemoteStreamingExchange[GATHER]
    - InnerJoin
      - RemoteStreamingExchange[REPARTITION] (563.92MB)
        - ScanProject (smaller table)
      - LocalExchange[HASH] (8.21 GB)
        - RemoteStreamingExchange[REPARTITION]
          - ScanProject (larger table)

The query execution plan also displays the estimated CPU load. In the case of the first query, the estimated CPU load is around 30% smaller than in the second query.
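Under the hood this is a distributed hash join: by default (without cost-based join reordering), Presto builds an in-memory hash table from the table listed last (the build side) and streams the table listed first (the probe side). That is why the smaller table should go last. A single-node sketch of the idea:

```python
def hash_join(probe_side, build_side, key):
    # The build side is hashed and kept in memory, so it should be the
    # smaller table; the probe side is streamed once and never buffered.
    lookup = {}
    for row in build_side:
        lookup.setdefault(row[key], []).append(row)
    joined = []
    for row in probe_side:
        for match in lookup.get(row[key], []):
            joined.append({**row, **match})
    return joined
```

Flipping the arguments still produces the same rows, but then the larger table is the one that has to be hashed and held in memory - the in-code analogue of the worse execution plan above.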

LIMIT with ORDER BY

Again, the lazy way of using Athena is to order the rows and look at the top n results in Athena Web UI instead of limiting the number of output values.

Once more, we get way better performance by explicitly limiting the number of rows not only because Athena needs to scan less data but also because it can generate a more performant query execution plan.

In the query without the LIMIT clause, the workers sort their partitions (the PartialSort step) and stream the sorted data to a single node, which merges the streams to produce the final result. Even though that node can perform a merge instead of a full sort, moving the whole table to one node is still a massive waste of resources.

SELECT * FROM a_table ORDER BY col

- Output
  - RemoteStreamingMerge
    - LocalMerge
      - PartialSort
        - RemoteStreamingExchange[REPARTITION]
          - TableScan

If I know that I am going to look at the top 10000 rows anyway, I should explicitly specify the number of rows I get. This allows Athena to return only the top 10000 rows from every intermediate worker, exchange less data, and process the query faster.

SELECT * FROM a_table ORDER BY col LIMIT 10000

- Output
  - TopN[10000 by (col ASC_NULLS_LAST)]
    - LocalExchange[Single]
      - RemoteStreamingExchange[GATHER]
        - TopNPartial[10000 by (col ASC_NULLS_LAST)]
          - TableScan

Summary

Now, we see that the Athena hints and best practices are not tribal wisdom learned by comparing the query execution times but something that is based on the characteristics of the software used to build Athena.
