API Documentation

b2luigi bundles several features that help with everyday task creation and processing. The most important is the b2luigi.process() function, which lets you run arbitrary task graphs on the batch system. It is very similar to luigi.build, but accepts additional parameters for steering the batch execution.

Top-Level Function

b2luigi.process(task_like_elements, show_output=False, dry_run=False, test=False, batch=False, **kwargs)

Call this function in your main method to tell b2luigi where your entry point of the task graph is. It is very similar to luigi.build with some additional configuration options.

Example

This example defines a simple task and tells b2luigi to execute it 100 times with different parameters:

import b2luigi
import random


class MyNumberTask(b2luigi.Task):
    some_parameter = b2luigi.Parameter()

    def output(self):
        return b2luigi.LocalTarget(f"results/output_file_{self.some_parameter}.txt")

    def run(self):
        random_number = random.random()
        with self.output().open("w") as f:
            f.write(f"{random_number}\n")

if __name__ == "__main__":
    b2luigi.process([MyNumberTask(some_parameter=i) for i in range(100)])

All flag arguments can also be given as command line arguments. This means the call with:

b2luigi.process(tasks, batch=True)

is equivalent to calling the script with:

python script.py --batch

Parameters:
  • task_like_elements (Task or list) – Task(s) to execute with luigi. Can either be a list of tasks or a task instance.
  • show_output (bool, optional) – Instead of running the task(s), write out all output files that will be generated, marked in color if they are already present. Good for testing whether your tasks will do what you think they should.
  • dry_run (bool, optional) – Instead of running the task(s), write out which tasks will be executed. This is a simplified form of dependency resolution, so this information may be wrong in some corner cases. Also good for testing.
  • test (bool, optional) – Run the tasks directly on the local machine for debugging, without the batch system, multiprocessing, or dispatching (see DispatchableTask). All logs are written to the console.
  • batch (bool, optional) – Execute the tasks on the selected batch system. Refer to Quick Start for more information. The default batch system is LSF, but this can be changed with the batch_system setting. See get_setting on how to define settings.
  • **kwargs – Additional keyword arguments passed to luigi.build.

Super-hero Task Classes

If you want to use the default luigi.Task class or any derivative of it, you are totally fine. No need to change any of your scripts! But if you want to take advantage of some of the recipes we have developed to work with large luigi task sets, you can use the drop-in replacements from the b2luigi package. All task classes (except b2luigi.DispatchableTask) are subclasses of a luigi class. As we import luigi into b2luigi, you just need to replace

import luigi

with

import b2luigi as luigi

and you will have all the functionality of luigi and b2luigi without the need to change anything!

class b2luigi.Task(*args, **kwargs)

Bases: luigi.task.Task

Drop-in replacement for luigi.Task which is 100% API compatible. It just adds some useful methods for handling output file name generation using the parameters of the task. See Quick Start for information on how to use the methods.

Example:

class MyAverageTask(b2luigi.Task):
    def requires(self):
        for i in range(100):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        yield self.add_to_output("average.txt")

    def run(self):
        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
                counter += 1

        average = summed_numbers / counter

        with self.get_output_file("average.txt").open("w") as f:
            f.write(f"{average}\n")

add_to_output(output_file_name)

Call this in your output() function to add a target to the list of files this task will output. Always use it in combination with yield. This function automatically adds all current parameter values to the file name, in the form result_path/param_1=value/param_2=value/output_file_name.

This function will automatically use a LocalTarget. If you do not want this, you can override the _get_output_file_target function.
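The path layout described above can be illustrated with a small stand-alone sketch. The helper build_output_path is hypothetical and is not part of b2luigi; it only mimics the documented form result_path/param_1=value/param_2=value/output_file_name under the assumption that parameters appear in their definition order.

```python
import posixpath


def build_output_path(result_path, parameters, output_file_name):
    """Hypothetical helper: join result_path, one "name=value" folder
    per parameter, and finally the output file name."""
    parameter_folders = [f"{name}={value}" for name, value in parameters.items()]
    return posixpath.join(result_path, *parameter_folders, output_file_name)


example_path = build_output_path(
    "results", {"param_1": 1, "param_2": "abc"}, "some_file.txt"
)
print(example_path)  # results/param_1=1/param_2=abc/some_file.txt
```

Because every parameter value ends up in the path, two task instances with different parameters never write to the same file.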

Example

This adds two files called some_file.txt and some_other_file.txt to the output:

def output(self):
    yield self.add_to_output("some_file.txt")
    yield self.add_to_output("some_other_file.txt")

Parameters: output_file_name (str) – the file name of the output file. Refer to this file name as a key when using get_input_file_names, get_output_file_names or get_output_file.

get_input_file_names(key=None)

Get a dictionary of the input file names of the tasks that are defined in the requirements. Either use the key argument or index the dictionary with the key given to add_to_output to get back a list (!) of file paths.
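The shape of the return value can be sketched without b2luigi. The function group_by_file_name below is a hypothetical stand-in, not the library's implementation; it assumes the key is the file name passed to add_to_output, and that several required tasks each contribute one path per key.

```python
import posixpath


def group_by_file_name(file_paths, key=None):
    """Hypothetical stand-in: map each file name (the key) to the list
    of full paths that end in this name."""
    grouped = {}
    for path in file_paths:
        grouped.setdefault(posixpath.basename(path), []).append(path)
    if key is None:
        return grouped
    # In this sketch an unknown key raises a KeyError.
    return grouped[key]


paths = [
    "results/some_parameter=0/output_file.txt",
    "results/some_parameter=1/output_file.txt",
]
all_inputs = group_by_file_name(paths)
one_key = group_by_file_name(paths, "output_file.txt")
```

Both access styles yield the same list here: all_inputs["output_file.txt"] and one_key.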

Parameters: key (str, optional) – If given, only return a list of file paths with this given key.
Returns: If key is None, returns a dictionary of keys to lists of file paths. Else, returns only the list of file paths for this given key.

get_output_file_name(key)

Analogous to get_input_file_names, this function returns an output file defined in our output function with the given key.

In contrast to get_input_file_names, only a single file name will be returned (as there can only be a single output file with a given name).

Parameters: key (str) – Return the file path with this given key.
Returns: Returns only the file path for this given key.

class b2luigi.ExternalTask(*args, **kwargs)

Bases: b2luigi.core.task.Task, luigi.task.ExternalTask

Direct copy of luigi.ExternalTask, but with the capabilities of Task added.

class b2luigi.WrapperTask(*args, **kwargs)

Bases: b2luigi.core.task.Task, luigi.task.WrapperTask

Direct copy of luigi.WrapperTask, but with the capabilities of Task added.

b2luigi.dispatch(run_function)

If your run function calls external, possibly unsafe functionality, use this function as a wrapper around your run function.

Example

The run function can include any code you want. When the task runs, it is started in a subprocess and monitored by the parent process. When it dies unexpectedly (e.g. because of a segfault), the task will be marked as failed. If not, it is successful. The log output will be written to two files in the log folder (marked with the parameters of the task), which you can check afterwards:

import b2luigi


class MyTask(b2luigi.Task):
    @b2luigi.dispatch
    def run(self):
        call_some_evil_function()

Implementation note:
In the subprocess we call the current sys.executable (which should be Python) with the current input file as a parameter, but let it run only this specific task (by handing over the task id and the --batch-worker option). The run function notices this and actually runs the task instead of dispatching again.

You can control what exactly is used as the executable by setting the “executable” setting, which needs to be a list of strings. Additionally, you can add a cmd_prefix parameter to your class, which also needs to be a list of strings that are prefixed to the current command (e.g. if you want to add a profiler to all your tasks).
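The general idea of running work in a monitored subprocess can be sketched with the standard library alone. This is not b2luigi's implementation: run_in_subprocess is a hypothetical helper, it passes code via python -c instead of re-running the script with a task id, and it treats any non-zero return code as a failure. The executable and cmd_prefix arguments only mirror the list-of-strings convention described above.

```python
import subprocess
import sys


def run_in_subprocess(python_code, executable=None, cmd_prefix=None):
    """Hypothetical helper: run python_code in a child process and
    report whether the child exited cleanly."""
    command = (
        list(cmd_prefix or [])               # e.g. a profiler command
        + list(executable or [sys.executable])
        + ["-c", python_code]
    )
    completed = subprocess.run(command, capture_output=True)
    # A crash or abrupt exit of the child shows up as a non-zero code,
    # while the parent process keeps running.
    return completed.returncode == 0


succeeded = run_in_subprocess("print('task finished')")
# os._exit(1) simulates a child that terminates abruptly.
crashed = run_in_subprocess("import os; os._exit(1)")
```

The parent survives the failing child, which is the property that makes this pattern useful for unstable external code.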

class b2luigi.DispatchableTask(*args, **kwargs)

Bases: b2luigi.core.task.Task

Instead of using the dispatch function wrapper, you can also inherit from this class. Apart from that, it has exactly the same functionality as a normal Task.

Important

You need to override the process function instead of the run function in this case!

process()

Override this method with your normal run function. Do not touch the run function itself!
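The division of labour between run and process follows the template-method pattern, which the following sketch shows in isolation. DispatchableSketch and MyTaskSketch are hypothetical classes, not b2luigi code, and the sketch calls process() in the same process instead of dispatching it.

```python
class DispatchableSketch:
    """Hypothetical stand-in for a base class that owns run()."""

    def run(self):
        # A real base class would hand the work to a subprocess here;
        # this sketch simply calls process() directly.
        return self.process()

    def process(self):
        raise NotImplementedError("Override process() in your task")


class MyTaskSketch(DispatchableSketch):
    def process(self):
        # Your usual run logic goes here.
        return "work done"


result = MyTaskSketch().run()
```

Overriding run in the subclass would bypass the base class logic, which is why only process should be touched.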

Parameters

As b2luigi automatically also imports luigi, you can use all the parameters from luigi you know and love. We have just added a single new flag called hashed to the parameter constructors. Setting it to True (it is off by default) makes b2luigi use a hashed version of the parameter's value when constructing output or log file paths. This is especially useful if you have parameters that may include “dangerous” characters, like “/” or “{” (e.g. when using list or dictionary parameters). See also the FAQ.
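Why hashing helps can be seen in a stand-alone sketch. The helper path_safe_value is hypothetical; the "hashed_" prefix and the choice of SHA-256 are assumptions made for illustration and may differ from what b2luigi actually does.

```python
import hashlib


def path_safe_value(value, hashed=False):
    """Hypothetical helper: return the text used for a parameter value
    inside a file path, optionally replaced by a hash of it."""
    text = str(value)
    if hashed:
        # A hex digest contains only 0-9 and a-f, so it is always
        # safe to use as a folder name.
        return "hashed_" + hashlib.sha256(text.encode("utf-8")).hexdigest()
    return text


dangerous = {"cut": "pt/2 > {x}"}
plain_text = path_safe_value(dangerous)
hashed_text = path_safe_value(dangerous, hashed=True)
```

The plain text would create unintended sub-folders because of the “/”, whereas the hashed text is a single, deterministic path component.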

Settings

b2luigi.get_setting(key, default=None)

b2luigi adds settings management to luigi and also uses it in various places.

With this function, you can get the current value of a specific setting with the given key. If there is no setting defined with this name, either the default is returned or, if you did not supply any default, a ValueError is raised.

For information on how settings are set, please see set_setting. Settings can be of any type, but are mostly strings.

Parameters:
  • key (str) – The name of the parameter to query.
  • default (optional) – If there is no setting with the name, either return this default or, if it is not set, raise a ValueError.

b2luigi.set_setting(key, value)

There are two possibilities to set a setting with a given name:

  • Either you use this function and supply the key and the value. The setting is then defined globally for all following calls to get_setting with the specific key.

  • Or you add a file called settings.json to the current working directory or any folder above that. In the JSON file, you need to supply a key and a value for each setting you want to have, e.g.:

    {
        "result_path": "results",
        "some_setting": "some_value"
    }
    

    By looking also in the parent folders for setting files, you can define project settings in a top folder and specific settings further down in your local folders.
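The lookup behaviour described for get_setting and set_setting can be sketched as follows. The functions set_setting_sketch and get_setting_sketch are hypothetical and not the library's code. The sketch assumes that explicitly set values take precedence over settings.json files and that folders closer to the working directory win over folders further up; the start_dir argument exists only to make the example reproducible.

```python
import json
import tempfile
from pathlib import Path

_explicit_settings = {}  # hypothetical in-memory store


def set_setting_sketch(key, value):
    _explicit_settings[key] = value


def get_setting_sketch(key, default=None, start_dir=None):
    # Assumption: explicitly set values win over settings files.
    if key in _explicit_settings:
        return _explicit_settings[key]
    current = Path(start_dir or Path.cwd()).resolve()
    # Walk from the starting folder up to the file system root.
    for folder in [current, *current.parents]:
        settings_file = folder / "settings.json"
        if settings_file.is_file():
            settings = json.loads(settings_file.read_text())
            if key in settings:
                return settings[key]
    if default is None:
        raise ValueError(f"No setting with the name {key!r}")
    return default


with tempfile.TemporaryDirectory() as project_dir:
    local_dir = Path(project_dir) / "analysis"
    local_dir.mkdir()
    # Project-wide setting in the top folder, specific one further down.
    (Path(project_dir) / "settings.json").write_text(
        json.dumps({"result_path": "results"}))
    (local_dir / "settings.json").write_text(
        json.dumps({"some_setting": "some_value"}))

    from_project = get_setting_sketch("result_path", start_dir=local_dir)
    from_local = get_setting_sketch("some_setting", start_dir=local_dir)

set_setting_sketch("batch_system", "lsf")
from_memory = get_setting_sketch("batch_system")
```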

b2luigi.clear_setting(key)

Clear the setting with the given key.

Other functions

b2luigi.on_temporary_files(run_function)

Wrapper for decorating a task’s run function to use temporary files as outputs.

A common problem when using long-running tasks in luigi is the thanksgiving bug. It occurs when you define an output of a task and, in its run function, create this output and fill it with content during a long-lasting calculation. Between the creation of the output and the end of the calculation, other tasks may check whether the output is already there, find it existing, and assume that the task is already finished (although there is probably only nonsense in the file).

luigi itself already offers a solution with the temporary_path() function of the file system targets, which is really nice! Unfortunately, this means you have to open all your output files with a context manager, which is very hard to do if you also have external tasks (because they will probably use the output file directly instead of the temporary file version of it).

This wrapper simplifies the usage of the temporary files:

import b2luigi

class MyTask(b2luigi.Task):
    def output(self):
        yield self.add_to_output("test.txt")

    @b2luigi.on_temporary_files
    def run(self):
        with open(self.get_output_file_name("test.txt"), "w") as f:
            raise ValueError()
            f.write("Test")

Instead of creating the file “test.txt” at the beginning and filling it with content later (which will never happen because of the exception thrown, which leaves the file existing although the task has not actually finished), the file is written to a temporary file first and copied to its final location at the end of the run function (but only if there was no error).
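The underlying write-to-temporary-then-publish pattern can be sketched with the standard library. The helper write_via_temporary_file is hypothetical and is not how the decorator is implemented; in particular, it moves the temporary file with os.replace, whereas the text above speaks of copying.

```python
import os
import tempfile


def write_via_temporary_file(final_path, write_function):
    """Hypothetical helper: let write_function fill a temporary file and
    publish it under final_path only if no error occurred."""
    directory = os.path.dirname(os.path.abspath(final_path))
    file_descriptor, temporary_path = tempfile.mkstemp(dir=directory)
    os.close(file_descriptor)
    try:
        write_function(temporary_path)
    except BaseException:
        os.remove(temporary_path)  # never publish a half-written file
        raise
    os.replace(temporary_path, final_path)


def failing_writer(path):
    with open(path, "w") as f:
        raise ValueError()


def working_writer(path):
    with open(path, "w") as f:
        f.write("Test")


with tempfile.TemporaryDirectory() as folder:
    target = os.path.join(folder, "test.txt")
    try:
        write_via_temporary_file(target, failing_writer)
    except ValueError:
        pass
    exists_after_failure = os.path.exists(target)

    write_via_temporary_file(target, working_writer)
    with open(target) as f:
        content_after_success = f.read()
```

After the failing run the target does not exist, so no other task can mistake it for a finished output.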

Attention:

The decorator only edits the function get_output_file_name. If you are using the output directly, you have to take care of using the temporary path correctly by yourself!