Optimizing waits in Airflow

Marton Trencseni - Sat 01 February 2020 - Data

Introduction

Sometimes I get to put on my Data Engineering hat for a few days. I enjoy this because I like to move up and down the Data Science stack and it keeps me sharp technically. Recently I spent a few days optimizing our Airflow ETL for speed. We noticed that DWH jobs with a lot of waits spend a lot of time completing those waits, over and above the actual time spent waiting for upstream data to land. Below is the list of changes I made to reduce this overhead.

Our history of waiting on tables

The basic premise is this. Suppose you have a DWH job that creates the latest ds partition for result_table, and the INSERT is a result of a SELECT like:

INSERT INTO result_table
SELECT
    ...
FROM
    table1
INNER JOIN
    table2
ON
    ...
INNER JOIN
    table3
ON
    ...

In our ETL, we would write this like:

INSERT INTO result_table
SELECT
    ...
FROM
    current_ds_wait::table1
INNER JOIN
    current_ds_wait::table2
ON
    ...
INNER JOIN
    current_ds_wait::table3
ON
    ...

Our framework parses the SQL snippets and extracts the table names after current_ds_wait::. These are the tables whose ds partition for today has to land before we can run the SELECT (otherwise the result would be incomplete).
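
The parsing is nothing fancy; conceptually it's just a regular expression over the SQL text (the function name here is made up for illustration):

import re

def extract_wait_tables(sql):
    """Return the table names marked with current_ds_wait:: in a SQL snippet."""
    return re.findall(r"current_ds_wait::([\w.]+)", sql)

# for the INSERT above this returns ['table1', 'table2', 'table3']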

I described ds partitions in an earlier post:

The other major design pattern from Facebook is the idea of daily partitioned tables. This is a feature available on Hive, and not really practical on eg. Redshift. Essentially we store (complete) daily, write-once slices of each table, which are generated by daily jobs. The partitions are called ds at Facebook and logically show up as a column of the table, and you’ll find plenty of references to it if you read the Hive docs (because Hive was written at Facebook). Physically, these are essentially directories, each one holding the data files for that day’s data. We use S3, so in our case it looks something like s3://dwh-bucket/<table>/<ds>/<data_files>. For example, s3://dwh-bucket/company_metrics/2018-03-01/datafile.

So when our framework generates the DAG for this DWH job, it generates an insert task (a PrestoOperator), which depends on 3 wait tasks (DsPartitionSensor operators), one for each table. There are a bunch of other tasks that we generate (such as tasks for running CREATE TABLE IF NOT EXISTS), but let's ignore those here.

So this part of the DAG looks like:

    insert
    |
    +-- wait_table1
    |
    +-- wait_table2
    |
    +-- wait_table3
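
In Airflow terms, the generated wiring is roughly the following; the constructor arguments of our in-house operators are illustrative, not the real signatures:

from datetime import datetime
from airflow import DAG
from airflow.operators import PrestoOperator, DsPartitionSensor  # our in-house operators

dag = DAG("result_table", start_date=datetime(2020, 1, 1), schedule_interval="@daily")

insert = PrestoOperator(task_id="insert", sql="INSERT INTO result_table SELECT ...", dag=dag)
waits = [DsPartitionSensor(task_id="wait_%s" % t, table=t, dag=dag)
         for t in ["table1", "table2", "table3"]]
insert.set_upstream(waits)  # insert only runs once all three waits have succeeded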

Chaining waits

Initially, the wait jobs issued a Presto SQL statement like:

SHOW PARTITIONS FROM {table} WHERE ds = '{ds}' LIMIT 1
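
The DsPartitionSensor is a thin wrapper around this query; a simplified sketch of its poke() method, assuming the default Presto connection (the real operator has more plumbing):

from airflow.hooks.presto_hook import PrestoHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class DsPartitionSensor(BaseSensorOperator):
    # simplified sketch; the real operator does more
    @apply_defaults
    def __init__(self, table, ds, *args, **kwargs):
        super(DsPartitionSensor, self).__init__(*args, **kwargs)
        self.table = table
        self.ds = ds

    def poke(self, context):
        sql = "SHOW PARTITIONS FROM %s WHERE ds = '%s' LIMIT 1" % (self.table, self.ds)
        # get_first() returns None if the query produced no rows,
        # i.e. the partition has not landed yet
        return PrestoHook().get_first(sql) is not None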

The first thing we noticed was that this overloaded our Presto cluster. We have ~100 jobs, and each has a couple of waits, so hundreds of waits end up trying to run at the same time. Also, since we only have a limited number of worker slots on our Airflow worker, the waits would sometimes use up all the slots, and the actual inserts either never ran or spent a long time in the queue waiting to be executed.

So one of the initial optimizations was to chain the waits on the DAG, like:

    insert
    |
    +-- wait_table1
          |
          +-- wait_table2
               |
               +-- wait_table3

This way each DAG only ever has one wait task running; the waits of a DAG run sequentially. The change was easy to make, because we don't construct our DAGs "by hand" for each table: a helper function does it (the same one that handles the current_ds_wait:: parsing), so we only had to change one place.
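
In the helper function, the change boils down to chaining each wait behind the previous one instead of attaching them all directly to the insert; schematically (the function name is made up, ours does more):

def chain_waits(insert_task, wait_tasks):
    """Wire the waits sequentially: insert <- wait1 <- wait2 <- wait3."""
    downstream = insert_task
    for wait in wait_tasks:
        downstream.set_upstream(wait)  # wait must succeed before downstream runs
        downstream = wait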

Task pools

The second thing we tried was to use Airflow’s pool feature. With pools, tasks can be assigned to named pools, and a per-pool limit caps how many of them execute concurrently. So if we have 32 worker slots, we can set up a wait pool with 24 slots, so that no more than 24 waits run at once and there are always slots left over for the actual inserts.
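
Pools are created on the Admin → Pools page of the web UI (or via the airflow pool CLI command), and tasks opt in via the pool argument; e.g. in the generator (sensor arguments illustrative, as before):

from datetime import datetime
from airflow import DAG
from airflow.operators import DsPartitionSensor  # our in-house sensor

dag = DAG("result_table", start_date=datetime(2020, 1, 1), schedule_interval="@daily")

wait = DsPartitionSensor(
    task_id="wait_table1",
    table="table1",     # illustrative argument
    pool="wait_pool",   # the scheduler should run at most 24 tasks from this pool
    dag=dag,
)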

Unfortunately, this feature in Airflow is buggy/broken. In our setup, where we’re running a separate master and worker, and using Celery for running worker tasks, the Airflow scheduler doesn’t respect the limits, similar to this bug report.

Hive instead of Presto

Since all our DWH jobs run on Presto, our Hive execution engine just sits around idle, handling metadata queries such as CREATE TABLE (the create tasks in the DAG). So by running the SHOW PARTITIONS checks on Hive (the syntax starts the same, but it’s a bit different on Hive), we can get rid of 95% of the queries hitting the Presto cluster, which were taking a long time to run even though they’re just checking for the presence of a partition. The Hive engine handles these metadata queries easily, returning in less than a second.
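
The check itself is the same one-liner, just issued through a Hive hook instead of the Presto hook; a sketch using Airflow’s HiveCliHook, assuming the default Hive connection:

from airflow.hooks.hive_hooks import HiveCliHook

def ds_partition_landed(table, ds):
    # Hive prints one line per matching partition, e.g. "ds=2020-02-01";
    # no such line in the output means the partition hasn't landed yet
    hql = "SHOW PARTITIONS %s PARTITION (ds='%s');" % (table, ds)
    return ("ds=%s" % ds) in HiveCliHook().run_cli(hql)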

Multiwaits

In the example above, we’re waiting on 3 tables, and we generate 3 wait jobs. We realized this is inefficient: we can have a single multi_wait task which checks all 3 partitions at once. We just generate an HQL string with several SHOW PARTITIONS statements separated by ; and parse the resulting output to see what’s there and what’s missing. So the final DAG looks very simple:

    insert
    |
    +-- multi_wait

This is such an obvious idea...
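
The body of multi_wait is the same check in bulk: build one HQL string with all the SHOW PARTITIONS statements and parse the combined output. A condensed sketch (our real task also reports which tables are still missing):

from airflow.hooks.hive_hooks import HiveCliHook

def all_ds_partitions_landed(tables, ds):
    """True if today's ds partition exists for every table in the list."""
    hql = ";".join("SHOW PARTITIONS %s PARTITION (ds='%s')" % (t, ds) for t in tables) + ";"
    out = HiveCliHook().run_cli(hql)
    # each landed partition shows up as one "ds=<ds>" line in the combined output
    return out.count("ds=%s" % ds) >= len(tables)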

Reducing time between jobs

Looking at the scheduler logs, we still have a problem: the multi_wait finishes at time X, but the insert is only launched at time X+5 minutes. This is a generic issue with Airflow, not specific to waits. Why does Airflow need 5 minutes to figure out that a task’s dependencies are all finished?

Looking through the logs, I found that indeed, the first time Airflow notices that the insert can run is several minutes after the multi_wait finishes. To understand why, I took the relevant log line and looked it up in the Airflow source code. What happens is this:

Every 30 seconds the Airflow scheduler (as configured) lists all the .py files in the dags folder. It saves this list of known .py files, and then executes them in a round-robin manner: it runs one.py, two.py, three.py, and so on, where in our case each .py file is a DAG definition. Each time it executes a .py file, it looks at the instances of the DAG class in the global namespace, and those are the DAGs it schedules. The problem is that the Airflow scheduler only checks for newly runnable tasks (i.e. tasks whose dependencies have all finished) when it is running the appropriate .py file! This is a very unfortunate architectural choice, and it explains the ~5 minutes between task executions: we have ~100 ETL jobs in ~100 .py files, and running a .py file takes 3-5 seconds, so a full round-trip over all the files takes roughly 100 × 3 = 300 seconds, which is the ~5 minutes we observe. The reason it takes 3-5 seconds to execute a 100-line Python program is that it has to import the Airflow libraries (to get the DAG class, etc.), and those imports take time:

from airflow.operators import PrestoOperator, DropDsPartitionOperator, DsPartitionSensor, PostgresOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.hooks.presto_hook import PrestoHook
from airflow.hooks.base_hook import BaseHook
from airflow import DAG
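
A quick way to see the cost is to time the import in isolation:

import time

t0 = time.time()
from airflow import DAG  # pulls in SQLAlchemy, Jinja2, the settings machinery, etc.
print("importing airflow took %.1f seconds" % (time.time() - t0))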

I asked the following question in the Airflow slack #support channel:

I'm trying to debug my Airflow (1.8.2), we've been using it in prod for ~2 yrs. My issue is that it takes a long time between task runs. Ie. task X is waiting on task Y to finish in a DAG. Y finishes, and then it takes ~5 minutes for X to get queued and executed. Overall these 5 mins add up and add hours to the ETL running time.

I've been doing some debugging, and looking at the Airflow source code; what I found so far:

  • for a task to be run, all upstream tasks have to have finished, and the task has to be in SCHEDULED state: jobs.py::_execute_task_instances() is called like _execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
  • a task goes from None state to SCHEDULED state in jobs.py::process_file(), which corresponds to lines like Started a process (PID: 28897) to generate tasks for ... in my syslog
  • by default my tasks are in None state (I see this in the Task Instance Details view on the web UI)
  • I have ~100 DAG python files, each takes ~3 seconds to execute to collect the DAG, so a "roundtrip" takes ~300 secs = 5 mins
  • so I'm guessing this is what's causing the ~5 minute delay: each DAG python file is re-read every 5 mins, and that's when Airflow realizes that the deps are good and marks the task SCHEDULED

Correct me if I'm wrong. What's confusing to me is, why does Airflow need to re-read the file to notice that all upstream tasks are good to go?

I received no clear answer, other than a link about profiling the Airflow scheduler.

Given this limitation, the obvious workaround is to put several DAGs into one .py file, thus saving time on imports. For example, right now we have one .py file per table imported from the production database, which is very nice in terms of code layout in the IDE and in terms of following changes in git. But we could put all of these into one big .py file, and have one big .py file per “type” of DAG (e.g. one file for imports, one for exports, etc.).
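
The usual trick for this is to build several DAG objects in a single module and register each one in the module’s global namespace, which is where the scheduler looks for them; a sketch with made-up table names:

# dags/prod_imports.py -- hypothetical consolidated file, one DAG per imported table
from datetime import datetime
from airflow import DAG

for table in ["users", "orders", "deliveries"]:  # illustrative list
    dag = DAG("import_%s" % table,
              start_date=datetime(2020, 1, 1),
              schedule_interval="@daily")
    # ... generate the wait/insert tasks for this table here ...
    globals()[dag.dag_id] = dag  # the scheduler picks up DAG instances from module globals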

I haven’t yet made up my mind whether we should do this: it feels wrong to sacrifice everyday engineering UX for an accidental architectural flaw in the ETL system.

Further optimizations

Other ideas:

  • talk straight to the Hive metastore
  • cache existing partitions once we know they're there

These are good ideas, but at this point (repeatedly) querying Hive with SHOW PARTITIONS is not a bottleneck, so they wouldn't help us much.
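
For reference, the first idea is only a few lines with Airflow's HiveMetastoreHook, and the second is a small cache on top of it; a sketch, assuming the default schema and metastore connection:

from airflow.hooks.hive_hooks import HiveMetastoreHook

_landed = set()  # partitions we have already seen; once landed, they never disappear

def ds_partition_landed_cached(table, ds):
    key = (table, ds)
    if key not in _landed and \
            HiveMetastoreHook().check_for_partition("default", table, "ds='%s'" % ds):
        _landed.add(key)
    return key in _landed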

Conclusion

Airflow is the 5th ETL tool I’ve used: we wrote 3 hand-rolled ETL systems at Prezi (one in bash, one in Haskell, one in Go), at Facebook we used Dataswarm, and at Fetchr we use Airflow (which is based on Dataswarm). I think it’s great that we have Airflow, because it’s miles better than a hand-rolled ETL system. And as it matures, it will keep getting better!

Having said that, I hope the open source Data Engineering community will improve Airflow in the future to address these issues. My problems with Airflow are three-fold:

  • bugs:
    • the unreliable pools feature
    • Airflow can get stuck in various ways:
      • all worker slots are used up by tasks which block; in this case, we have to run ps ax | grep airflow | grep wait | awk '{ print $1 }' | xargs kill -9 on the worker
      • sometimes tasks get stuck in null or queued status; in this case, we have to manually re-kick them on the UI
      • sometimes the scheduler itself runs into a bug and gets stuck; in this case we have to restart the scheduler itself on the master with systemctl restart airflow-scheduler.service
  • architectural shortcomings
    • only making progress on the DAG when re-running the .py file containing the DAG
    • right now, for every task instance Airflow launches a new, expensive Python process on the worker node, which takes hundreds of MBs of memory and the turn-around time is quite slow; it'd be nice to come up with an "in-process" way to launch small inexpensive checks quickly (like waits)
  • resource hungry: we run two beefy EC2 instances, one airflow-master and one airflow-worker, but all they really do is manage a relatively small set of DAGs (~100 DAGs, each with ~10 tasks); the actual work is performed elsewhere, on the EC2 nodes (actually, a cluster) running Presto and the various ML jobs; still, both Airflow nodes show a load of 2-3 in top.

These are real issues that affect our production every day in terms of landing time, dollars and engineering time:

  • Airflow is slow to make progress
  • we often have to manually kill / clear / re-kick jobs
  • we run two EC2 nodes just for Airflow (master and worker)

I definitely don’t regret using Airflow, but it would be nice if the core engine itself were more efficient, more performant, and less wasteful. 10 years ago I was a proud C++ programmer, building database kernels and storage engines, optimizing away bits and bytes. Today, because I try to move fast and focus on impact—which is the right thing to do, despite these issues—throwing hardware and money at the simple problem of managing a small DAG is the best option. Feels weird.