Belle II specific examples

The following examples are not of interest to the general audience, but only for basf2 users.

Running at the NAF

The environment on the workers is different than on the scheduling machine, so we can not just copy the environment variables as on KEKCC.

You can use setup script (e.g. called setup_basf2.sh) with the following content

source /cvmfs/belle.cern.ch/tools/b2setup release-XX-XX-XX

All you have to do is specify the setup script as the env_script setting and also set the executable explicitly.

from time import sleep
import os

import b2luigi


class MyTask(b2luigi.Task):
    parameter = b2luigi.IntParameter()

    def output(self):
        yield self.add_to_output("test.txt")

    def run(self):
        sleep(self.parameter)

        with open(self.get_output_file_name("test.txt"), "w") as f:
            f.write("Test")


class Wrapper(b2luigi.Task):
    def requires(self):
        for i in range(10):
            yield MyTask(parameter=i)

    def output(self):
        yield self.add_to_output("test.txt")

    def run(self):
        with open(self.get_output_file_name("test.txt"), "w") as f:
            f.write("Test")


if __name__ == '__main__':
    # Choose htcondor as our batch system
    b2luigi.set_setting("batch_system", "htcondor")

    # Setup the correct environment on the workers
    b2luigi.set_setting("env_script", "setup_basf2.sh")

    # Most likely your executable from the submission node is not the same on
    # the worker node, so specify it explicitly
    b2luigi.set_setting("executable", ["python3"])

    # Where to store the results
    b2luigi.set_setting("result_dir", "results")

    b2luigi.process(Wrapper(), batch=True, workers=100)

Of course it is also possible to set those settings in the settings.json or as task-specific parameters. Please check out b2luigi.get_setting() for more information.

Please note that the called script as well as the results folder need to be accessible from both the scheduler and the worker machines. If needed, you can also include more setup steps in the source script.

Running at KEKCC

KEKCC uses LSF as the batch system. As this is the default for b2luigi there is nothing you need to do.

nTuple Generation

import b2luigi as luigi
from b2luigi.basf2_helper import Basf2PathTask, Basf2nTupleMergeTask

import basf2

import modularAnalysis


class AnalysisTask(Basf2PathTask):
    experiment_number = luigi.IntParameter()
    run_number = luigi.IntParameter()
    mode = luigi.Parameter()
    file_number = luigi.IntParameter()

    def output(self):
        # Define the outputs here
        yield self.add_to_output("D_n_tuple.root")
        yield self.add_to_output("B_n_tuple.root")

    def create_path(self):
        # somehow create filenames from parameters
        # self.experiment_number, self.run_number,
        # self.mode and self.file_number
        # (parameters just examples)
        input_file_names = ..

        path = basf2.create_path()
        modularAnalysis.inputMdstList('default', input_file_names, path=path)

        # Now fill your particle lists, just examples
        modularAnalysis.fillParticleLists([('K+', 'kaonID > 0.1'), ('pi+', 'pionID > 0.1')],
                                          path=path)
        modularAnalysis.reconstructDecay('D0 -> K- pi+', '1.7 < M < 1.9', path=path)
        modularAnalysis.fitVertex('D0', 0.1, path=path)
        modularAnalysis.matchMCTruth('D0', path=path)
        modularAnalysis.reconstructDecay('B- -> D0 pi-', '5.2 < Mbc < 5.3', path=path)
        modularAnalysis.fitVertex('B+', 0.1, path=path)
        modularAnalysis.matchMCTruth('B-', path=path)

        # When exporting, use the function get_output_file_name()
        modularAnalysis.variablesToNtuple('D0',
                                        ['M', 'p', 'E', 'useCMSFrame(p)', 'useCMSFrame(E)',
                                        'daughter(0, kaonID)', 'daughter(1, pionID)', 'isSignal', 'mcErrors'],
                                        filename=self.get_output_file_name("D_n_tuple.root"),
                                        path=path)
        modularAnalysis.variablesToNtuple('B-',
                                        ['Mbc', 'deltaE', 'isSignal', 'mcErrors', 'M'],
                                        filename=self.get_output_file_name("B_n_tuple.root"),
                                        path=path)
        return path


class MasterTask(luigi.WrapperTask):
    def requires(self):
        # somehow loop over the runs, experiment etc.
            yield self.clone(AnalysisTask, experiment_number=...)


if __name__ == "__main__":
    luigi.process(MasterTask(), workers=500)

Standard Simulation, Reconstruction and some nTuple Generation

