parsl.dataflow.memoization.Memoizer

class parsl.dataflow.memoization.Memoizer(*, memoize: bool = True, checkpoint_files: Sequence[str] | None, checkpoint_period: str | None, checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None)[source]

Memoizer is responsible for ensuring that identical work is not repeated.

When a task is repeated, i.e., the same function is called with the exact same arguments, the result from a previous execution is reused.

The memoizer implementation here does not collapse duplicate calls at call time, but works only when the result of a previous call is available at the time the duplicate call is made.

For instance:

No advantage from                 Memoization helps
memoization here:                 here:

 TaskA                            TaskB
   |   TaskA                        |
   |     |   TaskA                done  (TaskB)
   |     |     |                                (TaskB)
 done    |     |
       done    |
             done

The memoizer creates a lookup table by hashing the function name and its inputs, and storing the results of the function.

When a task is ready for launch, i.e., all of its arguments have resolved, we add its hash to the task data structure.
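The lookup-table idea described above can be sketched in plain Python. This is a hypothetical minimal illustration of the concept, not Parsl's implementation: the key is a hash of the function name and its arguments, and a duplicate call reuses the stored result.

```python
import hashlib
import pickle

# Illustrative lookup table: hash of (function name, inputs) -> result.
lookup_table = {}

def make_key(func_name, args, kwargs):
    # Serialize the call signature deterministically, then hash it.
    payload = pickle.dumps((func_name, args, sorted(kwargs.items())))
    return hashlib.md5(payload).hexdigest()

def memoized_call(func, *args, **kwargs):
    key = make_key(func.__name__, args, kwargs)
    if key in lookup_table:
        # Duplicate call whose result is already available: reuse it.
        return lookup_table[key]
    result = func(*args, **kwargs)   # first call: compute and store
    lookup_table[key] = result
    return result

def double(x):
    return 2 * x

memoized_call(double, 21)  # computes and stores the result
memoized_call(double, 21)  # identical call: served from the table
```

Note that, as the diagram above shows, this style of memoization only helps when the first call has already completed by the time the duplicate call is made; it does not collapse in-flight duplicates.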

__init__(*, memoize: bool = True, checkpoint_files: Sequence[str] | None, checkpoint_period: str | None, checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None)[source]

Initialize the memoizer.

Kwargs:
  • memoize (bool): enable or disable memoization.

  • checkpoint_files (Sequence[str] | None): checkpoint files from which to load previously memoized results.

  • checkpoint_period (str | None): interval between periodic checkpoints, as an HH:MM:SS string.

  • checkpoint_mode (str | None): one of 'task_exit', 'periodic', 'dfk_exit' or 'manual'.

Methods

__init__(*[, memoize])

Initialize the memoizer.

check_memo(task)

Create a hash of the task and its inputs and check the lookup table for this hash.

checkpoint_one(*, task)

Checkpoint a single task to a checkpoint file.

checkpoint_queue()

Checkpoint all tasks registered in self.checkpointable_tasks.

close()

load_checkpoints(checkpointDirs)

Load checkpoints from the checkpoint files into a dictionary.

make_hash(task)

Create a hash of the task inputs.

start()

update_checkpoint(task_record)

update_memo_exception(task, e)

update_memo_result(task, r)

Attributes

run_dir

check_memo(task: TaskRecord) Future[Any] | None[source]

Create a hash of the task and its inputs and check the lookup table for this hash.

If present, the results are returned.

Parameters:

task – task from the dfk.tasks table

Returns:

  • Result of the function if present in table, wrapped in a Future

This call will also set task['hashsum'] to the unique hashsum for the func+inputs.
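The check_memo behaviour can be sketched as follows. This is an illustrative assumption about the pattern, not Parsl's code: the task's hash is recorded on the task record, and on a memo hit the stored result is handed back wrapped in a Future; `make_hash` and the task-dict shape here are simplified stand-ins.

```python
import hashlib
import pickle
from concurrent.futures import Future

lookup_table = {}

def make_hash(task):
    # Simplified stand-in: hash the function name and inputs.
    payload = pickle.dumps((task['func_name'], task['args'],
                            sorted(task['kwargs'].items())))
    return hashlib.md5(payload).hexdigest()

def check_memo(task, memoize=True):
    if not memoize:
        return None
    hashsum = make_hash(task)
    task['hashsum'] = hashsum        # record the hash on the task record
    if hashsum not in lookup_table:
        return None                  # memo miss: the task must run
    fut = Future()                   # memo hit: wrap the stored result
    fut.set_result(lookup_table[hashsum])
    return fut
```

A miss returns None (the caller launches the task); a hit returns a completed Future, so downstream code can treat memoized and freshly executed results uniformly.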

checkpoint_one(*, task: TaskRecord) None[source]

Checkpoint a single task to a checkpoint file.

By default, checkpoints are written under the RUNDIR of the current run, as RUNDIR/checkpoints/tasks.pkl.

Kwargs:
  • task : A task to checkpoint.

Note

Checkpointing only works if memoization is enabled
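The per-task checkpoint write can be sketched like this. This is a hypothetical simplification of the pattern described above (one pickled record appended per task to RUNDIR/checkpoints/tasks.pkl); the record layout is an assumption for illustration.

```python
import os
import pickle

def checkpoint_one(task, run_dir):
    # Write checkpoints under RUNDIR/checkpoints/tasks.pkl.
    checkpoint_dir = os.path.join(run_dir, 'checkpoints')
    os.makedirs(checkpoint_dir, exist_ok=True)
    path = os.path.join(checkpoint_dir, 'tasks.pkl')
    with open(path, 'ab') as f:
        # Append one pickled {hash, result} record per checkpointed task.
        pickle.dump({'hash': task['hashsum'], 'result': task['result']}, f)
```

Appending records (rather than rewriting the whole file) keeps a per-task checkpoint cheap; a loader can then unpickle records from the file until EOF.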

checkpoint_queue() None[source]

Checkpoint all tasks registered in self.checkpointable_tasks.

By default, checkpoints are written under the RUNDIR of the current run, as RUNDIR/checkpoints/tasks.pkl.

Note

Checkpointing only works if memoization is enabled

close() None[source]
load_checkpoints(checkpointDirs: Sequence[str] | None) Dict[str, Future][source]

Load checkpoints from the checkpoint files into a dictionary.

The results are used to pre-populate the memoizer’s lookup_table

Kwargs:
  • checkpointDirs (list): List of run folders to use as checkpoints, e.g. ['runinfo/001', 'runinfo/002']

Returns:

  • dict containing hash -> Future mappings
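The loading side can be sketched as the mirror of the checkpoint write. This is an illustrative assumption, not Parsl's code: each run folder's checkpoints/tasks.pkl is read record by record, and each stored result is wrapped in a completed Future to pre-populate the hash -> Future lookup table.

```python
import os
import pickle
from concurrent.futures import Future

def load_checkpoints(checkpoint_dirs):
    # Build a hash -> Future mapping from prior runs' checkpoint files.
    memo = {}
    for run_dir in checkpoint_dirs or []:
        path = os.path.join(run_dir, 'checkpoints', 'tasks.pkl')
        if not os.path.exists(path):
            continue
        with open(path, 'rb') as f:
            while True:
                try:
                    record = pickle.load(f)  # one record per checkpointed task
                except EOFError:
                    break                    # end of appended records
                fut = Future()
                fut.set_result(record['result'])
                memo[record['hash']] = fut
    return memo
```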

make_hash(task: TaskRecord) str[source]

Create a hash of the task inputs.

Parameters:

task – Task dictionary from dfk.tasks

Returns:

A unique hash string

Return type:

  • str

run_dir: str[source]
start() None[source]
update_checkpoint(task_record: TaskRecord) None[source]
update_memo_exception(task: TaskRecord, e: BaseException) None[source]
update_memo_result(task: TaskRecord, r: Any) None[source]