Luigi review
Marton Trencseni - Sun 20 December 2015 - Data
Introduction
Luigi is an execution framework for writing data pipelines in Python. It supports task-to-task dependencies, has a simple central scheduler with an HTTP API, and ships with an extensive library of helpers for building data pipelines on Hadoop, AWS, MySQL, etc. It was written by Spotify for internal use and open sourced in 2012. A number of companies use it, such as Foursquare, Stripe and Asana.
Execution
Suppose that part of your ETL process is to take some data A, apply transformation X to it, and save the result as Y. In Luigi, you would write a .py file containing a class X, which derives from class Task. X would have three methods: requires(), run(), and output().
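A minimal sketch of such a task might look like this (the input task A, the file paths and the transformation are placeholders, not part of any real pipeline):

```python
import luigi


class A(luigi.ExternalTask):
    """Placeholder for externally produced input data."""
    def output(self):
        return luigi.LocalTarget('/tmp/a.txt')


class X(luigi.Task):
    def requires(self):
        return A()

    def output(self):
        return luigi.LocalTarget('/tmp/y.txt')

    def run(self):
        # Placeholder transformation: uppercase the input.
        with self.input().open('r') as fin, self.output().open('w') as fout:
            for line in fin:
                fout.write(line.upper())
```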
Then you execute luigi and pass this .py file to it, like luigi --module x X if the file name is x.py. When given a Task, luigi:
- Calls the output() method, which returns one or more objects deriving from class Target. A Target is something that has an exists() method returning either True or False (a minimal custom Target is sketched after this list). Luigi calls exists() on all the targets to see whether they exist. If all return True, luigi flags this task as DONE and never calls run(). If at least one of the output targets returned False, this job needs to be run.
- Luigi then calls the requires() method to see what other tasks need to run first for this task to run successfully. requires() returns one or more objects deriving from class Task, and luigi recursively performs this process for all of them. Note: after a required task's run() returns, luigi checks whether its output targets really exist. This is encapsulated in the complete() method; the default implementation just calls exists() on all targets returned by output(), and the method can optionally be overridden in the derived Task class. The purpose of complete() is to make sure run() was successful: if a required task's run() didn't raise a Python exception but didn't actually produce the needed output, then run() shouldn't be called and the required task is re-run instead.
- Luigi calls the run() method and sets the task status to DONE if no Python exceptions were raised. Note: run() can also dynamically yield dependency tasks.
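Writing a custom target only requires implementing exists(); a minimal sketch (the directory-based check is just an illustration, not a stock Luigi class):

```python
import os

import luigi


class DirectoryTarget(luigi.Target):
    """Hypothetical target: considered to exist once a directory is present."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.isdir(self.path)
```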
Local and central scheduler
When luigi is launched and a task is given to it, a Worker object is created. Workers need to talk to a Scheduler, which manages the dependency graph of tasks and tells workers what to do. So when the local worker object is created, it can either:
- Create a local scheduler in the process, or
- Connect to a remote scheduler using the HTTP API. This is the default.
Local scheduler: The local scheduler can be used by passing --local-scheduler to the luigi runtime. When running with the local scheduler, the algorithm given above is run recursively, and then luigi exits. This is usually only used for testing and debugging purposes.
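For example, to run the task from earlier entirely in-process, without a central scheduler:

```bash
luigi --module x X --local-scheduler
```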
Central scheduler: More interesting is the central scheduler. The central scheduler is a separate luigid Python Tornado app that workers talk to over HTTP. It does two things: it schedules tasks based on the dependency graph, and it serves a simple web dashboard on port 8082 (default). Note that the central scheduler:
- Doesn't see or execute the Task object's code; hence it never sees or checks whether targets exist. This is always done by workers.
- Identifies a task by its signature:
  - the Python name of the class (in the example above, X), and
  - the values of the parameters passed to the task, eg. day=2015-12-01.
Parameters are member variables of the Task object which derive from class Parameter, eg.:
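A sketch of what this looks like (the task name and the parameters chosen here are placeholders):

```python
import datetime

import luigi


class DailyActiveUsers(luigi.Task):
    # Part of the task signature: instances with different days are different tasks.
    day = luigi.DateParameter(default=datetime.date.today())
    # Not part of the signature, see below.
    retries = luigi.IntParameter(default=0, significant=False)
```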
By specifying significant=False in the Parameter constructor, we can tell Luigi not to treat it as part of the task signature.
The worker builds the local dependency graph and then uploads it to the central scheduler. Then it asks the central scheduler what it should do. The central scheduler potentially receives dependency graphs from several workers and merges them, assuming that tasks with the same name (and parameter values) uploaded from different workers are the same (generate the same output() targets, contain the same run() logic, etc).
Given the dependency graph, the central scheduler then tells workers to start running tasks. A worker can only run tasks that it uploaded to the central scheduler, because those are the tasks that that Python process loaded. So workers are not generic workers; they can only work on the tasks that they were started with!
Given a dependency graph, the scheduler will tell workers to run tasks that have no dependencies. By default, the order is non-deterministic. However, tasks can specify a priority; tasks with higher priority run first. The default priority is 0. Example:
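A sketch of a prioritized task (the class name, priority value and output path are arbitrary):

```python
import luigi


class UrgentReport(luigi.Task):
    # Scheduled before tasks with lower priority; the default priority is 0.
    priority = 100

    def output(self):
        return luigi.LocalTarget('/tmp/urgent_report.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write('done\n')
```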
Because priorities are in code, the worker must evaluate them and pass them on to the central scheduler.
Local parallelism
More than one worker thread can be created by passing --workers N to luigi. This is registered with the central scheduler, and if possible N tasks are run in parallel by one worker.
So there are multiple levels of parallelism in Luigi:
1. Multiple workers
2. Multiple threads in workers
3. Each task can have further parallelism, eg. a Hadoop MapReduce job.
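For example, to let a single worker run up to four tasks at once (reusing the earlier x.py example):

```bash
luigi --module x X --workers 4
```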
Managing a library of tasks
What if we’re managing a library of 100s or 1000s of ETL jobs? While I haven’t used Luigi for this, it seems that the basic building blocks are:
- Python import statements: our jobs are distributed into different .py files, so we need to import them to use them in requires().
- WrapperTask objects: these are special sink tasks which don't have an output; they just require other tasks to be run (a sketch follows below).
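A sketch of such a sink task (both task names are placeholders):

```python
import luigi


class TouchFile(luigi.Task):
    """Trivial placeholder job: creates an empty marker file."""
    name = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/%s.done' % self.name)

    def run(self):
        with self.output().open('w') as f:
            f.write('')


class AllJobs(luigi.WrapperTask):
    """Sink task: no output of its own, it only requires other tasks."""
    def requires(self):
        return [TouchFile(name='dau'), TouchFile(name='revenue')]
```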
This part puts a lot of work on the user of Luigi:
1. If we create a new task and forget to add it to the sink task, it won’t be executed (unless it’s a dependency for something else).
2. If we refactor a job (eg. rename the task class, change parameters), we have to search and replace all references in subsequent requires() methods. Since Python isn't a statically typed language, this has to be done by hand.
3. If running workers on separate machines, it's our job to synchronize the library of .py files (eg. using git and cron jobs to sync very often). Different versions of tasks with different logic, or local uncommitted changes, propagating to the central scheduler will lead to hard-to-find bugs and data corruption.
Date parameters
In an ETL system, most tasks will have a date(time) parameter which tells the code which day/hour to run the scripts for. For example, a Daily Active User (DAU) script computes the number of unique DAUs for a given day. Because this is such a common use-case, Luigi has a number of helper classes for dealing with date parameters:
- DateParameter
- MonthParameter
- YearParameter
- DateHourParameter
- DateMinuteParameter
- DateIntervalParameter
Often tasks have to be re-run for a number of days. One way to do this is to call luigi repeatedly from the command line. Or we can use the built-in RangeDailyBase (also RangeHourlyBase) helpers:
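Assuming the DAU task from above is called DailyActiveUsers and lives in dau.py, the invocation looks something like this (flag names should be double-checked against the Luigi version in use):

```bash
luigi --module dau RangeDailyBase --of DailyActiveUsers --start 2015-12-01 --stop 2015-12-10
```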
The name of the task's date parameter can be specified with the --param_name= option.
When we pass in a large number of dates (as an interval), the RangeXBase classes will instantiate a task object for each date and call complete() to check whether that task needs to be run. This can be very slow, eg. if each one creates a database connection and then closes it down.
There are two optimized classes, RangeDaily and RangeHourly, that solve this problem. They are used just like the two Base versions from the command line. But instead of instantiating many tasks which potentially don't have to be run, they assume the task implements a bulk_complete() classmethod and call it to find out, in one go, which dates are already complete and hence which still have to be run. So the user has to implement bulk_complete() to use RangeDaily and RangeHourly.
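A sketch of what that might look like; the bulk check here is simulated by listing the output directory once, and the exact shape of the parameter_tuples argument should be checked against the Luigi docs:

```python
import os

import luigi


class DailyActiveUsers(luigi.Task):
    day = luigi.DateParameter()

    @classmethod
    def bulk_complete(cls, parameter_tuples):
        # One cheap check for the whole range instead of one complete() call per day.
        # Returns the subset of the given parameter values that are already done.
        done = set(os.listdir('/tmp/dau')) if os.path.isdir('/tmp/dau') else set()
        return [day for day in parameter_tuples if day.isoformat() in done]

    def output(self):
        return luigi.LocalTarget('/tmp/dau/%s' % self.day.isoformat())

    def run(self):
        with self.output().open('w') as f:
            f.write('42\n')  # placeholder DAU count
```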
Note: it seems Luigi doesn’t support bulk running of parameter intervals.
Scheduling
Most ETL systems have jobs which need to run every hour or every day. Luigi doesn't have a concept of calendar scheduling; this is up to the user. The method recommended by the authors is to create sink tasks and run them from cron when the external input files (eg. raw log files) are likely to be available.
Rescheduling failed tasks is influenced by the following parameters in the central scheduler's luigi.cfg:
- retry-delay: when to re-schedule a failed task; the default is 900 seconds.
- remove-delay: how long the central scheduler keeps around tasks that have no stakeholder; a stakeholder is a worker who uploaded that task.
- disable-hard-timeout: if a task fails again after this much time, it is disabled for good.
In the worker's luigi.cfg:
- worker-keep-alive: you probably need to set this to true, so workers will stay alive when they run out of jobs to run, as long as they have some pending job waiting to be run. Otherwise workers will disconnect from the central scheduler and exit if there's nothing to do, even if there are tasks which will be scheduled a few minutes from now.
- retry-external-tasks: if true, incomplete external tasks (i.e. tasks where the run() method is NotImplemented) will be retested for completion while Luigi is running. This means that if external dependencies are satisfied after a workflow has started, any tasks dependent on that resource will be eligible for running.
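Put together, the relevant luigi.cfg entries might look something like the sketch below; the section placement and underscore spellings are my reading of the configuration docs, so verify them against the Luigi version you run:

```ini
[scheduler]
retry_delay = 900
remove_delay = 600
disable_hard_timeout = 3600

[worker]
keep_alive = true
retry_external_tasks = true
```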
The central scheduler has a feature called task history. This logs task completion to a database, and exposes it on the dashboard.
For tasks where the output is a database table, Luigi needs to keep track of successful inserts. It uses a special marker table for this (set with marker-table in luigi.cfg; the default name is table_updates). When a task whose target is a database table finishes, an entry is created in the marker table with the task's task_id (its name and parameter values). When the target's exists() method is called, this marker table is queried to check whether the task has been run (the task_id is passed by the task to the Target in its constructor).
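With the contrib MySQL target this looks roughly like the sketch below; the connection details are placeholders, and the import path and constructor arguments should be checked against the Luigi version in use:

```python
import luigi
from luigi.contrib.mysqldb import MySqlTarget


class LoadDau(luigi.Task):
    day = luigi.DateParameter()

    def output(self):
        # The last argument is the update_id stored in the marker table;
        # task_id ties the marker entry to this task's name and parameters.
        return MySqlTarget('localhost:3306', 'analytics', 'user', 'password',
                           'dau', self.task_id)

    def run(self):
        # ... insert the day's rows into the dau table, then record success:
        self.output().touch()
```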
Resources
Resources can be used to introduce limits on task parallelism. For example, suppose we never want to run more than 10 mysql tasks, or we never want to run more than 3 instances of the hourly job count_users.
Resources are declared in the luigi.cfg file of the scheduler:
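Continuing the example above, the declaration is along these lines:

```ini
[resources]
mysql = 10
count_users = 3
```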
Resource use is given in the resources property of the task object in the Python code, like:
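For example, a task that should count against the mysql limit above declares:

```python
import luigi


class LoadIntoMysql(luigi.Task):
    # Occupies one unit of the "mysql" resource while this task runs.
    resources = {'mysql': 1}
```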
Contrib stuff
Luigi has an impressive library of stock Target and Task classes, each with lots of functionality baked in as helper methods. This is the big reason why I think Luigi is popular and why I would consider using it.
Luigi has Task and Target classes which support:
- Google BigQuery
- Hadoop jobs
- Hive queries
- Pig queries
- Scalding jobs
- Spark jobs
- PostgreSQL, Redshift, MySQL tables
- and more…
Source code and tests
I spent a fair amount of time digging through the Luigi Python source code. It’s pretty clean Python code with a lot of tests. Code size is about 18KLOC plus 16KLOC tests. It’s pretty easy to understand and extend.
Sample cases
Trying it out on a free cloud9 Docker instance:
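The setup amounts to installing Luigi and starting the central scheduler, roughly:

```bash
pip install luigi
luigid --port 8082   # central scheduler plus dashboard
```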
In another terminal, this is the default Luigi sample to try:
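If that sample is the top_artists example shipped in the Luigi repository, the invocation is something like this (the module path depends on where the checkout lives):

```bash
cd luigi/examples
PYTHONPATH=. luigi --module top_artists AggregateArtists --date-interval 2012-06
```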
Let’s play around with Luigi. Let’s create this x.py:
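Something along these lines (the sleep is only there to make the execution visible):

```python
import time

import luigi


class Bar(luigi.Task):
    num = luigi.IntParameter()

    def run(self):
        time.sleep(1)  # pretend to do some work
        self.output().open('w').close()  # touch the output file

    def output(self):
        return luigi.LocalTarget('/tmp/bar/%d' % self.num)


class X(luigi.Task):
    def requires(self):
        for i in range(10):
            yield Bar(i)

    def run(self):
        pass
```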
And run it like:
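For example, with the central scheduler from above running:

```bash
luigi --module x X
```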
This will create an X task and 10 Bar tasks. The 10 Bar tasks will touch /tmp/bar/… and that's it.
Let's delete the tmp files and create a similarly named y.py, with identical X and Bar tasks, except with X renamed to Y. Let's launch two workers, one with x and one with y. Notice that the central scheduler merges the dependency graphs and treats the Bar tasks coming from the different workers/codes as the same, because their task_id (class name plus parameters) is identical. It's a bit weird, but this is how Luigi works. Another thing you'll notice is that at the end of the execution, one of X and Y will be unfinished (not green on the dashboard). This is because the workers are run without --worker-keep-alive. So the first worker that finishes its tasks and is waiting for the other worker to finish the last Bar will exit (it's got nothing to do). If that worker was eg. the x worker, then task X is not going to be run by anyone! If we turn on --worker-keep-alive on the command line, this oddity goes away.
Conclusion
When designing an ETL framework, I would make (and have made) different design decisions compared to Luigi. But if I were tasked with creating a new ETL framework from scratch (eg. at a new company), I would definitely consider using Luigi. There is simply too much useful stuff there to ignore (and re-implement). However, I would expect to:
- Find unexpected and painful behaviour in Luigi.
- Write significant scaffolding code to make it useful:
- Syncing the task library to different workers
- Scheduling series of tasks
- Monitoring
- Alerting
- Dashboard for the ETL datasets and jobs (see below)
Downsides of Luigi:
- Sometimes unexpected behaviour: for example, a wrapper task can reach DONE status without ever running its run() method, depending on non-deterministic execution order.
- The biggest downside of Luigi is that ETL jobs are specified as programmatic Python Task objects and not in some sort of DSL. This means no external tool can reasonably/easily parse a library of tasks and extract dependency information, which would be useful for eg. generating documentation of the ETL system. Also, analysts have to learn Python.
- The web dashboard of the central scheduler is basically useless.