import b2luigi as luigi
from b2luigi.basf2_helper import Basf2PathTask, Basf2nTupleMergeTask

from enum import Enum

import basf2

import modularAnalysis
import simulation
import vertex
import generators
import reconstruction
from ROOT import Belle2


class SimulationType(Enum):
    y4s = "Y(4S)"
    continuum = "Continuum"


class SimulationTask(Basf2PathTask):
    n_events = luigi.IntParameter()
    event_type = luigi.EnumParameter(enum=SimulationType)

    def create_path(self):
        path = basf2.create_path()
        modularAnalysis.setupEventInfo(self.n_events, path)

        if self.event_type == SimulationType.y4s:
            # in current main branch and release 5 the Y(4S)decay file is moved, so try old and new locations
            find_file_ignore_error = True
            dec_file = Belle2.FileSystem.findFile('analysis/examples/tutorials/B2A101-Y4SEventGeneration.dec',
                                                  find_file_ignore_error)
            if not dec_file:
                dec_file = Belle2.FileSystem.findFile('analysis/examples/simulations/B2A101-Y4SEventGeneration.dec')
        elif self.event_type == SimulationType.continuum:
            dec_file = Belle2.FileSystem.findFile('analysis/examples/simulations/B2A102-ccbarEventGeneration.dec')
        else:
            raise ValueError(f"Event type {self.event_type} is not valid. It should be either 'Y(4S)' or 'Continuum'!")

        generators.add_evtgen_generator(path, 'signal', dec_file)
        modularAnalysis.loadGearbox(path)
        simulation.add_simulation(path)

        path.add_module('RootOutput', outputFileName=self.get_output_file_name('simulation_full_output.root'))

        return path

    def output(self):
        yield self.add_to_output("simulation_full_output.root")


@luigi.requires(SimulationTask)
class ReconstructionTask(Basf2PathTask):
    def create_path(self):
        path = basf2.create_path()

        path.add_module('RootInput', inputFileNames=self.get_input_file_names("simulation_full_output.root"))
        modularAnalysis.loadGearbox(path)
        reconstruction.add_reconstruction(path)

        modularAnalysis.outputMdst(self.get_output_file_name("reconstructed_output.root"), path=path)

        return path

    def output(self):
        yield self.add_to_output("reconstructed_output.root")


@luigi.requires(ReconstructionTask)
class AnalysisTask(Basf2PathTask):
    def create_path(self):
        path = basf2.create_path()
        modularAnalysis.inputMdstList('default', self.get_input_file_names("reconstructed_output.root"), path=path)
        modularAnalysis.fillParticleLists([('K+', 'kaonID > 0.1'), ('pi+', 'pionID > 0.1')], path=path)
        modularAnalysis.reconstructDecay('D0 -> K- pi+', '1.7 < M < 1.9', path=path)
        modularAnalysis.matchMCTruth('D0', path=path)
        modularAnalysis.reconstructDecay('B- -> D0 pi-', '5.2 < Mbc < 5.3', path=path)
        try:  # treeFit is the new function name in light releases after release 4 (e.g. light-2002-janus)
            vertex.treeFit('B+', 0.1, update_all_daughters=True, path=path)
        except AttributeError:  # vertexTree is the function name in release 4
            vertex.vertexTree('B+', 0.1, update_all_daughters=True, path=path)
        modularAnalysis.matchMCTruth('B-', path=path)
        modularAnalysis.variablesToNtuple('D0',
                                          ['M', 'p', 'E', 'useCMSFrame(p)', 'useCMSFrame(E)',
                                           'daughter(0, kaonID)', 'daughter(1, pionID)', 'isSignal', 'mcErrors'],
                                          filename=self.get_output_file_name("D_n_tuple.root"),
                                          path=path)
        modularAnalysis.variablesToNtuple('B-',
                                          ['Mbc', 'deltaE', 'isSignal', 'mcErrors', 'M'],
                                          filename=self.get_output_file_name("B_n_tuple.root"),
                                          path=path)
        return path

    def output(self):
        yield self.add_to_output("D_n_tuple.root")
        yield self.add_to_output("B_n_tuple.root")


class MasterTask(Basf2nTupleMergeTask):
    n_events = luigi.IntParameter()

    def requires(self):
        for event_type in SimulationType:
            yield self.clone(AnalysisTask, event_type=event_type)


if __name__ == "__main__":
    luigi.process(MasterTask(n_events=1), workers=4)