Luigi review

Posted on Sun 20 December 2015 in Data

Introduction

Luigi is an execution framework for writing data pipelines in Python. It supports dependencies between tasks, has a simple central scheduler with an HTTP API, and comes with an extensive library of helpers for building pipelines on Hadoop, AWS, MySQL, etc. It was written by Spotify for internal use and open sourced in 2012. Companies using it include Foursquare, Stripe and Asana.

Execution

Suppose that part of your ETL process is to take some data A, apply transformation X to it, and save the result as Y. In Luigi, you would write a .py file containing a class X that derives from class Task. X would have three methods: requires(), run(), and output().

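import luigi

class X(luigi.Task):
    def requires(self): ...  # tasks that must be complete before this one runs
    def run(self): ...       # the transformation itself
    def output(self): ...    # target(s) that exist once run() has succeeded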

Then you run luigi and tell it which module to load, like luigi --module x X if the file name is x.py. When given a Task, luigi:

  1. Calls the output() method, which returns one or more objects deriving from class Target. A Target is anything with an exists() method returning either True or False. Luigi calls exists() on all the targets to see whether they exist. If all return True, luigi flags the task as DONE and never calls run(). If at least one output target returns False, the task needs to be run.
  2. Luigi then calls the requires() method to see which other tasks must run first for this task to succeed. requires() returns one or more objects deriving from class Task, and luigi recursively performs this process for all of them. Note: after they return, luigi checks whether the output targets of the required tasks really exist. This is encapsulated in the complete() method: the default implementation just calls exists() on all targets returned by output(), and the method can optionally be overridden in the derived Task class. The purpose of complete() is to make sure run() was successful: if a required task's run() didn't raise a Python exception but didn't actually produce the needed output, then this task's run() shouldn't be called. In this case the required task is re-run.
  3. Luigi calls the run() method and sets the task status to DONE if no Python exception was raised. Note: run() can also yield further tasks as dynamic dependencies.
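The three steps above can be sketched as a toy, dependency-free Python model. This is not Luigi's code: ToyTask, ToyTarget and build() are hypothetical names, dynamic dependencies are left out, and the sketch simply raises if a requirement's run() returned without producing its output.

```python
class ToyTarget:
    """Stand-in for a Target: something with an exists() method."""

    def __init__(self, store, key):
        self.store, self.key = store, key

    def exists(self):
        return self.key in self.store


class ToyTask:
    """Stand-in for a Task with the three methods described above."""

    def requires(self):
        return []

    def output(self):
        return []

    def run(self):
        pass

    def complete(self):
        outputs = self.output()
        # every output must exist; a task with no outputs counts as incomplete
        return bool(outputs) and all(t.exists() for t in outputs)


def build(task, log):
    """Step 1: skip complete tasks; step 2: build requirements; step 3: run."""
    if task.complete():
        return
    for dep in task.requires():
        build(dep, log)
        if not dep.complete():  # run() returned but produced no output
            raise RuntimeError("%s ran but did not produce its output" % type(dep).__name__)
    log.append(type(task).__name__)
    task.run()


# usage: X reads A and writes Y, as in the example above
store = set()


class A(ToyTask):
    def output(self):
        return [ToyTarget(store, "A")]

    def run(self):
        store.add("A")


class X(ToyTask):
    def requires(self):
        return [A()]

    def output(self):
        return [ToyTarget(store, "Y")]

    def run(self):
        store.add("Y")


log = []
build(X(), log)
print(log)  # ['A', 'X']
log = []
build(X(), log)
print(log)  # [] because Y already exists
```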

Local and central scheduler

When luigi is launched with a task, a Worker object is created. Workers need to talk to a Scheduler, which manages the dependency graph of tasks and tells workers what to do. So when the local worker object is created, it can either:

  1. Create a local scheduler in the process, or
  2. Connect to a remote scheduler using the HTTP API. This is the default.

Local scheduler: The local scheduler can be used by passing --local-scheduler to the luigi runtime. When running with the local scheduler, the algorithm given above is run recursively, and then luigi exits. This is usually only used for testing and debugging purposes.

Central scheduler: More interesting is the central scheduler. The central scheduler is a separate luigid Python Tornado app that workers can talk to over HTTP. It performs two tasks: scheduling of tasks based on the dependency graph and serving a simple web dashboard on port 8082 (default). Note that the central scheduler:

  1. Doesn’t see or execute the Task object's code, hence it never sees or checks whether targets exist; this is always performed by workers.
  2. Identifies a task by its signature:
    • Python name of the class; in the example above it’s X.
    • The values of the parameters passed to the task, eg. day=2015-12-01.

Parameters are class attributes of the Task that derive from class Parameter, eg.:

import datetime
import luigi

class X(luigi.Task):
    day = luigi.DateParameter(default=datetime.date.today())  # evaluated once, at import time
    ...

By specifying significant=False in the Parameter constructor, we can tell Luigi not to treat it as part of the task signature.

The worker builds the local dependency graph and then uploads it to the central scheduler. Then it asks the central scheduler what it should do. The central scheduler potentially receives dependency graphs from several workers, and merges them, assuming tasks with the same name (and parameter values) uploaded from different workers are the same (generate the same output() targets, contain the same run() logic, etc).

Given the dependency graph, the central scheduler then tells workers to start running tasks. A worker can only run tasks that it uploaded to the central scheduler, because those are the tasks that that Python process loaded. So workers are not generic workers, they can only work on the tasks that they were started with!
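To make the merging behaviour concrete, here is a toy model in plain Python. ToyScheduler, upload() and get_work() are hypothetical names, not Luigi's HTTP API; the model ignores priorities, resources and the RUNNING state, and only shows two things: tasks with the same task_id are merged, and a worker is only handed tasks it uploaded itself.

```python
from collections import defaultdict


class ToyScheduler:
    """Toy model of graph merging; not Luigi's scheduler implementation."""

    def __init__(self):
        self.deps = {}                        # task_id -> set of required task_ids
        self.stakeholders = defaultdict(set)  # task_id -> workers that uploaded it
        self.done = set()

    def upload(self, worker, graph):
        # tasks with the same task_id are assumed identical and merged
        for task_id, deps in graph.items():
            self.deps.setdefault(task_id, set()).update(deps)
            self.stakeholders[task_id].add(worker)

    def get_work(self, worker):
        # only pending tasks this worker uploaded, with all dependencies done
        for task_id in sorted(self.deps):
            if (task_id not in self.done
                    and worker in self.stakeholders[task_id]
                    and self.deps[task_id] <= self.done):
                return task_id
        return None


sched = ToyScheduler()
sched.upload("worker-x", {"X()": {"Bar(num=0)"}, "Bar(num=0)": set()})
sched.upload("worker-y", {"Y()": {"Bar(num=0)"}, "Bar(num=0)": set()})
print(len(sched.deps))             # 3: Bar(num=0) is shared
print(sched.get_work("worker-x"))  # Bar(num=0)
sched.done.add("Bar(num=0)")
print(sched.get_work("worker-x"))  # X()
print(sched.get_work("worker-y"))  # Y()
```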

Given a dependency graph, the scheduler will tell workers to run tasks that have no pending dependencies. By default, the order is non-deterministic. However, tasks can specify a priority; tasks with higher priority run first. The default priority is 0. Example:

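import luigi

class X(luigi.Task):
    # hypothetical flag standing in for the original <something> condition
    urgent = luigi.BoolParameter(default=False)

    @property
    def priority(self):
        # evaluated by the worker; higher values are scheduled first (default 0)
        if self.urgent:
            return 2
        else:
            return 1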

Because priorities are defined in code, the worker must evaluate them and pass them on to the central scheduler.
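A toy model of the ordering in plain Python, with hypothetical function names. One assumption worth flagging: my reading of the scheduler source is that a task's priority is also propagated to the tasks it requires (otherwise a high-priority task could wait behind low-priority dependencies), so effective_priorities() models that; check it against your Luigi version. Ties are broken by name here only to make the sketch deterministic.

```python
def effective_priorities(priorities, deps):
    """Raise each dependency's priority to that of its highest-priority
    dependent (assumed propagation). deps maps task -> set of required tasks."""
    eff = dict(priorities)
    changed = True
    while changed:  # repeat until a fixed point is reached
        changed = False
        for task, required in deps.items():
            for dep in required:
                if eff[task] > eff[dep]:
                    eff[dep] = eff[task]
                    changed = True
    return eff


def run_order(priorities, deps):
    """Repeatedly pick the ready task (all deps done) with the highest
    effective priority."""
    eff = effective_priorities(priorities, deps)
    done, order = set(), []
    pending = set(priorities)
    while pending:
        ready = [t for t in pending if deps.get(t, set()) <= done]
        if not ready:
            raise ValueError("cycle or missing dependency")
        task = min(ready, key=lambda t: (-eff[t], t))
        order.append(task)
        done.add(task)
        pending.remove(task)
    return order


# default priority is 0; d has the highest priority but requires a
priorities = {"a": 0, "b": 2, "c": 1, "d": 5}
deps = {"d": {"a"}}
print(run_order(priorities, deps))  # ['a', 'd', 'b', 'c']
```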

Local parallelism

More than one worker process can be created by passing --workers N to luigi. This is registered with the central scheduler, and if possible N tasks are run in parallel by one worker. So there are multiple levels of parallelism in Luigi:

  1. Multiple workers
  2. Multiple processes within a worker
  3. Further parallelism inside each task, eg. a Hadoop MapReduce job

Managing a library of tasks

What if we’re managing a library of 100s or 1000s of ETL jobs? While I haven’t used Luigi for this, it seems that the basic building blocks are:

  1. Python import statements: our jobs are distributed into different .py files, so we need to import them to use them in requires().
  2. WrapperTask objects: these are special sink tasks which don't have an output; they just require other tasks to be run.

This part puts a lot of work on the user of Luigi:

  1. If we create a new task and forget to add it to the sink task, it won't be executed (unless it's a dependency of something else).
  2. If we refactor a job (eg. rename the task class, change parameters), we have to search and replace all references in downstream requires() methods. Since Python isn't statically typed, this has to be done by hand.
  3. If running workers on separate machines, it's our job to keep the library of .py files in sync (eg. using git and frequent cron jobs). Different versions of tasks with different logic, or local uncommitted changes, propagating to the central scheduler will lead to hard-to-find bugs and data corruption.

Date parameters

In an ETL system, most tasks will have a date(time) parameter which tells the code which day/hour to run the scripts for. For example, a Daily Active User (DAU) script computes the number of unique active users for a given day. Because this is such a common use case, Luigi has a number of helper classes for dealing with date parameters.

Often tasks have to be re-run for a number of days. One way to do this is to call luigi repeatedly from the command line. Or we can use the built-in RangeDailyBase (also RangeHourlyBase) helpers:

# instead of calling this repeatedly:
#   $ luigi --module task Task --date 2015-01-XX
# do this:
$ luigi --module task RangeDailyBase --of Task --start 2015-01-01 --stop 2015-01-31

The name of the task's date parameter can be specified with --param-name.
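To see why the per-date approach gets slow, here is a toy version of the loop in plain Python. missing_dates() is a hypothetical helper, not Luigi's implementation; it treats stop as exclusive and makes one completeness check per day, which is exactly the cost the optimized range classes try to avoid.

```python
import datetime


def missing_dates(start, stop, is_complete):
    """Toy per-date loop: one is_complete() call for every day in [start, stop)."""
    missing = []
    day = start
    while day < stop:
        if not is_complete(day):  # in practice this may be a DB round trip
            missing.append(day)
        day += datetime.timedelta(days=1)
    return missing


# usage: pretend January 1..28 are already done and count the checks
done = {datetime.date(2015, 1, d) for d in range(1, 29)}
calls = []


def is_complete(day):
    calls.append(day)
    return day in done


print(missing_dates(datetime.date(2015, 1, 1), datetime.date(2015, 1, 31), is_complete))
# [datetime.date(2015, 1, 29), datetime.date(2015, 1, 30)]
print(len(calls))  # 30
```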

When we pass in a large number of dates (as an interval), the RangeXBase classes instantiate a task object for each date and call complete() to check whether that task needs to be run. This can be very slow, eg. if each call opens a database connection and then closes it. There are two optimized classes, RangeDaily and RangeHourly, that solve this problem. They are used from the command line just like the two Base versions. But instead of instantiating many tasks which potentially don't have to be run, they call the task's bulk_complete() classmethod once with all the dates; it returns the dates that are already complete, and the remaining dates are run. So the user has to implement bulk_complete() to use RangeDaily and RangeHourly.

$ luigi --module task RangeDaily --of Task --start 2015-01-01 --stop 2015-01-31

Note: it seems Luigi doesn’t support bulk running of parameter intervals.

Scheduling

Most ETL systems have jobs which need to run every hour or every day. Luigi doesn't have a concept of calendar scheduling; this is up to the user. The method recommended by the authors is to create sink tasks and run them from cron when the external input files (eg. raw log files) are likely to be available.

Rescheduling failed tasks is influenced by the following parameters in the central scheduler’s luigi.cfg:

  • retry-delay: how long to wait before re-scheduling a failed task, default 900 seconds
  • remove-delay: how long the central scheduler keeps tasks around that have no stakeholder; a stakeholder is a worker who uploaded that task
  • disable-hard-timeout: if a task fails again after this much time, it is disabled for good

In the worker’s luigi.cfg:

  • worker-keep-alive: you probably need to set this to true, so workers will stay alive when they run out of jobs to run, as long as they have some pending job waiting to be run. Otherwise workers will disconnect from the central scheduler and exit if there’s nothing to do, even if there are tasks which will be scheduled a few minutes from now.
  • retry-external-tasks: If true, incomplete external tasks (i.e. tasks where the run() method is NotImplemented) will be retested for completion while Luigi is running. This means that if external dependencies are satisfied after a workflow has started, any tasks dependent on that resource will be eligible for running.

The central scheduler has a feature called task history. This logs task completion to a database, and exposes it on the dashboard.

For tasks whose output is a database table, Luigi needs to keep track of successful inserts. It uses a special marker table for this (set with marker-table in luigi.cfg, default name is table_updates). When a task whose target is a database table finishes, an entry is created in the marker table with the task's task_id (its name and parameter values). When the target's exists() method is called, this marker table is queried to check whether the task has been run (the task passes its task_id to the Target's constructor).
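The marker-table idea can be sketched with the standard library's sqlite3. The table name table_updates comes from the text above; the column names (update_id, target_table), the dau table, the task_id string and the helper functions are assumptions for illustration, not Luigi's actual schema or code. The sketch writes the data and the marker row in one transaction, so a marker can never exist without its data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# marker table; the column names are an assumption for this sketch
conn.execute("CREATE TABLE table_updates (update_id TEXT PRIMARY KEY, target_table TEXT)")
conn.execute("CREATE TABLE dau (day TEXT, users INTEGER)")


def target_exists(conn, task_id):
    """The Target's exists(): is there a marker row for this task_id?"""
    row = conn.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ?", (task_id,)
    ).fetchone()
    return row is not None


def run_task(conn, task_id, day, users):
    """Insert the data and the marker row in a single transaction."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute("INSERT INTO dau VALUES (?, ?)", (day, users))
        conn.execute("INSERT INTO table_updates VALUES (?, ?)", (task_id, "dau"))


task_id = "DAU(day=2015-12-01)"  # illustrative task_id: name plus parameter values
print(target_exists(conn, task_id))  # False
run_task(conn, task_id, "2015-12-01", 42)
print(target_exists(conn, task_id))  # True
```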

Resources

Resources can be used to introduce limits on task parallelism. For example, suppose we never want to run more than 10 mysql tasks, or we never want to run more than 3 instances of the hourly job count_users.

Resources are declared in the luigi.cfg file of the scheduler:

[resources]
mysql: 10
count_users: 3

Resource use is given in the resources property of the task object in the Python code, like:

resources = {'mysql': 2}  # this task uses 2 mysql connections
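A toy version of the admission check the scheduler has to make, in plain Python. can_run() is a hypothetical helper, not Luigi's code, and it only handles resources declared in the config: an undeclared resource raises KeyError here rather than guessing what Luigi does in that case.

```python
def can_run(task_resources, in_use, capacity):
    """A task may start only if, for every resource it requests, current
    usage plus the request stays within the configured limit."""
    return all(
        in_use.get(name, 0) + amount <= capacity[name]
        for name, amount in task_resources.items()
    )


# limits from the [resources] section above
capacity = {"mysql": 10, "count_users": 3}
print(can_run({"mysql": 2}, {"mysql": 9}, capacity))  # False: 9 + 2 > 10
print(can_run({"mysql": 1}, {"mysql": 9}, capacity))  # True
```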

Contrib stuff

Luigi has an impressive library of stock Target and Task classes, each with lots of functionality baked in as helper methods. This is the big reason why I think Luigi is popular and why I would consider using it.

Luigi has Task and Target classes which support:

  • Google BigQuery
  • Hadoop jobs
  • Hive queries
  • Pig queries
  • Scalding jobs
  • Spark jobs
  • PostgreSQL, Redshift, MySQL tables
  • and more…

Source code and tests

I spent a fair amount of time digging through the Luigi Python source code. It’s pretty clean Python code with a lot of tests. Code size is about 18KLOC plus 16KLOC tests. It’s pretty easy to understand and extend.

Sample cases

Trying it out on a free Cloud9 Docker instance:

$ pip install tornado # luigi uses the tornado web server
$ export PATH=$PATH:/home/ubuntu/workspace/luigi/bin
$ export PYTHONPATH=/home/ubuntu/workspace/luigi:.
$ luigid
2015-12-19 14:18:08,492 luigi-interface[11022] INFO: Loaded []
2015-12-19 14:18:08,494 luigi.server[11022] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with clean slate
2015-12-19 14:18:08,497 luigi.server[11022] INFO: Scheduler starting up

In another terminal, this is the default Luigi sample to try:

$ cd luigi/examples
$ luigi --module top_artists AggregateArtists --date-interval 2012-06
# does the job, creates files locally!
$ luigi --module top_artists AggregateArtists --date-interval 2012-06
# notices files are there, doesn’t do anything

Let’s play around with Luigi. Let’s create this x.py:

import time
import luigi

class X(luigi.WrapperTask):
    task_namespace = 'examples'
    def run(self):
        print("Running X")
    def requires(self):
        for i in range(10):
            yield Bar(i)

class Bar(luigi.Task):
    task_namespace = 'examples'
    num = luigi.IntParameter()
    def run(self):
        print("Bar %s" % self.num)
        time.sleep(10)
        self.output().open('w').close()
        print("Bar touched %s" % self.output())
    def output(self):
        return luigi.LocalTarget('/tmp/bar/%d' % self.num)

And run it like:

$ luigi --module x examples.X

This will create an X task and 10 Bar tasks. The 10 Bar tasks will touch /tmp/bar/… and that's it. Now let's delete the tmp files and create a similar y.py with identical X and Bar tasks, except that X is renamed to Y. Let's launch two workers, one with x and one with y. Notice that the central scheduler merges the dependency graphs and treats the Bar tasks coming from the different workers/codes as the same, because their task_ids (class name plus parameters) are identical. It's a bit weird, but this is how Luigi works. Another thing you'll notice is that at the end of the execution, one of X and Y will be unfinished (not green on the dashboard). This is because the workers are run without --worker-keep-alive. The first worker to run out of tasks, while the other worker is still finishing the last Bar, has nothing to do and exits. If that was eg. the x worker, then task X is not going to be run by anyone! If we turn on --worker-keep-alive on the command line, this oddity goes away.

Conclusion

When designing an ETL framework, I would make (and have made) different design decisions compared to Luigi. But if I were tasked with creating a new ETL framework from scratch (eg. at a new company), I would definitely consider using Luigi. There is simply too much useful stuff there to ignore (and re-implement). However, I would expect to:

  1. Find unexpected and painful behaviour in Luigi.
  2. Write significant scaffolding code to make it useful:
    1. Syncing the task library to different workers
    2. Scheduling series of tasks
    3. Monitoring
    4. Alerting
    5. Dashboard for the ETL datasets and jobs (see below)

Downsides of Luigi:

  1. Sometimes unexpected behaviour: for example, a wrapper task can reach DONE status without ever running the run() method depending on non-deterministic execution order.
  2. The biggest downside to Luigi is that ETL jobs are specified as programmatic Python Task objects and not given in some sort of DSL. This means no external tool can reasonably/easily parse a library of tasks and extract dependency information, which would be useful for eg. generating documentation of the ETL system. Also, analysts have to learn Python.
  3. The web dashboard of the central scheduler is basically useless.

Links, talks