Batch Processing
As shown in Quick Start, switching from local to batch processing is really just a matter of adding --batch on the command line or calling process with batch=True.
However, there is more to discover!
Choosing the batch system
Using b2luigi’s settings mechanism (described in b2luigi.get_setting()) you can choose which batch system should be used.
Currently, htcondor and lsf are supported; more will come soon (PRs welcome!).
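As a sketch of the settings.json route, the snippet below writes a minimal settings file that selects HTCondor. It assumes that the settings key is called batch_system and that the file is placed where b2luigi looks for it (see b2luigi.get_setting() for the exact lookup rules); the helper write_batch_settings is hypothetical. Calling set_setting() in your script would be the in-code alternative.

```python
import json
import tempfile
from pathlib import Path


def write_batch_settings(directory: Path, batch_system: str) -> Path:
    """Write a minimal settings.json selecting the batch system.

    Assumption: "batch_system" is the settings key b2luigi reads for this.
    """
    settings_file = directory / "settings.json"
    settings_file.write_text(json.dumps({"batch_system": batch_system}, indent=4))
    return settings_file


if __name__ == "__main__":
    # Write into a throw-away directory so the example has no side effects.
    with tempfile.TemporaryDirectory() as tmp:
        path = write_batch_settings(Path(tmp), "htcondor")
        print(path.read_text())
```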
Choosing the Environment
If you are doing a local calculation, all calculated tasks will use the same environment (e.g. $PATH setting, libraries etc.) that you have set up when calling your script(s).
This makes it predictable and simple.
Things get a bit more complicated when using a batch farm: the workers might not have the same environment set up, the batch submission might not copy the environment (or the local site administrators have forbidden that), or the system on the workers might be so different that copying the environment from the scheduling machine does not make sense.
Therefore b2luigi provides you with three mechanisms to set the environment for each task:
- You can give a bash script in the env_script setting (via set_setting(), settings.json or for each task as usual, see b2luigi.get_setting()), which will be called before anything else on the worker. Use it to set up things like the path variables or the libraries (e.g. when you are using a virtual environment) if your batch system does not support copying the environment from the scheduler to the workers. For example, a useful script might look like this:
      # Source my virtual environment
      source venv/bin/activate
      # Set some specific settings
      export MY_IMPORTANT_SETTING=10
- You can set the env setting to a dictionary containing additional environment variables to be set before your job runs. Using the mechanism described in b2luigi.get_setting(), you can make this task- or even parameter-dependent.
- By default, b2luigi re-uses the same python executable on the workers that you used to schedule the tasks (by calling your script). In some cases, this specific python executable is not present on the worker or is not usable (e.g. because of different operating systems or architectures). You can choose a new executable with the executable setting (it is also possible to just use python3 as the executable, assuming it is in the path). The executable needs to be callable after your env_script or your specific env settings are applied. Please note that the executable setting is a list, so you need to pass your python executable with possible arguments like this: b2luigi.set_setting("executable", ["python3"])
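The semantics of the executable and env settings can be pictured with the standard-library sketch below: the executable list forms the start of the argument vector (so extra interpreter arguments fit in naturally), and the env dictionary adds variables on top of the inherited environment. The helper launch_with_settings is hypothetical, and this is not how b2luigi itself starts jobs; it only illustrates the behaviour described above.

```python
import os
import subprocess
import sys


def launch_with_settings(executable, env, args):
    """Run `executable + args` with additional environment variables.

    Hypothetical helper: `executable` is a list such as ["python3"] or
    ["python3", "-u"], mirroring the list-valued executable setting;
    `env` holds the additional variables, like the env setting.
    """
    if isinstance(executable, str):
        raise TypeError("executable must be a list, e.g. ['python3']")
    # Additional variables take precedence over the inherited ones.
    full_env = {**os.environ, **env}
    return subprocess.run(
        [*executable, *args],
        env=full_env,
        capture_output=True,
        text=True,
        check=True,
    )


if __name__ == "__main__":
    result = launch_with_settings(
        [sys.executable, "-u"],
        {"MY_IMPORTANT_SETTING": "10"},
        ["-c", "import os; print(os.environ['MY_IMPORTANT_SETTING'])"],
    )
    print(result.stdout.strip())  # prints 10
```

Passing a plain string instead of a list is rejected here on purpose, matching the note that the setting is a list.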
Different File System
Depending on your batch system, the filesystem on the worker processing the task and the scheduler machine can be different or even unrelated.
However, b2luigi needs at least three common folders: the result folder, the log folder and the folder of your script.
If possible, use absolute paths for the result and log directories to prevent any problems.
In some cases, the batch system starts the job in an arbitrary folder on the workers.
That is why b2luigi changes into the directory of your called script before starting the job.
In case your script is accessible from a different location on the worker than on the scheduling machine, you can give the setting working_dir to specify where the job should run.
Your script needs to be in this folder, and every relative path (e.g. for results or logs) will be evaluated from there.
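A small sketch of the path rule just described, with hypothetical directory names: relative result or log paths move together with the working directory, while absolute paths stay where they are. This is why absolute paths are the safer choice when the file systems differ. The helper resolve_on_worker is illustrative only.

```python
from pathlib import PurePosixPath


def resolve_on_worker(path: str, working_dir: str) -> PurePosixPath:
    """Return where `path` ends up when the job runs inside `working_dir`.

    Absolute paths are used unchanged; relative ones are interpreted
    relative to the working directory.
    """
    candidate = PurePosixPath(path)
    if candidate.is_absolute():
        return candidate
    return PurePosixPath(working_dir) / candidate


if __name__ == "__main__":
    # Hypothetical directories on a worker node.
    print(resolve_on_worker("results", "/scratch/job123/project"))
    print(resolve_on_worker("/data/me/results", "/scratch/job123/project"))
```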
Drawbacks of the batch mode
Although the batch mode has many benefits, it would be unfair not to mention its downsides:
- You have to choose the queue/batch settings/etc. depending on your requirements (e.g. wall clock time) by yourself. So you need to make sure that the tasks will actually finish before the batch system kills them because of a timeout. There is just no way for b2luigi to know this beforehand.
- There is currently no resubmission implemented. This means that jobs dying because of batch system failures are just dead. But because of the dependency checking mechanism of luigi, it is simple to just redo the calculation and re-calculate what is missing.
- The luigi feature to request new dependencies while a task is running (via yield) is not implemented for the batch mode so far.
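The "just run it again" strategy works because luigi, by default, considers a task complete when all of its outputs exist, so a second run only schedules what is missing. The plain-Python sketch below imitates that check; the task names and files are hypothetical, and real luigi tasks express this through their output targets rather than a dictionary.

```python
import os
import tempfile


def missing_tasks(task_outputs):
    """Return the names of tasks that would be re-run.

    `task_outputs` maps a (hypothetical) task name to its output paths.
    Like luigi's default completeness check, a task is done when all of
    its outputs exist; a task without outputs is never considered done.
    """
    return [
        name
        for name, outputs in task_outputs.items()
        if not outputs or not all(os.path.exists(path) for path in outputs)
    ]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        done = os.path.join(tmp, "number_0.txt")
        with open(done, "w") as f:
            f.write("0.5\n")
        # The second job "died" on the batch system and left no output.
        outputs = {
            "MyNumberTask_0": [done],
            "MyNumberTask_1": [os.path.join(tmp, "number_1.txt")],
        }
        print(missing_tasks(outputs))  # ['MyNumberTask_1']
```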
Batch System Specific Settings
Every batch system has special settings. You can look them up here:
LSF
- class b2luigi.batch.processes.lsf.LSFProcess(*args, **kwargs)
Bases: b2luigi.batch.processes.BatchProcess
Reference implementation of the batch process for an LSF batch system.
In addition to the basic batch setup (see Batch Processing), the LSF-specific things are:
- The LSF queue can be controlled via the queue parameter, e.g.
      class MyLongTask(b2luigi.Task):
          queue = "l"
  The default is the short queue “s”.
- By default, the environment variables from the scheduler are copied to the workers. This also implies that we start in the same working directory and can reuse the same executable etc. Normally, you do not need to supply env_script or alike.
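For orientation, the sketch below builds the kind of bsub command line such a submission boils down to: the queue is passed via -q, and stdout/stderr are redirected via -o/-e. It is only an illustration; the helper build_bsub_command, the file names and the wrapper path are hypothetical, and LSFProcess may assemble its command differently.

```python
import shlex


def build_bsub_command(wrapper_script, stdout_file, stderr_file, queue="s"):
    """Return the argument list for submitting `wrapper_script` with bsub.

    -q selects the LSF queue (default "s", the short queue),
    -o / -e name the files receiving the job's stdout / stderr.
    """
    return [
        "bsub",
        "-q", queue,
        "-o", stdout_file,
        "-e", stderr_file,
        wrapper_script,
    ]


if __name__ == "__main__":
    cmd = build_bsub_command("/my/project/executable_wrapper.sh",
                             "job.out", "job.err", queue="l")
    print(shlex.join(cmd))
```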
HTCondor
- class b2luigi.batch.processes.htcondor.HTCondorProcess(*args, **kwargs)
Bases: b2luigi.batch.processes.BatchProcess
Reference implementation of the batch process for an HTCondor batch system.
In addition to the basic batch setup (see Batch Processing), the HTCondor-specific things are:
- Please note that most HTCondor setups do not have the same environment setup on submission and worker machines, so you will probably always want to give an env_script, an env setting and/or a different executable.
- You can give an htcondor_settings dict setting for additional options, such as requested memory etc. Its value has to be a dictionary containing HTCondor settings as key/value pairs. These options will be written into the job submission file. For an overview of possible settings, refer to the HTCondor documentation.
Example
    import b2luigi
    import random


    class MyNumberTask(b2luigi.Task):
        some_parameter = b2luigi.IntParameter()

        htcondor_settings = {
            "request_cpus": 1,
            "request_memory": "100 MB"
        }

        def output(self):
            yield self.add_to_output("output_file.txt")

        def run(self):
            print("I am now starting a task")
            random_number = random.random()

            if self.some_parameter == 3:
                raise ValueError

            with open(self.get_output_file_name("output_file.txt"), "w") as f:
                f.write(f"{random_number}\n")


    class MyAverageTask(b2luigi.Task):
        htcondor_settings = {
            "request_cpus": 1,
            "request_memory": "200 MB"
        }

        def requires(self):
            for i in range(10):
                yield self.clone(MyNumberTask, some_parameter=i)

        def output(self):
            yield self.add_to_output("average.txt")

        def run(self):
            print("I am now starting the average task")

            # Build the mean
            summed_numbers = 0
            counter = 0
            for input_file in self.get_input_file_names("output_file.txt"):
                with open(input_file, "r") as f:
                    summed_numbers += float(f.read())
                    counter += 1

            average = summed_numbers / counter

            with open(self.get_output_file_name("average.txt"), "w") as f:
                f.write(f"{average}\n")


    if __name__ == "__main__":
        b2luigi.process(MyAverageTask(), workers=200, batch=True)
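To picture how the htcondor_settings dictionary ends up in the job submission file, the sketch below renders it as key = value lines in HTCondor's submit-description syntax, next to the executable and log entries. The function render_submit_file and the file names are hypothetical; HTCondorProcess writes its own submit file, whose exact content may differ.

```python
def render_submit_file(executable, log_prefix, htcondor_settings):
    """Render a minimal HTCondor submit description as a string.

    `htcondor_settings` holds extra key/value pairs such as
    {"request_memory": "100 MB"}; they are written verbatim.
    """
    lines = [
        f"executable = {executable}",
        f"output = {log_prefix}.out",
        f"error = {log_prefix}.err",
        f"log = {log_prefix}.log",
    ]
    for key, value in htcondor_settings.items():
        lines.append(f"{key} = {value}")
    # Submit exactly one job for this task.
    lines.append("queue 1")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_submit_file(
        "executable_wrapper.sh",
        "logs/MyNumberTask_3",
        {"request_cpus": 1, "request_memory": "100 MB"},
    ))
```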
Add your own batch system¶
If you want to add a new batch system, all you need to do is to implement the abstract functions of BatchProcess for your system:
- class b2luigi.batch.processes.BatchProcess(task, scheduler, result_queue, worker_timeout)
This is the base class for all batch algorithms that allow luigi to run on a specific batch system. This is an abstract base class, and inheriting classes need to supply functionality for:
  * starting a job using the commands in self.task_cmd
  * getting the job status of a running, finished or failed job
  * and killing a job
All those commands are called from the main process, which is not running on the batch system. Every batch system that is capable of these functions can in principle work together with b2luigi.
- Implementation note:
In principle, using the batch system is transparent to the user. In case of problems, however, it may be useful to understand how it works.
When you start your luigi dependency tree with process(..., batch=True), the normal luigi process is started, looking for unfinished tasks, running them etc. Normally, luigi creates a process for each running task and runs it either directly or on a different core (if you have enabled more than one worker). In the batch case, this process is not a normal python multiprocessing process but this BatchProcess, which has the same interface (one can check the status of the process, start it or kill it). The process does not need to wait for the batch job to finish but is asked repeatedly for the job status. This way, most of the core functionality of luigi is kept and reused. It also means that every batch job only includes a single task and finishes as soon as this task is done, which keeps the runtime of each batch job short. You will need exactly as many batch jobs as you have tasks, and no batch job will idle waiting for input data, as all jobs are only scheduled once the task they should run is actually runnable (the input files are there).
What is the batch command now? In each job, we call a specific executable bash script created only for this task. It contains the setup of the environment (if given by the user via the settings), the change of the working directory (the directory of the python script or a directory specified by the user) and a call of this script with the current python interpreter (the one you used to call this main file, or the one given by the executable setting). However, we give this call an additional parameter, which tells it to only run one single task. Tasks are identified by their task id. A typical task command may look like:
    /<path-to-your-exec>/python /your-project/some-file.py --batch-runner --task-id MyTask_38dsf879w3
if the batch job should run MyTask. The implementation of the abstract functions is responsible for creating and running the executable file and for writing the log of the job into the appropriate locations. You can use the functions create_executable_wrapper and get_log_file_dir to get the needed information.
Check out the implementation of the LSF process for an example implementation.
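The per-task wrapper described above can be sketched as a small generator of bash text: environment setup first, then the change of directory, then the single-task call with --batch-runner --task-id. The function render_wrapper is a hypothetical stand-in for create_executable_wrapper, not its actual implementation or output; the only details taken from the text are the order of the steps and the two command-line flags.

```python
import shlex


def render_wrapper(script, task_id, executable, working_dir,
                   env_script=None, env=None):
    """Return the text of a bash wrapper that runs a single task.

    Environment variable names in `env` are assumed to be valid shell
    identifiers; only the values are quoted.
    """
    lines = ["#!/bin/bash"]
    if env_script:
        # The user-provided environment setup comes first.
        lines.append(f"source {shlex.quote(env_script)}")
    for key, value in (env or {}).items():
        lines.append(f"export {key}={shlex.quote(str(value))}")
    # Jobs may start in an arbitrary folder, so change directory explicitly.
    lines.append(f"cd {shlex.quote(working_dir)}")
    command = [*executable, script, "--batch-runner", "--task-id", task_id]
    # exec replaces the shell, so the job's exit code is the task's exit code.
    lines.append("exec " + shlex.join(command))
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_wrapper(
        script="/your-project/some-file.py",
        task_id="MyTask_38dsf879w3",
        executable=["python3"],
        working_dir="/your-project",
        env_script="setup_env.sh",
        env={"MY_IMPORTANT_SETTING": 10},
    ))
```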
- get_job_status()
Implement this function to return the current job status. How exactly you identify your job depends on the implementation and needs to be handled by your own child class.
Must return one item of the JobStatus enumeration: running, aborted, successful or idle. Will only be called after the job is started but may also be called when the job is already finished. If the task status is unknown, return aborted. If the task has not started yet but is scheduled, return running nevertheless (for b2luigi it makes no difference). No matter whether the job was aborted via a call to kill_job, by the batch system or by an exception in the job itself, you should return aborted if the job did not finish successfully (you may need to check the exit code of your job).
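As an example of the rules above, here is a sketch of a status mapping for LSF-style job states as reported by bjobs. The JobStatus enum below is a local stand-in with the four values named in the text, not an import from b2luigi, and the function to_job_status is hypothetical. Scheduled, waiting and suspended jobs are treated as running because they have not finished; anything unrecognised falls back to aborted, as required.

```python
import enum


class JobStatus(enum.Enum):
    """Local stand-in for b2luigi's JobStatus (values assumed)."""
    running = "running"
    aborted = "aborted"
    successful = "successful"
    idle = "idle"


_LSF_STATE_TO_STATUS = {
    # Scheduled but not started yet: report running nevertheless.
    "PEND": JobStatus.running,
    "WAIT": JobStatus.running,
    # Started, possibly suspended, but not finished.
    "RUN": JobStatus.running,
    "PSUSP": JobStatus.running,
    "USUSP": JobStatus.running,
    "SSUSP": JobStatus.running,
    # Finished states.
    "DONE": JobStatus.successful,
    "EXIT": JobStatus.aborted,
}


def to_job_status(lsf_state: str) -> JobStatus:
    """Map an LSF job state string to a JobStatus; unknown means aborted."""
    return _LSF_STATE_TO_STATUS.get(lsf_state.strip().upper(), JobStatus.aborted)


if __name__ == "__main__":
    for state in ("PEND", "RUN", "DONE", "EXIT", "UNKWN"):
        print(state, to_job_status(state).value)
```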
- kill_job()
This command is used to abort a job started by the start_job function. It is only called once to abort a job, so make sure to either block until the job is really gone or be sure that it will go down soon. In particular, do not wait until the job is finished. It is called, for example, when the user presses Ctrl-C.
In some strange corner cases, this function may be called even before the job is started (i.e. before start_job is called). In this case, you do not need to do anything (but you should not raise an exception either).
- start_job()
Override this function in your child class to start a job on the batch system. It is called exactly once. You need to store any information identifying your batch job on your own.
You can use the b2luigi.core.utils.get_log_file_dir and the b2luigi.core.executable.create_executable_wrapper functions to get the log base name and to create the executable script which you should call in your batch job. After the start_job function is called by the framework (and no exception is thrown), it is assumed that a batch job is started or scheduled.
After the job is finished (no matter whether aborted or successful), we assume that stdout and stderr are written into the two files given by b2luigi.core.utils.get_log_file_dir(self.task).
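Putting the three methods together, the sketch below implements the contract against the local machine instead of a real batch system, using subprocess as the "scheduler". It shows the points made above: start_job stores its own job handle, kill_job does nothing if no job was started and does not wait for the job to finish by itself, and get_job_status reports aborted for anything but a clean exit. LocalProcessSketch, its constructor arguments, the log file names and the stand-in JobStatus enum are all hypothetical; a real implementation would subclass b2luigi.batch.processes.BatchProcess and obtain its command and log locations from create_executable_wrapper and get_log_file_dir.

```python
import enum
import subprocess
import sys
import tempfile
import time
from pathlib import Path


class JobStatus(enum.Enum):
    """Stand-in for b2luigi's JobStatus enumeration (values assumed)."""
    running = "running"
    aborted = "aborted"
    successful = "successful"
    idle = "idle"


class LocalProcessSketch:
    """Hypothetical 'batch process' that runs the job as a local subprocess."""

    def __init__(self, command, log_dir):
        self._command = command          # argv of the job, e.g. a wrapper script
        self._log_dir = Path(log_dir)    # where stdout/stderr should end up
        self._process = None             # job handle, stored by start_job

    def start_job(self):
        # Called exactly once; keep the handle to identify the job later.
        self._log_dir.mkdir(parents=True, exist_ok=True)
        with open(self._log_dir / "stdout", "w") as out, \
                open(self._log_dir / "stderr", "w") as err:
            self._process = subprocess.Popen(self._command, stdout=out, stderr=err)

    def get_job_status(self):
        if self._process is None:
            return JobStatus.aborted     # unknown job: report aborted
        returncode = self._process.poll()
        if returncode is None:
            return JobStatus.running
        # Killed jobs and failing jobs both end with a non-zero exit code.
        return JobStatus.successful if returncode == 0 else JobStatus.aborted

    def kill_job(self):
        # May be called before start_job: then there is nothing to do.
        if self._process is None:
            return
        # Ask the process to terminate; do not wait for it to finish on its own.
        self._process.terminate()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        job = LocalProcessSketch([sys.executable, "-c", "print('hello')"], tmp)
        job.kill_job()                   # before start: harmless no-op
        job.start_job()
        while job.get_job_status() is JobStatus.running:
            time.sleep(0.1)
        print(job.get_job_status())
        print((Path(tmp) / "stdout").read_text())
```

Polling in a loop here stands in for luigi repeatedly asking the process for its status, as described in the implementation note.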