parsl.dataflow.memoization.BasicMemoizer

class parsl.dataflow.memoization.BasicMemoizer(*, checkpoint_files: Sequence[str] | None = None, checkpoint_period: str | None = None, checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None = None, memoize: bool = True)

Memoizer is responsible for ensuring that identical work is not repeated.

When a task is repeated, i.e., the same function is called with exactly the same arguments, the result from a previous execution is reused.

The memoizer implementation here does not collapse duplicate calls at call time, but works only when the result of a previous call is available at the time the duplicate call is made.

For instance:

No advantage from                 Memoization helps
memoization here:                 here:

 TaskA                            TaskB
   |   TaskA                        |
   |     |   TaskA                done  (TaskB)
   |     |     |                                (TaskB)
 done    |     |
       done    |
             done

The memoizer creates a lookup table by hashing the function name and its inputs, and storing the results of the function.

When a task is ready for launch, i.e., all of its arguments have resolved, we add its hash to the task data structure.
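The lookup-table idea can be illustrated with a minimal sketch. The helper name make_hashsum, the use of SHA-256 over a pickled tuple, and the plain dict are assumptions made for illustration only; they are not taken from Parsl's implementation.

```python
import hashlib
import pickle


def make_hashsum(func_name, args, kwargs):
    """Return a hex digest identifying one call (hypothetical helper, not Parsl's)."""
    # Sort keyword arguments so that f(a=1, b=2) and f(b=2, a=1) hash identically.
    normalized = (func_name, tuple(args), tuple(sorted(kwargs.items())))
    return hashlib.sha256(pickle.dumps(normalized)).hexdigest()


# The lookup table maps a call's hash to its stored result.
lookup_table = {}

key = make_hashsum("add", (1, 2), {})
lookup_table[key] = 3  # store the result of add(1, 2)

# A later call with the same name and inputs produces the same key,
# while different inputs produce a different key.
same_key = make_hashsum("add", (1, 2), {})
other_key = make_hashsum("add", (1, 3), {})
```

Hashing a pickled tuple only works for picklable inputs with a stable byte representation; a production memoizer would need a more careful normalization of arguments.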

__init__(*, checkpoint_files: Sequence[str] | None = None, checkpoint_period: str | None = None, checkpoint_mode: Literal['task_exit', 'periodic', 'dfk_exit', 'manual'] | None = None, memoize: bool = True)

Initialize the memoizer.

Kwargs:

  • checkpoint_files : sequence of str, optional

    List of paths to checkpoint files. See parsl.utils.get_all_checkpoints() and parsl.utils.get_last_checkpoint() for helpers. Default is None.

  • checkpoint_period : str, optional

    Time interval (in “HH:MM:SS”) at which to checkpoint completed tasks. Only has an effect if checkpoint_mode='periodic'. Default is None.

  • checkpoint_mode : str, optional

    Checkpoint mode to use, can be 'dfk_exit', 'task_exit', 'periodic' or 'manual'. If set to None, checkpointing will be disabled. Default is None.

  • memoize : bool, optional

    Whether to enable memoization. Default is True.
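As a rough illustration of the “HH:MM:SS” format expected by checkpoint_period, the sketch below converts such a string to seconds and collects keyword arguments matching the signature above. The helper parse_period is hypothetical and is not part of Parsl; how the library parses the string internally is not described here.

```python
from datetime import timedelta


def parse_period(period: str) -> float:
    """Convert an "HH:MM:SS" string to seconds (hypothetical helper, not Parsl's)."""
    hours, minutes, seconds = (int(part) for part in period.split(":"))
    return timedelta(hours=hours, minutes=minutes, seconds=seconds).total_seconds()


# Keyword arguments whose names and values follow the documented signature.
memoizer_kwargs = {
    "checkpoint_mode": "periodic",
    "checkpoint_period": "00:30:00",  # checkpoint every 30 minutes
    "memoize": True,
}

interval = parse_period(memoizer_kwargs["checkpoint_period"])
```

With Parsl installed, a dictionary like this could presumably be passed as BasicMemoizer(**memoizer_kwargs), since every parameter is keyword-only.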

Methods

__init__(*[, checkpoint_files, ...])

Initialize the memoizer.

check_memo(task)

Create a hash of the task and its inputs and check the lookup table for this hash.

checkpoint()

This is the user-facing interface to manual checkpointing.

checkpoint_one(cc)

Checkpoint a single task to a checkpoint file.

checkpoint_queue()

Checkpoint all tasks registered in self.checkpointable_tasks.

close()

Called at DFK shutdown.

load_checkpoints(checkpointDirs)

Load checkpoints from the checkpoint files into a dictionary.

start(*, run_dir)

Called by the DFK when it starts up.

update_memo_exception(task, e)

Called by the DFK when a task completes with an exception.

update_memo_result(task, r)

Called by the DFK when a task completes with a successful result.

Attributes

run_dir

check_memo(task: TaskRecord) → Future[Any] | None

Create a hash of the task and its inputs and check the lookup table for this hash.

If present, the results are returned.

Parameters:

task (TaskRecord) – task record from the dfk.tasks table

Returns:

  • Result of the function if present in the table, wrapped in a Future; otherwise None

This call will also set task['hashsum'] to the unique hashsum for the func+inputs.
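The contract described above (return a stored Future or None, and record the hash on the task as a side effect) can be sketched as follows. The function check_memo_sketch, the dict-based task record with func_name and args keys, and the use of repr as a stand-in hash are all assumptions for illustration, not Parsl's actual data structures.

```python
from concurrent.futures import Future
from typing import Any, Dict, Optional


def check_memo_sketch(task: Dict[str, Any], table: Dict[str, Future]) -> Optional[Future]:
    """Toy lookup: record the hash on the task, then return a stored Future or None."""
    hashsum = repr((task["func_name"], task["args"]))  # stand-in for a real hash
    task["hashsum"] = hashsum  # side effect: the task now carries its hash
    return table.get(hashsum)


# Pre-populate the table with one completed result.
done: Future = Future()
done.set_result(42)
table = {repr(("square_plus", (6,))): done}

hit_task = {"func_name": "square_plus", "args": (6,)}
miss_task = {"func_name": "square_plus", "args": (7,)}

hit = check_memo_sketch(hit_task, table)    # Future holding the stored result
miss = check_memo_sketch(miss_task, table)  # None: nothing stored for these inputs
```

Note that the hash is recorded on the task in both cases, so a later update call can store the outcome under the same key.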

checkpoint() → None

This is the user-facing interface to manual checkpointing.

checkpoint_one(cc: CheckpointCommand) → None

Checkpoint a single task to a checkpoint file.

By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/tasks.pkl.

Kwargs:
  • cc : CheckpointCommand for the task to checkpoint.

Note

Checkpointing only works if memoization is enabled.
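A minimal sketch of writing one checkpoint record under RUNDIR/checkpoints/tasks.pkl is given below. The function checkpoint_one_sketch and the record layout (a pickled dict with hash and result keys, appended to the file) are assumptions chosen for illustration; the on-disk format Parsl actually uses is not specified in this page.

```python
import os
import pickle
import tempfile


def checkpoint_one_sketch(run_dir, hashsum, result):
    """Append one record to <run_dir>/checkpoints/tasks.pkl (record layout is assumed)."""
    checkpoint_dir = os.path.join(run_dir, "checkpoints")
    os.makedirs(checkpoint_dir, exist_ok=True)
    path = os.path.join(checkpoint_dir, "tasks.pkl")
    # Append mode lets successive checkpoints accumulate in one file.
    with open(path, "ab") as f:
        pickle.dump({"hash": hashsum, "result": result}, f)
    return path


run_dir = tempfile.mkdtemp()  # stands in for RUNDIR
checkpoint_one_sketch(run_dir, "abc123", 10)
path = checkpoint_one_sketch(run_dir, "def456", 20)
```

Appending one pickled record per task keeps each write small, at the cost of requiring the reader to loop until end of file.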

checkpoint_queue() → None

Checkpoint all tasks registered in self.checkpointable_tasks.

By default the checkpoints are written to the RUNDIR of the current run under RUNDIR/checkpoints/tasks.pkl.

Note

Checkpointing only works if memoization is enabled.

close() → None

Called at DFK shutdown. This gives the checkpoint system an opportunity for graceful shutdown.

load_checkpoints(checkpointDirs: Sequence[str] | None) → Dict[str, Future]

Load checkpoints from the checkpoint files into a dictionary.

The results are used to pre-populate the memoizer’s lookup_table.

Kwargs:
  • checkpointDirs (list) : List of run folders to use as checkpoints, e.g. ['runinfo/001', 'runinfo/002']

Returns:

  • dict mapping task hashes to futures
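The loading step can be sketched as reading every record from each run folder's checkpoint file and wrapping each result in an already-completed Future. The function load_checkpoints_sketch and the record layout (pickled dicts with hash and result keys, stored one after another) are assumptions for illustration and may differ from Parsl's real file format.

```python
import os
import pickle
import tempfile
from concurrent.futures import Future


def load_checkpoints_sketch(checkpoint_dirs):
    """Return {hash: Future} built from every readable tasks.pkl (layout assumed)."""
    table = {}
    for run_dir in checkpoint_dirs or []:
        path = os.path.join(run_dir, "checkpoints", "tasks.pkl")
        if not os.path.exists(path):
            continue  # skip run folders without a checkpoint file
        with open(path, "rb") as f:
            while True:
                try:
                    record = pickle.load(f)
                except EOFError:
                    break  # no more records in this file
                fut = Future()
                fut.set_result(record["result"])
                table[record["hash"]] = fut
    return table


# Build a throwaway run directory containing two records.
run_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(run_dir, "checkpoints"))
with open(os.path.join(run_dir, "checkpoints", "tasks.pkl"), "wb") as f:
    pickle.dump({"hash": "h1", "result": 1}, f)
    pickle.dump({"hash": "h2", "result": 2}, f)

table = load_checkpoints_sketch([run_dir, "runinfo/does-not-exist"])
empty = load_checkpoints_sketch(None)
```

Returning completed futures means a later lookup can hand back the same kind of object whether the result came from this run or from a checkpoint.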

run_dir: str

start(*, run_dir: str) → None

Called by the DFK when it starts up.

This is an opportunity for the memoization/checkpoint system to initialize itself.

The path to the base run directory is passed as a parameter.

update_memo_exception(task: TaskRecord, e: BaseException) → None

Called by the DFK when a task completes with an exception.

On every task completion, either this method or update_memo_result will be called, but not both.

This is an opportunity for the memoization/checkpoint system to record the outcome of a task for later discovery by a call to check_memo.

update_memo_result(task: TaskRecord, r: Any) → None

Called by the DFK when a task completes with a successful result.

On every task completion, either this method or update_memo_exception will be called, but not both.

This is an opportunity for the memoization/checkpoint system to record the outcome of a task for later discovery by a call to check_memo.
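The relationship between the two update hooks and check_memo can be sketched with a toy class. ToyMemoizer and complete_task are hypothetical names, and the choice not to store failed outcomes is an assumption of this sketch; it is not a statement about what BasicMemoizer does with exceptions.

```python
from concurrent.futures import Future


class ToyMemoizer:
    """Minimal stand-in with the three hooks described above (not Parsl's class)."""

    def __init__(self):
        self.table = {}
        self.hook_calls = []

    def check_memo(self, task):
        return self.table.get(task["hashsum"])

    def update_memo_result(self, task, r):
        self.hook_calls.append(("result", task["hashsum"]))
        fut = Future()
        fut.set_result(r)
        self.table[task["hashsum"]] = fut

    def update_memo_exception(self, task, e):
        # This sketch only logs the failure and deliberately stores nothing.
        self.hook_calls.append(("exception", task["hashsum"]))


def complete_task(memoizer, task, func):
    """Run func and call exactly one of the two update hooks."""
    try:
        value = func()
    except Exception as e:
        memoizer.update_memo_exception(task, e)
    else:
        memoizer.update_memo_result(task, value)


def fail():
    raise ValueError("boom")


memoizer = ToyMemoizer()
ok_task = {"hashsum": "h-ok"}
bad_task = {"hashsum": "h-bad"}

before = memoizer.check_memo(ok_task)  # nothing stored until the task completes
complete_task(memoizer, ok_task, lambda: 7)
complete_task(memoizer, bad_task, fail)
after = memoizer.check_memo(ok_task)   # now discoverable by a later lookup
```

The lookup before completion returns nothing, which mirrors the point made earlier: a duplicate call only benefits once the result of a previous call is available.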