Tasks
A task defines one computation step in a pipeline and
has to be a class inheriting from taskchain.Task.
Basic structure
Example

from taskchain import Task, Parameter
import pandas as pd

class MyTask(Task):

    class Meta:
        input_tasks = [DependencyTask]  # a task defined elsewhere
        parameters = [
            Parameter('param')
        ]

    def run(self, dependency, param) -> pd.DataFrame:
        # ... computation ...
        return result
Meta subclass
This class has to be defined inside every task and describes how the task should be handled by TaskChain. The Meta class is not meant to define any methods; it should only define some attributes:
- input_tasks (List[Task | str]) - dependency tasks, more below
- parameters (List[Parameter]) - parameters required by this task, which come from your configs, more below
- abstract (bool) - if True, this task isn't instantiated and is never part of a chain; useful for inheritance
- name (str) - if not defined, the name is derived from the class name
- group (str) - group of the task, more below
- data_class (Type[Data]) - custom class to use for output data persistence
- data_type (Type) - output type of the run method, an alternative to the typing notation
run method
This method is called by TaskChain when the output of this task is requested and the data are not already persisted.
The definition of the method has to declare a return type (using the typing -> notation or data_type in the Meta class).
TaskChain checks whether the value returned by the run method matches the defined type.
The return type is important for data persistence.
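For instance, a minimal sketch of the data_type alternative (CountTask is a hypothetical task, not from the example project):

class CountTask(Task):

    class Meta:
        # equivalent to annotating the method with `-> int`
        data_type = int

    def run(self):
        return 42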
Tip
The method can have arguments. TaskChain tries to match and fill these arguments with the values of parameters and input tasks.
Warning
Avoid expensive computation or loading data in __init__.
TaskChain can create a task object multiple times, and often the task is not used at all.
Put all expensive operations into the run method.
You can use the @persistent decorator; see the sketch below.
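The idea behind such caching, sketched here with the standard library's functools.cached_property (TaskChain's @persistent decorator serves the same purpose; load_big_model is a hypothetical expensive loader):

from functools import cached_property

class ModelTask(Task):

    @cached_property
    def model(self):
        # loaded lazily on first access, then cached on the instance,
        # so __init__ stays cheap even if the task is never used
        return load_big_model()

    def run(self) -> float:
        return self.model.score()  # hypothetical evaluation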
Task names and groups
The name of the task (str) is either defined by the name attribute of Meta or derived from the class name
(converted to underscore notation, with task removed from the end).
Here are some examples:
- DataTask -> data
- FilteredDataTask -> filtered_data
- FilteredData -> filtered_data
- LongNameOfTheTask -> long_name_of_the
The group of a task helps keep order in larger projects and affects the location of persisted data.
Usually, tasks with the same group define a pipeline.
The group can be defined in the Meta class,
and if it is, the full name of the task is group_name:task_name.
If you need a richer structure of groups, you can use : to separate multiple levels of groups, e.g. group:subgroup.
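For example, a short sketch (the task is illustrative):

class MoviesTask(Task):

    class Meta:
        group = 'movies'

    def run(self) -> pd.DataFrame:
        ...

# derived name is `movies`, so the full name of this task is `movies:movies`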
Tip
Usually, all tasks of a pipeline are defined in one module (file).
To avoid defining the same group in all tasks,
it is possible to inherit from ModuleTask or DoubleModuleTask instead of Task, as sketched below.
In that case, the group is set to the module name.
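A minimal sketch, assuming ModuleTask can be imported directly from taskchain and the task lives in a module called movies:

# file: tasks/movies.py
from taskchain import ModuleTask  # import location is an assumption

class AllMoviesTask(ModuleTask):
    # group is set automatically to the module name, i.e. `movies`

    def run(self) -> pd.DataFrame:
        ...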
Parameters
Parameters are the connection between tasks and configs.
Parameters defined in Meta tell TaskChain which values should be extracted from configs
and provided to the run method.
A parameter can be accessed through the arguments of the run method
or directly from the task's ParameterRegistry: self.params.my_param_name or self.params['my_param_name'].
Example
class AllDataTask(Task):

    class Meta:
        parameters = [
            Parameter('input_file')
        ]

    def run(self, input_file) -> pd.DataFrame:
        assert input_file == self.params.input_file
        return pd.read_csv(input_file)

class FilteredDataTask(Task):

    class Meta:
        input_tasks = [AllDataTask]
        parameters = [
            Parameter('min_value', default=0),
            Parameter('max_value'),
        ]

    def run(self, all_data, min_value, max_value) -> pd.DataFrame:
        return all_data.query(f'{min_value} <= value <= {max_value}')
Parameter's arguments
- name - name for referencing from the task
- default - value used if the parameter is not provided in a config; defaults to NO_DEFAULT, meaning that the parameter is required
- name_in_config - name used for the lookup in configs, defaults to the name argument
- dtype - expected datatype
- ignore_persistence - do not use this parameter in persistence; useful for parameters without influence on the output, e.g. verbose or debug
- dont_persist_default_value - if the value of the parameter is the same as the default, do not use it in persistence. This is useful for adding new parameters without recomputation of old data.
Tip
You can use pathlib.Path as the datatype. The expected value in the config is a str; however,
the value provided by the parameter has the type Path.
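A minimal sketch (task and parameter names are illustrative):

from pathlib import Path

class ExportTask(Task):

    class Meta:
        parameters = [
            Parameter('output_dir', dtype=Path)  # the config holds a str
        ]

    def run(self, output_dir) -> str:
        assert isinstance(output_dir, Path)  # already converted to Path
        return str(output_dir / 'export.csv')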
Reserved parameter names
The following names have special meaning in configs and cannot be used as parameter names:
tasks, excluded_tasks, uses, human_readable_data_name, configs, for_namespaces, main_part
Input tasks
Input tasks are the connections between tasks.
This Meta argument tells TaskChain which other tasks are prerequisites of this task.
The values (data) of input tasks can be accessed through the arguments of the run method or
directly from the task's InputTasks: self.input_tasks['my_task'].value.
It is also possible to access an input task by index: self.input_tasks[0].value.
This can be useful when task inheritance is used:
the run method can stay unchanged while only input_tasks is redefined.
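A minimal sketch of the explicit access style, reusing AllDataTask from the example above:

class StatsTask(Task):

    class Meta:
        input_tasks = [AllDataTask]

    def run(self) -> dict:
        # no `all_data` argument here, so the input task is accessed explicitly
        all_data = self.input_tasks['all_data'].value
        # the same value is also available by index: self.input_tasks[0].value
        return {'rows': len(all_data)}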
An input task can be defined in the following ways; see the sketch after this list:
- by class: input_tasks = [MyDataTask] - this way is preferred when possible
- by name: input_tasks = ['my_data']
- by name and group: input_tasks = ['group:my_data']
- by name, group and namespace: input_tasks = ['namespace::group:my_data']
- by a regexp string starting with ~: input_tasks = ['~my_*'] - this expands to all matching tasks in the chain (tasks starting with my_)
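All variants side by side in one sketch (the referenced task names are illustrative):

class ReportTask(Task):

    class Meta:
        input_tasks = [
            AllDataTask,              # by class (preferred)
            'filtered_data',          # by name
            'movies:all_movies',      # by group and name
            'ns::movies:all_movies',  # by namespace, group and name
            '~my_*',                  # regexp: all tasks starting with `my_`
        ]

    def run(self) -> dict:
        ...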
Data persistence
Persisting the output of tasks is the main feature of TaskChain.
When the run method produces a value (data), TaskChain saves this value, and
later, when the value of the task is requested again, the value is simply loaded
instead of calling the run method again.
Data class
Saving and loading of values is handled by inheritors of the taskchain.task.Data class.
Which class is used is determined automatically by the return type of the run method
or by the data_class attribute of Meta.
These Data classes are determined automatically:
- JSONData persists the str, int, float, bool, dict and list types into .json files
- NumpyData persists the np.ndarray type into a .npy file
- PandasData persists the pd.DataFrame and pd.Series types into a .pd file
- FigureData persists the plt.Figure type into a pickle but also saves the plot as .png and .svg for easy sharing. Use pylab or seaborn as usual and just return plt.gcf().
- GeneratedData is used if the return type is Generator. It is assumed that the generated values are json-like. Values are saved to a .jsonl file (json lines).
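For instance, a sketch of a generator task (NumbersTask is illustrative):

from typing import Generator

class NumbersTask(Task):

    def run(self) -> Generator:
        # handled by GeneratedData: every yielded json-like value
        # becomes one line of the persisted .jsonl file
        for i in range(3):
            yield {'value': i}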
Other useful Data classes have to be explicitly set in the data_class attribute:
- InMemoryData - this special type does not persist; the value is kept only in the memory of the process.
Example
class MyTask(Task):

    class Meta:
        data_class = InMemoryData

    def run(self) -> WhatEver:
        # ...
        return whatever
- DirData - this class allows saving arbitrary data to a provided directory,
but the data have to be handled manually inside the run method.
The value of the task is the Path of this directory.
Example
class MyTask(Task):

    def run(self) -> DirData:
        # ...
        data_object = self.get_data_object()
        self.save(my_data_1, data_object.dir / 'my_data_1.pickle')
        self.save(my_data_2, data_object.dir / 'my_data_2.pickle')
        return data_object

    def save(self, data, path):
        ...
- ContinuesData - for tasks with long run times, e.g. training of large models, it is possible to split the computation over multiple runs. This allows saving partial results; when the computation is interrupted, the next call of the run method starts from the last checkpoint.
Example
class TrainModel(Task):

    class Meta:
        ...

    def run(self) -> ContinuesData:
        data: ContinuesData = self.get_data_object()
        working_dir = data.dir
        self.prepare_model()
        checkpoint_path = working_dir / 'checkpoint'
        if checkpoint_path.exists():
            self.load_checkpoint(checkpoint_path)
        self.train(
            save_path=working_dir / 'trained_model',
            checkpoint_path=checkpoint_path,
        )  # training should save checkpoints periodically and the trained model at the end
        data.finished()
        return data
- H5Data - a special case of ContinuesData which allows computing large data files over multiple runs.
Example
class Embeddings(Task):

    class Meta:
        ...

    def run(self) -> H5Data:
        data: H5Data = self.get_data_object()
        with data.data_file() as data_file:
            h5_emb_dataset = data.dataset('embedding', data_file, maxshape=(None, embedding_size), dtype=np.float32)
            progress = h5_emb_dataset.len()
            for i, row in enumerate(my_dataset[progress:]):
                if i % 1000 == 0:
                    gc.collect()
                emb = self.get_embedding(row)
                data.append_data(h5_emb_dataset, [emb], dataset_len=progress)
                data_file.flush()
                progress += 1
        data.finished()
        return data
You can define ad hoc Data classes to handle other data types.
Returning a Data object directly
In some cases it is convenient for the run method to return a Data object directly.
DirData is one example.
Another use case is a custom object which inherits from InMemoryData.
See the TrainedModel task in the example project,
which returns a RatingModel directly.
This is the way to easily expose an important object to other tasks in the pipeline.
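A condensed sketch of that pattern (the real RatingModel in the example project is more involved; the import location of InMemoryData is an assumption):

from taskchain.task import InMemoryData  # import location is an assumption

class RatingModel(InMemoryData):
    # a domain object inheriting InMemoryData, so it can be the task's value
    def predict(self, user, movie):
        ...

class TrainedModel(Task):

    def run(self) -> RatingModel:
        model = RatingModel()
        # ... train the model ...
        return model  # the Data object itself becomes the value of the task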
Logging
TaskChain offers two ways to save additional information about the computation, mainly for debugging purposes.
Run info
After the run method finishes and the resulting value is saved to disk,
the Data object also saves additional information about the computation.
It is possible to add any json-like information to this info.
class MyTask(Task):
    ...

    def run(self):
        ...
        self.save_to_run_info('some important information')
        self.save_to_run_info({'records_processes': 42, 'errors': 0})
The run info is saved as YAML and is available under task_object.run_info in a json-like form.
hash.run_info.yaml
task:
  class: Movies
  module: movie_ratings.tasks.movies
  name: movies:movies
config:
  context: null
  name: imdb.filtered/movies:movies
  namespace: null
input_tasks:
  movies:all_movies: 436f7a5e06e540716b275a5f84499a78
log:
- some important information
- records_processes: 42
  errors: 0
parameters:
  from_year: '1945'
  min_vote_count: '1000'
  to_year: None
started: '2021-07-11 11:34:01.520866'
ended: '2021-07-11 11:34:01.850913'
time: 0.3300471305847168
user:
  name: your_system_name
logger
Each task has its own standard Python logger,
which can be used from the run method.
class MyTask(Task):
    ...

    def run(self):
        ...
        self.logger.debug('not so important information')
This logger has two handlers:
- A file handler managed by the Data object, which saves the log along with the value produced by the task. The logging level of this handler is set to DEBUG.
- The other handler is managed by the chain object and logs to the console. The logging level of this handler is set to WARNING and can be changed from the chain by chain.set_log_level('DEBUG').
Advanced topics
Tasks inheritance
Tasks are classes and can be inherited. This simplifies cases when a pipeline contains tasks with similar functionality.
You can inherit a task and change its behaviour by:
- changing the Meta class
  - you can change the input tasks, and the computation will then be done with different input.
    In this case, it is not possible to have the input tasks as run arguments;
    access them by self.input_tasks[0].value instead. This way, use of the task name, which changes, is avoided.
  - you can add a custom attribute to the Meta class, access it by self.meta.my_attribute, and base the computation on its value.
- overriding some methods used by the run method
It is possible to declare abstract = True in the Meta class.
In that case, the task will not be recognized by project.tasks.pipeline.* in a config
and will not be part of your pipeline.
This can be useful for tasks which are meant to be inherited from, as in the sketch below.
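A minimal sketch of an abstract base task with one concrete subclass (the names are illustrative; see the example project for the real ExtractFeatureTask):

class FeatureTask(Task):

    class Meta:
        abstract = True  # never instantiated, never part of a chain

    def run(self) -> pd.Series:
        movies = self.input_tasks[0].value  # index access keeps run name-independent
        return self.compute(movies)

    def compute(self, movies):
        raise NotImplementedError

class TitleLengthTask(FeatureTask):

    class Meta:
        input_tasks = [AllMoviesTask]  # hypothetical source task

    def compute(self, movies):
        return movies['title'].str.len()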
Examples of task inheritance can be found in the example project:
- movies pipeline - search for ExtractFeatureTask
- model pipeline - search for DataSelectionTask