
Utils

taskchain.cache

Cache

Bases: abc.ABC

Cache interface.

Source code in taskchain/cache.py
class Cache(abc.ABC):
    """Cache interface."""

    @abc.abstractmethod
    def get(self, key: str) -> Any:
        """
        Get value for given key if cached.

        Args:
            key: key under which value is cached

        Returns:
            cached value or NO_VALUE
        """
        pass

    @abc.abstractmethod
    def get_or_compute(self, key: str, computer: Callable, force: bool = False) -> Any:
        """
        Get value for given key if cached or compute and cache it.

        Args:
            key: key under which value is cached
            computer: function which returns value if not cached
            force: recompute value even if it is in cache

        Returns:
            cached or computed value
        """
        pass

    @abc.abstractmethod
    def subcache(self, *args) -> 'Cache':
        """Create separate sub-cache of this cache."""
        pass

get(key) abstractmethod

Get value for given key if cached.

Parameters:

    key (str): key under which value is cached. Required.

Returns:

    Any: cached value or NO_VALUE

Source code in taskchain/cache.py
@abc.abstractmethod
def get(self, key: str) -> Any:
    """
    Get value for given key if cached.

    Args:
        key: key under which value is cached

    Returns:
        cached value or NO_VALUE
    """
    pass

get_or_compute(key, computer, force=False) abstractmethod

Get value for given key if cached or compute and cache it.

Parameters:

    key (str): key under which value is cached. Required.
    computer (Callable): function which returns the value if not cached. Required.
    force (bool): recompute the value even if it is in the cache. Default: False.

Returns:

    Any: cached or computed value

Source code in taskchain/cache.py
@abc.abstractmethod
def get_or_compute(self, key: str, computer: Callable, force: bool = False) -> Any:
    """
    Get value for given key if cached or compute and cache it.

    Args:
        key: key under which value is cached
        computer: function which returns value if not cached
        force: recompute value even if it is in cache

    Returns:
        cached or computed value
    """
    pass

subcache(*args) abstractmethod

Create separate sub-cache of this cache.

Source code in taskchain/cache.py
@abc.abstractmethod
def subcache(self, *args) -> 'Cache':
    """Create separate sub-cache of this cache."""
    pass
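
To illustrate the contract, here is a minimal sketch of a custom implementation backed by a plain dict. The DictCache class is hypothetical and not part of taskchain; it assumes NO_VALUE can be imported from taskchain.cache, where it is used as the cache-miss sentinel.

from typing import Any, Callable

from taskchain.cache import Cache, NO_VALUE  # NO_VALUE marks a cache miss


class DictCache(Cache):
    """Illustrative dict-backed cache; not part of taskchain."""

    def __init__(self):
        self._data = {}
        self._subcaches = {}

    def get(self, key: str) -> Any:
        # return the NO_VALUE sentinel on a miss, never None
        return self._data.get(key, NO_VALUE)

    def get_or_compute(self, key: str, computer: Callable, force: bool = False) -> Any:
        if force or key not in self._data:
            self._data[key] = computer()
        return self._data[key]

    def subcache(self, *args) -> 'Cache':
        # one independent DictCache per sub-cache name
        name = args[0]
        if name not in self._subcaches:
            self._subcaches[name] = DictCache()
        return self._subcaches[name]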

DummyCache

Bases: Cache

No caching.

Source code in taskchain/cache.py
class DummyCache(Cache):
    """No caching."""

    def get(self, key: str):
        return NO_VALUE

    def get_or_compute(self, key: str, computer: Callable, force: bool = False):
        """"""
        return computer()

    def subcache(self, *args):
        """"""
        return self

InMemoryCache

Bases: Cache

Cache only in memory.

Source code in taskchain/cache.py
class InMemoryCache(Cache):
    """Cache only in memory."""

    def __init__(self):
        self._memory = defaultdict(dict)
        self._subcaches = defaultdict(dict)

    def get(self, key: str):
        return self._memory[get_ident()].get(key, NO_VALUE)

    def get_or_compute(self, key: str, computer: Callable, force: bool = False):
        """"""
        if key not in self._memory[get_ident()] or force:
            self._memory[get_ident()][key] = computer()
        return self._memory[get_ident()][key]

    def subcache(self, name):
        """"""
        if name not in self._subcaches[get_ident()]:
            self._subcaches[get_ident()][name] = InMemoryCache()
        return self._subcaches[get_ident()][name]

    def __len__(self):
        return len(self._memory[get_ident()])
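
A short usage sketch of the semantics above; the expensive function is made up for illustration.

from taskchain.cache import InMemoryCache, NO_VALUE

cache = InMemoryCache()

def expensive():
    print('computing...')
    return 42

assert cache.get('answer') is NO_VALUE       # nothing cached yet
cache.get_or_compute('answer', expensive)    # prints 'computing...', returns 42
cache.get_or_compute('answer', expensive)    # returns 42 without recomputing
sub = cache.subcache('answers')              # independent InMemoryCache
assert sub.get('answer') is NO_VALUE         # sub-caches do not share values

Values are kept per thread (keyed by get_ident()), so a cache populated in one thread is not visible from another.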

FileCache

Bases: Cache

General cache for saving values in files.

Source code in taskchain/cache.py
class FileCache(Cache):
    """General cache for saving values in files."""

    def __init__(self, directory: Union[str, Path]):
        self.directory = Path(directory)
        self.directory.mkdir(exist_ok=True, parents=True)

    def filepath(self, key: str) -> Path:
        key_hash = sha256(key.encode()).hexdigest()
        directory = self.directory / key_hash[:5]
        directory.mkdir(exist_ok=True)
        return directory / f'{key_hash[5:]}.{self.extension}'

    def get(self, key):
        filepath = self.filepath(key)
        lock = FileLock(str(filepath) + '.lock')
        with lock:
            filepath_exists = filepath.exists()
        if filepath_exists:
            try:
                return self.load_value(filepath, key)
            except CacheException as error:
                raise error
            except Exception as error:
                logger.warning('Cannot load cached value.')
                logger.exception(error)
        return NO_VALUE

    def get_or_compute(self, key, computer, force=False):
        """"""
        filepath = self.filepath(key)
        lock = FileLock(str(filepath) + '.lock')
        with lock:
            filepath_exists = filepath.exists()
        if filepath_exists and not force:
            try:
                return self.load_value(filepath, key)
            except CacheException as error:
                raise error
            except Exception as error:
                logger.warning('Cannot load cached value.')
                logger.exception(error)

        with lock:
            logger.debug(f'Computing cache for key {key} | file: {filepath}')
            value = computer()
            self.save_value(filepath, key, value)
        return value

    @abc.abstractmethod
    def save_value(self, filepath: Path, key: str, value: Any):
        pass

    @abc.abstractmethod
    def load_value(self, filepath: Path, key: str) -> Any:
        pass

    def subcache(self, directory: Union[str, Path]):
        """"""
        return self.__class__(self.directory / directory)

    @property
    @abc.abstractmethod
    def extension(self):
        pass

JsonCache

Bases: FileCache

Cache json-like objects in .json files.

Source code in taskchain/cache.py
class JsonCache(FileCache):
    """Cache json-like objects in `.json` files."""

    def __init__(self, directory, allow_nones=True):
        super().__init__(directory)
        self.allow_nones = allow_nones

    def save_value(self, filepath: Path, key: str, value: Any):
        if value is None and not self.allow_nones:
            raise CacheException(f'The cache value for key {key} is None')
        with filepath.open('w') as f:
            json.dump({'key': key, 'value': value}, f)

    def load_value(self, filepath: Path, key: str) -> Any:
        with filepath.open('r') as file:
            loaded = json.load(file)
            if key != loaded['key']:
                raise CacheException(
                    f'The expected cache key {key} does not match to the retrieved one {loaded["key"]}'
                )
            if loaded['value'] is None and not self.allow_nones:
                raise CacheException(f'The cache value for key {key} is None, file: {filepath}')
            return loaded['value']

    @property
    def extension(self):
        return 'json'
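
A usage sketch; the directory and key are arbitrary placeholders.

from taskchain.cache import JsonCache

cache = JsonCache('/tmp/taskchain_json_cache', allow_nones=False)

# Values are written as JSON files named by the SHA-256 hash of the key.
stats = cache.get_or_compute('stats:v1', lambda: {'mean': 1.5, 'count': 10})

# A sub-cache stores its files in a sub-directory of the parent cache.
daily = cache.subcache('daily')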

DataFrameCache

Bases: FileCache

Cache pandas DataFrame objects in .pd files.

Source code in taskchain/cache.py
class DataFrameCache(FileCache):
    """Cache pandas DataFrame objects in `.pd` files."""

    def save_value(self, filepath: Path, key: str, value: Any):
        value.to_pickle(filepath)

    def load_value(self, filepath: Path, key: str) -> Any:
        return pd.read_pickle(filepath)

    @property
    def extension(self):
        return 'pd'

NumpyArrayCache

Bases: FileCache

Cache numpy arrays in .npy files.

Source code in taskchain/cache.py
class NumpyArrayCache(FileCache):
    """Cache numpy arrays in `.npy` files."""

    def save_value(self, filepath: Path, key: str, value: Any):
        np.save(filepath, value)

    def load_value(self, filepath: Path, key: str) -> Any:
        return np.load(filepath, allow_pickle=True)

    @property
    def extension(self):
        return 'npy'

cached

Decorator for automatic caching of method results. For a given set of arguments, the decorated method is called only once and its result is cached. The cache key is constructed automatically from the method arguments. The cache can be passed to the decorator or provided as an attribute of the object.

Source code in taskchain/cache.py
class cached:
    """
    Decorator for automatic caching of method results.
    Decorated method is for given arguments called only once a result is cached.
    Cache key is automatically constructed based on method arguments.
    Cache can be defined in decorator or as attribute of object.
    """

    def __init__(
        self,
        cache_object: Cache = None,
        key: Callable = None,
        cache_attr: str = 'cache',
        ignore_kwargs: List[str] = None,
    ):
        """
        Args:
            cache_object: Cache used for caching.
            key: custom function for computing key from arguments
            cache_attr: if `cache_object` is None, object attribute with this name is used
            ignore_kwargs: kwargs to ignore in key construction, e.g. `verbose`
        """
        if callable(cache_object):
            self.method = cache_object
            cache_object = None
        self.cache_object = cache_object
        self.key = key
        self.cache_attr = cache_attr
        self.ignore_params = ignore_kwargs if ignore_kwargs else []

    def __call__(self, method):
        @functools.wraps(method)
        def decorated(obj, *args, force_cache=False, store_cache_value=NO_VALUE, only_cache=False, **kwargs):
            assert store_cache_value is NO_VALUE or not only_cache
            if self.cache_object is None:
                assert hasattr(obj, self.cache_attr), f'Missing cache argument for obj {obj}'
                cache = getattr(obj, self.cache_attr).subcache(method.__name__)
            else:
                cache = self.cache_object

            if self.key is None:
                for i, (arg, parameter) in enumerate(signature(method).parameters.items()):
                    if i == 0:
                        # skip self
                        continue
                    if i - 1 < len(args):
                        kwargs[arg] = args[i - 1]
                    if parameter.default != Parameter.empty and arg not in kwargs:
                        kwargs[arg] = parameter.default
                args = []
                key_kwargs = {k: v for k, v in kwargs.items() if k not in self.ignore_params}
                # we use json module from standard library to ensure backward
                # compatibility
                cache_key = orig_json.dumps(key_kwargs, sort_keys=True)
            else:
                cache_key = self.key(*args, **kwargs)

            if only_cache:
                return cache.get(cache_key)

            if store_cache_value is NO_VALUE:
                computer = lambda: method(obj, *args, **kwargs)  # noqa: E731
            else:
                computer = lambda: store_cache_value  # noqa: E731

            return cache.get_or_compute(cache_key, computer, force=force_cache)

        return decorated

    def __get__(self, instance, instancetype):
        return functools.wraps(self.method)(functools.partial(self(self.method), instance))

__init__(cache_object=None, key=None, cache_attr='cache', ignore_kwargs=None)

Parameters:

    cache_object (Cache): Cache used for caching. Default: None.
    key (Callable): custom function for computing the key from arguments. Default: None.
    cache_attr (str): if cache_object is None, the object attribute with this name is used. Default: 'cache'.
    ignore_kwargs (List[str]): kwargs to ignore in key construction, e.g. verbose. Default: None.
Source code in taskchain/cache.py
def __init__(
    self,
    cache_object: Cache = None,
    key: Callable = None,
    cache_attr: str = 'cache',
    ignore_kwargs: List[str] = None,
):
    """
    Args:
        cache_object: Cache used for caching.
        key: custom function for computing key from arguments
        cache_attr: if `cache_object` is None, object attribute with this name is used
        ignore_kwargs: kwargs to ignore in key construction, e.g. `verbose`
    """
    if callable(cache_object):
        self.method = cache_object
        cache_object = None
    self.cache_object = cache_object
    self.key = key
    self.cache_attr = cache_attr
    self.ignore_params = ignore_kwargs if ignore_kwargs else []
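
A usage sketch of the decorator with the default cache_attr: the owning object exposes a cache attribute and results are keyed by the keyword arguments. The Dataset class and its values are illustrative only.

from taskchain.cache import InMemoryCache, cached


class Dataset:
    def __init__(self):
        self.cache = InMemoryCache()  # looked up via the default cache_attr='cache'

    @cached
    def load(self, split: str = 'train'):
        print(f'loading {split}...')
        return [split] * 3


ds = Dataset()
ds.load(split='test')                    # computes and caches
ds.load(split='test')                    # served from the cache
ds.load(split='test', force_cache=True)  # recomputes and overwrites the cached value

As the wrapper above shows, the decorated method also accepts only_cache=True (return the cached value, or NO_VALUE, without computing) and store_cache_value=... (store a precomputed value under the derived key).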

taskchain.utils.clazz

persistent

Method decorator. Has to be used as a decorator without arguments. Saves the result in self.__method_name and on subsequent calls returns the saved value instead of calling the decorated method again.

Source code in taskchain/utils/clazz.py
class persistent:
    """
    Method decorator.
    Has to be used on decorator without arguments.
    Saves result in `self.__method_name`
    and next time does not call decorated method and only return saved value.
    """

    def __init__(self, method):
        self.method = method

    def __call__(self, obj):
        attr = f'__{self.method.__name__}'
        if not hasattr(obj, attr) or getattr(obj, attr) is None:
            setattr(obj, attr, self.method(obj))
        return getattr(obj, attr)

    def __get__(self, instance, instancetype):
        return functools.partial(self.__call__, instance)
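
A usage sketch; the Dataset class and its records method are illustrative only.

from taskchain.utils.clazz import persistent


class Dataset:
    @persistent
    def records(self):
        print('loading records...')
        return [1, 2, 3]


ds = Dataset()
ds.records()  # prints 'loading records...' and stores the result on ds as __records
ds.records()  # returns the stored list without calling the method again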

repeat_on_error

Method decorator that calls the method again on error.

Source code in taskchain/utils/clazz.py
class repeat_on_error:
    """ Method decorator which calls method again on error. """

    def __init__(self, retries: int = 10, waiting_time: int = 1, wait_extension: float = 1.):
        """
        Args:
            retries: how many times try to call again
            waiting_time: how many seconds wait before first retry
            wait_extension: how many times increase waiting time after each retry
        """
        if callable(retries):
            self.method = retries
            retries = 10
        self.retries = retries
        self.waiting_time = waiting_time
        self.wait_extension = wait_extension

    def __call__(self, method):
        def decorated(*args, **kwargs):
            waiting_time = self.waiting_time
            for i in range(self.retries):
                try:
                    return method(*args, **kwargs)
                except Exception as error:
                    if i + 1 == self.retries:
                        raise error
                    sleep(waiting_time)
                    waiting_time *= self.wait_extension
            assert False
        return decorated

    def __get__(self, instance, instancetype):
        return functools.partial(self(self.method), instance)

__init__(retries=10, waiting_time=1, wait_extension=1.0)

Parameters:

    retries (int): how many times to try calling the method again. Default: 10.
    waiting_time (int): how many seconds to wait before the first retry. Default: 1.
    wait_extension (float): factor by which the waiting time is multiplied after each retry. Default: 1.0.
Source code in taskchain/utils/clazz.py
def __init__(self, retries: int = 10, waiting_time: int = 1, wait_extension: float = 1.):
    """
    Args:
        retries: how many times try to call again
        waiting_time: how many seconds wait before first retry
        wait_extension: how many times increase waiting time after each retry
    """
    if callable(retries):
        self.method = retries
        retries = 10
    self.retries = retries
    self.waiting_time = waiting_time
    self.wait_extension = wait_extension
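
A usage sketch; the Client class is illustrative, and the random failure merely simulates a transient error.

import random

from taskchain.utils.clazz import repeat_on_error


class Client:
    @repeat_on_error(retries=3, waiting_time=1, wait_extension=2.0)
    def fetch(self, url):
        # a failure here is retried, waiting 1 s and then 2 s between attempts
        if random.random() < 0.5:
            raise ConnectionError('transient failure')
        return f'payload from {url}'


Client().fetch('https://example.com')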

taskchain.utils.io

write_jsons(jsons, filename, use_tqdm=True, overwrite=True, nan_to_null=True, **kwargs)

Write json-like objects to a .jsonl file (JSON Lines).

Parameters:

    jsons (Iterable): Iterable of json-like objects. Required.
    filename (Path | str): Required.
    use_tqdm (bool): Show progress bar. Default: True.
    overwrite (bool): Overwrite existing file. Default: True.
    nan_to_null (bool): Change NaN values to nulls. Default: True.
    **kwargs: other arguments passed to tqdm. Default: {}.
Source code in taskchain/utils/io.py
def write_jsons(jsons, filename, use_tqdm=True, overwrite=True, nan_to_null=True, **kwargs):
    """
    Write json-like object to `.jsonl` file (json lines).
    Args:
        jsons (Iterable): Iterable of json-like objects.
        filename (Path | str):
        use_tqdm (bool): Show progress bar.
        overwrite (bool): Overwrite existing file.
        nan_to_null (bool): Change nan values to nulls.
        **kwargs: other arguments to tqdm.
    """
    filename = Path(filename)
    assert not filename.exists() or overwrite, 'File already exists'
    with filename.open('w') as f:
        for j in progress_bar(jsons, disable=not use_tqdm, desc=f'Writing to {f.name}', **kwargs):
            f.write(json.dumps(j) + '\n')

iter_json_file(filename, use_tqdm=True, **kwargs)

Yield loaded jsons from .jsonl file (json lines).

Parameters:

    filename (Path | str): Required.
    use_tqdm (bool): Default: True.
    **kwargs: additional arguments passed to tqdm. Default: {}.
Source code in taskchain/utils/io.py
def iter_json_file(filename, use_tqdm=True, **kwargs):
    """
    Yield loaded jsons from `.jsonl` file (json lines).

    Args:
        filename (Path | str):
        use_tqdm (bool):
        **kwargs: additional arguments to tqdm

    Returns:

    """
    filename = Path(filename)
    with filename.open() as f:
        for row in progress_bar(f, disable=not use_tqdm, desc=f'Reading from {f.name}', **kwargs):
            yield json.loads(row.strip())
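
A round-trip sketch combining both helpers; the file path is a placeholder.

from taskchain.utils.io import write_jsons, iter_json_file

records = [{'id': 1, 'value': 3.2}, {'id': 2, 'value': None}]
write_jsons(records, '/tmp/records.jsonl', use_tqdm=False)

for record in iter_json_file('/tmp/records.jsonl', use_tqdm=False):
    print(record['id'])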

taskchain.utils.iter

list_or_str_to_list(value)

Helper function for cases where a list of strings is expected but a single string is also acceptable.

Parameters:

    value (Union[None, List[str], str]): Required.

Returns:

    List[str]: the original list, or the original string wrapped in a list

Source code in taskchain/utils/iter.py
def list_or_str_to_list(value: Union[None, List[str], str]) -> List[str]:
    """ Helper function for cases where list of string is expected but single string is also ok.

    Args:
        value:

    Returns:
        original list or original string in list
    """
    if isinstance(value, str):
        return [value]
    return value
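
For example:

from taskchain.utils.iter import list_or_str_to_list

list_or_str_to_list('train')            # ['train']
list_or_str_to_list(['train', 'test'])  # ['train', 'test'] (returned unchanged)
list_or_str_to_list(None)               # None (passed through)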

taskchain.utils.migration

migrate_to_parameter_mode(config, target_dir, dry=True, verbose=True)

Migrate a chain to parameter mode.

Parameters:

    config (Config): config defining the chain. Required.
    target_dir: directory to migrate data to. Required.
    dry (bool): show only info, do not copy data. Default: True.
    verbose (bool): Default: True.
Source code in taskchain/utils/migration.py
def migrate_to_parameter_mode(config: Config, target_dir, dry: bool = True, verbose: bool = True):
    """
    Migrate a chain to parameter mode.

    Args:
        config: config defining the chain
        target_dir: dir to migrate data to
        dry: show only info, do not copy data
        verbose:
    """
    assert config.base_dir != target_dir, 'target_dir has to be different from configs base_dir'
    old_chain = {
        t.fullname: t
        for t in config.chain(parameter_mode=False).tasks.values()
    }
    new_chain = {
        t.fullname: t
        for t in Config(target_dir, config._filepath, global_vars=config.global_vars, context=config.context).chain().tasks.values()
    }
    print(f'Set dry=False to make copies')
    for name, old_task in old_chain.items():
        print()
        new_task = new_chain[name]
        print(f'{name}  -  {new_task.name_for_persistence}')
        if verbose:
            print(f'  parameters: `{new_task.params.repr}`')
            print(f' input tasks: `{"###".join(f"{n}={it}" for n, it in sorted(new_task.get_config().input_tasks.items()))}`')

        if issubclass(old_task.data_class, InMemoryData):
            print('   not persisting')
            continue

        if not old_task.has_data:
            print('   no data found')
            continue

        print(f'\n    original: `{old_task.data_path}`')
        print(f'      target: `{new_task.data_path}`')

        if new_task.has_data:
            # HACK: pd files do not have to have the same size with the same data
            if new_task.data_path.name.endswith('.pd'):
                assert isclose(new_task.data_path.stat().st_size, old_task.data_path.stat().st_size, rel_tol=2e-7, abs_tol=10), f'{new_task.data_path.stat().st_size} vs. {old_task.data_path.stat().st_size}'
            else:
                assert new_task.data_path.stat().st_size == old_task.data_path.stat().st_size
            print(f'    target already exists')
            continue

        if dry:
            print('    to copy')
        else:
            print('    copying')
            if old_task.data_path.is_file():
                copyfile(old_task.data_path, new_task.data_path)
            else:
                copytree(old_task.data_path, new_task.data_path)
            print('    copied')
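
A hedged usage sketch; the Config import path and all directory paths are assumptions for illustration, not taken from this page.

from taskchain import Config  # assumed import path
from taskchain.utils.migration import migrate_to_parameter_mode

config = Config('/data/chains', '/configs/my_chain.yaml')  # placeholder paths

# Review what would be copied first ...
migrate_to_parameter_mode(config, target_dir='/data/chains_param_mode', dry=True)
# ... then run again with dry=False to actually copy the persisted task data.
migrate_to_parameter_mode(config, target_dir='/data/chains_param_mode', dry=False)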

taskchain.utils.threading

parallel_map(fun, iterable, threads=10, sort=True, use_tqdm=True, desc='Running tasks in parallel.', total=None, chunksize=1000)

Map a function over an iterable using multiple threads.

Parameters:

    fun (Callable): function to apply. Required.
    iterable (Iterable): Required.
    threads (int): number of threads. Default: 10.
    sort (bool): return values in the same order as the iterable. Default: True.
    use_tqdm (bool): show progress bar. Default: True.
    desc (str): text of the progress bar. Default: 'Running tasks in parallel.'.
    total (int): size of the iterable, to allow a better progress bar. Default: None.

Returns:

    list: values returned by fun

Source code in taskchain/utils/threading.py
def parallel_map(
    fun: Callable,
    iterable: Iterable,
    threads: int = 10,
    sort: bool = True,
    use_tqdm: bool = True,
    desc: str = 'Running tasks in parallel.',
    total: int = None,
    chunksize: int = 1000,
):
    """
    Map function to iterable in multiple threads.

    Args:
        fun: function to apply
        iterable:
        threads: number of threads
        sort: return values in same order as itarable
        use_tqdm: show progressbar
        desc: text of progressbar
        total: size of iterable to allow show better progressbar

    Returns:
        list: of returned values by fce
    """
    iterable = iterable
    if isinstance(iterable, list) and total is None:
        total = len(iterable)
    if threads == 1:
        return [fun(v) for v in (tqdm(iterable, desc=desc, total=total, maxinterval=2) if use_tqdm else iterable)]

    def _fun(i, arg):
        return i, fun(arg)

    pbar = tqdm(desc=desc, total=total, maxinterval=2) if use_tqdm else None

    async def _run(chunk):
        with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
            loop = asyncio.get_event_loop()
            futures = [loop.run_in_executor(executor, _fun, i, input_value) for i, input_value in enumerate(chunk)]
            result = []
            for output_value in asyncio.as_completed(futures):
                to_append = await output_value
                if use_tqdm:
                    pbar.update()
                result.append(to_append)
            return result
        gc.collect()

    loop = asyncio.get_event_loop()
    result = []
    for chunk in chunked(iterable, chunksize=chunksize):
        chunk_result = loop.run_until_complete(_run(chunk))
        for _, res in sorted(chunk_result, key=lambda ires: ires[0]) if sort else chunk_result:
            result.append(res)
    return result
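
A usage sketch:

from taskchain.utils.threading import parallel_map


def square(x):
    return x * x


results = parallel_map(square, list(range(100)), threads=8, use_tqdm=False)
# results are in input order because sort=True by default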

parallel_starmap(fun, iterable, **kwargs)

Allows using parallel_map for functions that take multiple arguments.

Parameters:

    fun (Callable): function with multiple arguments. Required.
    iterable (Iterable): lists or tuples of arguments. Required.
Source code in taskchain/utils/threading.py
def parallel_starmap(fun: Callable, iterable: Iterable, **kwargs):
    """
    Allows use `parallel_map` for function with multiple arguments.

    Args:
        fun: function with multiple arguments
        iterable: lists or tuples of arguments
    """

    def _call(d):
        return fun(*d)

    return parallel_map(_call, iterable, **kwargs)
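
For example:

from taskchain.utils.threading import parallel_starmap


def add(a, b):
    return a + b


parallel_starmap(add, [(1, 2), (3, 4), (5, 6)], threads=4, use_tqdm=False)
# -> [3, 7, 11]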