Optimizing waits in Airflow
Marton Trencseni - Sat 01 February 2020 - Data
Introduction
Sometimes I get to put on my Data Engineering hat for a few days. I enjoy this because I like to move up and down the Data Science stack and it helps me stay sharp technically. Recently I was able to spend a few days optimizing our Airflow ETL for speed. We noticed that DWH jobs with lots of waits were spending a lot of time just completing the waits (not counting the actual time spent waiting for the partitions to land). Below is a list of changes I made to improve our waiting time.
Our history of waiting on tables
The basic premise is this. Suppose you have a DWH job that creates the latest ds partition for result_table, and the INSERT is the result of a SELECT like:
INSERT INTO result_table
SELECT
...
FROM
table1
INNER JOIN
table2
ON
...
INNER JOIN
table3
ON
...
In our ETL, we would write this like:
INSERT INTO result_table
SELECT
...
FROM
current_ds_wait::table1
INNER JOIN
current_ds_wait::table2
ON
...
INNER JOIN
current_ds_wait::table3
ON
...
Our framework parses the SQL snippets and extracts the table names after the current_ds_wait:: prefix. These are the tables whose ds partition for today has to land before we can run the SELECT (otherwise the result would be incomplete).
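For illustration, the extraction can be as simple as a regular expression over the SQL text; this is a minimal sketch, not our actual framework code (the helper name is made up):

import re

def extract_wait_tables(sql):
    # Hypothetical helper: collect every table name prefixed with current_ds_wait::
    # so the framework knows which ds partitions to wait for.
    return sorted(set(re.findall(r'current_ds_wait::([\w.]+)', sql)))

# For the snippet above this returns ['table1', 'table2', 'table3']; the framework
# then strips the current_ds_wait:: prefixes to get the runnable SQL.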
I described ds partitions in an earlier post:

The other major design pattern from Facebook is the idea of daily partitioned tables. This is a feature available on Hive, and not really practical on eg. Redshift. Essentially we store (complete) daily, write-once slices of each table, which are generated by daily jobs. The partitions are called ds at Facebook and logically show up as a column of the table, and you'll find plenty of references to it if you read the Hive docs (because Hive was written at Facebook). Physically, these are essentially directories, each one holding the data files for that day's data. We use S3, so in our case it looks something like s3://dwh-bucket/<table>/<ds>/<data_files>. For example, s3://dwh-bucket/company_metrics/2018-03-01/datafile.
So when our framework generates the DAG for this DWH job, it generates an insert task (a PrestoOperator), which depends on 3 wait tasks (DsPartitionSensor operators), one for each table. There are a bunch of other tasks that we generate (such as tasks for running CREATE TABLE IF NOT EXISTS), but let's ignore those.
So this part of the DAG looks like:
insert
|
+-- wait_table1
|
+-- wait_table2
|
+-- wait_table3
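In Airflow terms, the generated piece of the DAG is wired up roughly like this. A sketch only: PrestoOperator and DsPartitionSensor are our in-house operators, and their constructor arguments here are assumptions.

from datetime import datetime
from airflow import DAG
from airflow.operators import PrestoOperator, DsPartitionSensor  # our in-house operators

insert_sql = "INSERT INTO result_table SELECT ... FROM table1 ..."  # the SQL from above, prefixes stripped

dag = DAG('result_table_dag', start_date=datetime(2020, 1, 1), schedule_interval='@daily')

insert = PrestoOperator(       # runs the INSERT INTO result_table SELECT ...
    task_id='insert',
    sql=insert_sql,
    dag=dag)

for table in ['table1', 'table2', 'table3']:
    wait = DsPartitionSensor(  # waits for today's ds partition of this table
        task_id='wait_%s' % table,
        table=table,
        dag=dag)
    insert.set_upstream(wait)  # insert only runs once all three waits succeed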
Chaining waits
Initially, the wait jobs issued a Presto SQL statement like:
SHOW PARTITIONS FROM {table} WHERE ds = '{ds}' LIMIT 1
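Under the hood, the sensor's check can be as simple as issuing that statement through Airflow's PrestoHook and testing whether a row comes back; a minimal sketch, not our actual DsPartitionSensor:

from airflow.hooks.presto_hook import PrestoHook

def ds_partition_exists(table, ds):
    # True once the ds partition for the table has landed in the DWH.
    sql = "SHOW PARTITIONS FROM %s WHERE ds = '%s' LIMIT 1" % (table, ds)
    return PrestoHook(presto_conn_id='presto_default').get_first(sql) is not None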
The first thing we noticed is that this overloaded our Presto cluster. We have ~100 jobs, and each has a couple of waits, so this results in hundreds of waits trying to run at the same time. Also, since we only have a limited number of worker slots on our Airflow worker, sometimes the waits would use up all the slots, and the actual inserts never ran, or spent a long time in the queue, waiting to be executed.
So one of the initial optimizations was to chain the waits on the DAG, like:
insert
|
+-- wait_table1
    |
    +-- wait_table2
        |
        +-- wait_table3
This way each DAG only ever has one wait job running; the wait jobs in a DAG run sequentially. This change was easy to make, because we don't construct our DAGs "by hand" for each table: we have a helper function which does this (and which also handles the current_ds_wait:: parsing), so we just needed to make the change in one place.
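In the helper function, the change boils down to linking the sensors to each other instead of attaching each one directly to the insert; something along these lines (a sketch, assuming the wait tasks have already been constructed):

def chain_waits(insert_task, wait_tasks):
    # Instead of insert depending on every wait in parallel, chain them:
    # insert <- wait_table1 <- wait_table2 <- wait_table3,
    # so at most one wait per DAG occupies a worker slot at any time.
    downstream = insert_task
    for wait in wait_tasks:
        downstream.set_upstream(wait)
        downstream = wait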
Task pools
The second thing we tried was to use Airflow's pool feature. With this, tasks can be assigned to pools, and per-pool limits can be set on execution. So if we have 32 worker slots, we can set up a wait pool with 24 slots, so that no more than 24 waits can be running at once.
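Assigning a task to a pool is just an extra argument on the operator (the pool itself is created on the Admin → Pools page or via the CLI); continuing the earlier sketch, roughly:

wait = DsPartitionSensor(
    task_id='wait_table1',
    table='table1',
    pool='wait_pool',   # the pool is capped at 24 slots, so waits can't starve the inserts
    dag=dag)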
Unfortunately, this feature in Airflow is buggy/broken. In our setup, where we’re running a separate master and worker, and using Celery for running worker tasks, the Airflow scheduler doesn’t respect the limits, similar to this bug report.
Hive instead of Presto
Since all our DWH jobs run on Presto, our Hive execution engine is just sitting around idle, handling metadata queries such as CREATE TABLE (the create tasks in the DAG). So by running the SHOW PARTITIONS check (the syntax starts the same, but it's a bit different on Hive) on Hive, we can get rid of 95% of the jobs on the Presto cluster, which were taking a long time to run even though they're just checking for the presence of a partition. The Hive engine can handle these metadata queries easily, returning in less than a second.
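A sketch of the same check issued against Hive, using Airflow's HiveCliHook; the hook usage and the assumption that run_cli returns the statement's output are mine, and note the Hive flavour of the syntax, with the filter in a PARTITION(...) clause:

from airflow.hooks.hive_hooks import HiveCliHook

def ds_partition_exists_on_hive(table, ds):
    # Same check as before, but served by the mostly idle Hive engine.
    hql = "SHOW PARTITIONS %s PARTITION(ds='%s')" % (table, ds)
    output = HiveCliHook(hive_cli_conn_id='hive_cli_default').run_cli(hql)
    return ("ds=%s" % ds) in output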
Multiwaits
In the example above, we're waiting on 3 tables, and we generate 3 wait jobs. We realized this is inefficient, and we can just have one multi_wait task which checks all 3 partitions at once. We just generate an HQL with several SHOW PARTITIONS statements separated by ; and parse the resulting string to see what's there and what's missing. So the final DAG looks very simple:
insert
|
+-- multi_wait
This is such an obvious idea…
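The multi_wait check is then just the batched version of the Hive check above; a sketch, where my way of parsing the output (counting the returned partition lines) is a simplification of what our operator actually does:

from airflow.hooks.hive_hooks import HiveCliHook

def all_partitions_landed(tables, ds):
    # One Hive invocation with a SHOW PARTITIONS statement per table; if every
    # partition exists, the output contains one "ds=<ds>" line per table.
    hql = ';\n'.join("SHOW PARTITIONS %s PARTITION(ds='%s')" % (t, ds) for t in tables) + ';'
    output = HiveCliHook(hive_cli_conn_id='hive_cli_default').run_cli(hql)
    return output.count("ds=%s" % ds) >= len(tables)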
Reducing time between jobs
Looking at the scheduler logs, we still had a problem: the multi_wait finishes at time X, but the insert is only launched at time X+5 minutes. This is a generic issue with Airflow, not specific to waits. Why does Airflow need 5 minutes to figure out that a task's dependencies are all finished?
To understand this I looked through the logs, and found that indeed, the first time the logs show Airflow noticing that the insert can run is several minutes after the multi_wait finishes. I took the relevant log line and looked it up in the Airflow source code. What happens is this:
Every 30 seconds the Airflow scheduler (as configured) lists out all the .py files in the dags folder. It saves this list of known .py files, and then, in a round-robin manner, executes them: it runs one.py, two.py, three.py, and so on, where each of the .py files is a DAG definition in our case. Each time it executes a .py file, it looks at the instances of the DAG class in the global namespace, and those are the DAGs it executes. The problem is, the Airflow scheduler only checks for newly runnable tasks (ie. tasks whose dependencies have all finished) when it's running the appropriate .py file! This is a very unfortunate architectural choice, and it explains why it takes ~5 minutes between task executions: we have about ~100 ETL jobs in ~100 .py files, and running a .py file takes 3-5 seconds. The reason it takes 3-5 seconds to execute a 100-line Python program is that it has to import the Airflow libraries (to get the DAG class, etc.), and those Airflow imports take time:
from airflow.operators import PrestoOperator, DropDsPartitionOperator, DsPartitionSensor, PostgresOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.hooks.presto_hook import PrestoHook
from airflow.hooks.base_hook import BaseHook
from airflow import DAG
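These imports account for most of those 3-5 seconds; an easy way to check the cost on your own setup is to time them by hand in a throwaway script:

import time

start = time.time()
from airflow import DAG                                    # pulls in most of the Airflow machinery
from airflow.operators.hive_operator import HiveOperator   # plus the operators the DAG files use
print('Airflow imports took %.1f seconds' % (time.time() - start))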
I asked the following question in the Airflow slack #support channel:
I'm trying to debug my Airflow (1.8.2), we've been using it in prod for ~2 yrs. My issue is that it takes a long time between task runs. Ie. task X is waiting on task Y to finish in a DAG. Y finishes, and then it takes ~5 minutes for X to get queued and executed. Overall these 5 mins add up and add hours to the ETL running time.
I've been doing some debugging, and looking at the Airflow source code; what I found so far:
- for a task to be run, all upstream tasks have to be finished, and the task has to be in SCHEDULED state: jobs.py::_execute_task_instances(), called like _execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
- a task goes from None state to SCHEDULED state in jobs.py::process_file(), which corresponds to lines like Started a process (PID: 28897) to generate tasks for ... in my syslog
- by default my tasks are in None state (I see this in the Task Instance Details view on the web UI)
- I have ~100 DAG python files, each takes ~3 seconds to execute to collect the DAG, so a "roundtrip" takes ~300 secs = 5 mins
- so I'm guessing this is what's causing the ~5 minute delay: each DAG python file is re-read every 5 mins, and that's when Airflow realizes that the deps are good and makes it SCHEDULED
Correct me if I'm wrong. What's confusing to me is, why does Airflow need to re-read the file to notice that all upstream tasks are good to go?
I received no clear answer, other than a link about profiling the Airflow scheduler.
Given this limitation, the obvious workaround is to put several DAGs into one .py file, thus saving time on imports. For example, right now we have one .py file per table import from the production database, which is very nice in terms of code layout in the IDE and in terms of following changes on git. But we could put all these into one big .py file, and have one big .py file per "type" of DAG (eg. one file for imports, one for exports, etc).
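The usual way to do this is to generate the DAGs in a loop and register each one in the module's global namespace, since that's where the scheduler looks for DAG instances; a sketch with a made-up table list:

from datetime import datetime
from airflow import DAG

IMPORT_TABLES = ['table1', 'table2', 'table3']   # hypothetical: one production table per DAG

for table in IMPORT_TABLES:
    dag = DAG('import_%s' % table,
              start_date=datetime(2020, 1, 1),
              schedule_interval='@daily')
    # ... build the import/wait/insert tasks for this table here ...
    globals()['import_%s' % table] = dag   # expose the DAG object in the global namespace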
I haven't yet made up my mind whether we should do this: it feels wrong to sacrifice everyday engineering UX to work around an accidental architectural flaw in the ETL system.
Further optimizations
Other ideas:
- talk straight to the Hive metastore
- cache existing partitions once we know they're there
These are good ideas, but at this point (repeatedly) querying Hive with SHOW PARTITIONS is not a bottleneck, so they wouldn't help us.
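For completeness, the caching idea could be as small as remembering which (table, ds) pairs have already been seen, since in our write-once scheme a partition never disappears once it has landed; a sketch we did not implement:

_seen_partitions = set()

def cached_partition_check(table, ds, check_fn):
    # check_fn is a hypothetical callable, eg. one of the ds_partition_exists sketches above;
    # a positive answer is cached for the lifetime of the process.
    if (table, ds) not in _seen_partitions and check_fn(table, ds):
        _seen_partitions.add((table, ds))
    return (table, ds) in _seen_partitions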
Conclusion
Airflow is the 5th ETL tool I've used: we wrote 3 hand-rolled ETL systems at Prezi (one in bash, one in Haskell, one in Go), at Facebook we used Dataswarm, and at Fetchr we use Airflow (which is based on Dataswarm). I think it's great that we have Airflow, because it's miles better than a hand-rolled ETL system. Also, as it matures, it will get better!
Having said that, I hope the open source Data Engineering community will improve Airflow in the future to address these issues. My problems with Airflow are three-fold:
- bugs:
  - the unreliable pools feature
  - Airflow can get stuck in various ways:
    - all worker slots are used up by tasks which block; in this case, we have to ps ax | grep airflow | grep wait | awk '{ print $1 }' | xargs kill -9 on the worker
    - sometimes tasks get stuck in null or queued status; in this case, we have to manually re-kick them on the UI
    - sometimes the scheduler itself runs into a bug and gets stuck; in this case we have to restart the scheduler itself on the master with systemctl restart airflow-scheduler.service
- architectural shortcomings:
  - only making progress on the DAG when re-running the .py file containing the DAG
  - right now, for every task instance Airflow launches a new, expensive Python process on the worker node, which takes hundreds of MBs of memory, and the turn-around time is quite slow; it'd be nice to come up with an "in-process" way to launch small, inexpensive checks (like waits) quickly
- resource hungry: we run two beefy EC2 instances, one airflow-master and one airflow-worker, but all these really do is manage a relatively small DAG (~100 DAGs each with ~10 tasks); the actual work is performed on a third node (actually, a cluster): the EC2 nodes that are running Presto and the various ML jobs; still, both Airflow nodes show a load of 2-3 in top.
These are real issues that affect our production every day in terms of landing time, dollars and engineering time:
- Airflow is slow to make progress
- we often have to manually kill / clear / re-kick jobs
- we run two EC2 nodes just for Airflow (master and worker)
I definitely don't regret using Airflow, but it would be nice if the core engine itself were more efficient, performant, and less wasteful. 10 years ago I was a proud C++ programmer, building database kernels and storage engines, optimizing away bits and bytes. Today, because I try to move fast and focus on impact—which is the right thing to do, despite these issues—throwing hardware and money at a simple problem of managing a small DAG is the best option. Feels weird.