A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.
Project description
A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.
Latest release 20250120: BaseTask.init: accept the state as positional or keyword.
Class BaseTask(cs.fsm.FSM, cs.resources.RunStateMixin)
A base class subclassing cs.fsm.FSM with a RunStateMixin.
Note that this class and the FSM base class does not provide
a FSM_DEFAULT_STATE attribute; a default state value of
None will leave .fsm_state unset.
This behaviour is is chosen mostly to support subclasses
with unusual behaviour, particularly Django's Model class
whose refresh_from_db method seems to not refresh fields
which already exist, and setting .fsm_state from a
FSM_DEFAULT_STATE class attribute thus breaks this method.
Subclasses of this class and Model should not provide a
FSM_DEFAULT_STATE attribute, instead relying on the field
definition to provide this default in the usual way.
BaseTask.as_dot(self, name=None, **kw):
Return a DOT syntax digraph starting at this Task.
Parameters are as for Task.tasks_as_dot.
BaseTask.dot_node_label(self):
The default DOT node label.
BaseTask.tasks_as_dot(tasks, name=None, *, follow_blocking=False, sep=None):
Return a DOT syntax digraph of the iterable tasks.
Nodes will be coloured according to DOT_NODE_FILLCOLOR_PALETTE
based on their state.
Parameters:
* `tasks`: an iterable of `Task`s to populate the graph
* `name`: optional graph name
* `follow_blocking`: optional flag to follow each `Task`'s
`.blocking` attribute recursively and also render those
`Task`s
* `sep`: optional node seprator, default `'
'`
BaseTask.tasks_as_svg(tasks, name=None, **kw):
Return an SVG diagram of the iterable tasks.
This takes the same parameters as tasks_as_dot.
Class BlockedError(TaskError)
Raised by a blocked Task if attempted.
main(argv)
Dummy main programme to exercise something.
make(*tasks, fail_fast=False, queue=None)
Generator which completes all the supplied tasks by dispatching them
once they are no longer blocked.
Yield each task from tasks as it completes (or becomes cancelled).
Parameters:
tasks:Tasks as positional parametersfail_fast: defaultFalse; if true, cease evaluation as soon as a task completes in a state with is notDONEqueue: optional callable to submit a task for execution later via some queue such asLateror celery
The following rules are applied by this function:
- if a task is being prepared, raise an
FSMError - if a task is already running or queued, wait for its completion
- if a task is pending:
- if any prerequisite has failed, fail this task
- if any prerequisite is cancelled, cancel this task
- if any prerequisite is pending, make it first
- if any prerequisite is not done, fail this task
- otherwise dispatch this task and then yield it
- if
fail_fastand the task is not done, return
Examples:
>>> t1 = Task('t1', lambda: print('doing t1'), track=True)
>>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
>>> list(make(t2)) # doctest: +ELLIPSIS
t1 PENDING->dispatch->RUNNING
doing t1
t1 RUNNING->done->DONE
t2 PENDING->dispatch->RUNNING
doing t2
t2 RUNNING->done->DONE
[Task('t2',<function <lambda> at ...>,state='DONE')]
make_later(L, *tasks, fail_fast=False)
Dispatch the tasks via L:Later for asynchronous execution
if it is not already completed.
The caller can wait on t.result for completion.
This calls make_now() in a thread and uses L.defer to
queue the task and its prerequisites for execution.
make_now(*tasks, fail_fast=False, queue=None)
Run the generator make(*tasks) to completion and return the
list of completed tasks.
Class Task(BaseTask, cs.threads.HasThreadState)
A task which may require the completion of other tasks.
The model here may not be quite as expected; it is aimed at
tasks which can be repaired and rerun.
As such, if self.run(func,...) raises an exception from
func then this Task will still block dependent Tasks.
Dually, a Task which completes without an exception is
considered complete and does not block dependent Tasks.
Keyword parameters:
cancel_on_exception: if true, cancel thisTaskif.runraises an exception; the default isFalse, allowing repair and retrycancel_on_result: optional callable to test theTask.resultafter.run; if the callable returnsTruetheTaskis marked as cancelled, allowing repair and retryfunc: the function to call to complete theTask; it will be called asfunc(*func_args,**func_kwargs)func_args: optional positional arguments, default()func_kwargs: optional keyword arguments, default{}lock: optional lock, default anRLockstate: initial state, default fromself._state.initial_state, which is initally 'PENDING'track: defaultFalse; ifTruethen apply a callback for all states to print task transitions; otherwise it should be a callback function suitable forFSM.fsm_callbackOther arguments are passed to theResultinitialiser.
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 10)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)
Users wanting more immediate semantics can supply
cancel_on_exception and/or cancel_on_result to control
these behaviours.
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 2)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)
Task.__call__(self):
Block on self.result awaiting completion
by calling self.result().
Task.bg(self):
Dispatch a function to complete the Task in a separate Thread,
returning the Thread.
This raises BlockedError for a blocked task.
otherwise the thread runs self.dispatch().
Task.block(self, otask):
Block another task until we are complete.
The converse of .require().
Task.blockers(self):
A generator yielding tasks from self.required
which should block this task.
Aborted tasks are not blockers
but if we encounter one we do abort the current task.
Task.cancel(self):
Transition this Task to CANCELLED state.
If the task is running, set .cancelled on the RunState,
allowing clean task cancellation and subsequent transition
(mediated by the .run() method).
Otherwise fire the 'cancel' event directly.
Task.dispatch(self):
Dispatch the Task:
If the task is blocked, raise BlockedError.
If a prerequisite is aborted, fire the 'abort' method.
Otherwise fire the 'dispatch' event and then run the
task's function via the .run() method.
Task.isblocked(self):
A task is blocked if any prerequisite is not complete.
Task.iscompleted(self):
This task is completed (even if failed) and does not block contingent tasks.
Task.join(self):
Wait for this task to complete.
Task.make(self, fail_fast=False):
Complete self and its prerequisites.
This calls the global make() function with self.
It returns a Boolean indicating whether this task completed.
Task.perthread_state
Task.require(self, otask: 'TaskSubType'):
Add a requirement that otask be complete before we proceed.
This is the simple Task only version of .then().
Task.run(self):
Run the function associated with this task,
completing the self.result Result appropriately when finished.
WARNING: this ignores the current state and any blocking Tasks.
You should usually use dispatch or make.
During the run the thread local Task.default()
will be self and the self.runstate will be running.
Otherwise run func_result=self.func(*self.func_args,**self.func_kwargs)
with the following effects:
- if the function raises a
CancellationError, cancel theTask - if the function raises another exception,
if
self.cancel_on_exceptionthen cancel the task else completeself.resultwith the exception and fire the'error'`event - if
self.runstate.canceledorself.cancel_on_resultwas provided andself.cancel_on_result(func_result)is true, cancel the task - otherwise complete
self.resultwithfunc_resultand fire the'done'event
Task.then(self, func: Union[str, Callable, ForwardRef('TaskSubType')], *a, func_args=(), func_kwargs=None, **task_kw):
Prepare a new Task or function which may not run before self completes.
This may be called in two ways:
task.then(some_Task): block theTaskinstancesome_Taskbehindself`task.then([name,]func[,func_args=][,func_kwargs=][,Task_kwargs...]): make a newTaskto be blocked behindselfReturn the newTask`.
This supports preparing a chain of actions:
>>> t_root = Task("t_root", lambda: 0)
>>> t_leaf = t_root.then(lambda: 1).then(lambda: 2)
>>> t_root.iscompleted() # the root task has not yet run
False
>>> t_leaf.iscompleted() # the final task has not yet run
False
>>> # t_leaf is blocked by t_root
>>> t_leaf.dispatch() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
cs.taskqueue.BlockedError: ...
>>> t_leaf.make() # make the leaf, but make t_root first
True
>>> t_root.iscompleted() # implicitly completed by make
True
>>> t_leaf.iscompleted()
True
Class TaskError(cs.fsm.FSMError)
Raised by Task related errors.
Class TaskQueue
A task queue for managing and running a set of related tasks.
Unlike make and Task.make, this is aimed at a "dispatch" worker
which dispatches individual tasks as required.
Example 1, put 2 dependent tasks in a queue and run:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, t2)
>>> for _ in q.run(): pass
...
t1
t2
Example 2, put 1 task in a queue and run. The queue only runs the specified tasks:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1)
>>> for _ in q.run(): pass
...
t1
Example 2, put 1 task in a queue with run_dependent_tasks=True and run.
The queue pulls in the dependencies of completed tasks and also runs those:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, run_dependent_tasks=True)
>>> for _ in q.run(): pass
...
t1
t2
TaskQueue.__init__(self, *tasks, run_dependent_tasks=False):
Initialise the queue with the supplied tasks.
TaskQueue.add(self, task):
Add a task to the tasks managed by this queue.
TaskQueue.as_dot(self, name=None, **kw):
Compute a DOT syntax graph description of the tasks in the queue.
TaskQueue.get(self):
Pull a completed or an unblocked pending task from the queue.
Return the task or None if nothing is available.
The returned task is no longer tracked by this queue.
TaskQueue.run(self, runstate=None, once=False):
Process tasks in the queue until the queue has no completed tasks,
yielding each task, immediately if task.iscompleted()
otherwise after taks.dispatch().
An optional RunState may be provided to allow early termination
via runstate.cancel().
An incomplete task is dispatched before yield;
ideally it will be complete when the yield happens,
but its semantics might mean it is in another state such as CANCELLED.
The consumer of run must handle these situations.
Release Log
Release 20250120: BaseTask.init: accept the state as positional or keyword.
Release 20240423: Small fixes.
Release 20230401: Add missing requirement to DISTINFO.
Release 20230331:
- Task: subclass BaseTask instead of (FSM, RunStateMixin).
- BaseTask.init: use @uses_runstate to ensure we've got a RunState.
Release 20230217: Task: subclass HasThreadState, drop .current_task() class method.
Release 20221207:
- Pull out core stuff from Task into BaseTask, aids subclassing.
- BaseTask: explainatory docustring about unusual FSM_DEFAULT_STATE design choice.
- BaseTask.tasks_as_dot: express the edges using the node ids instead of their labels.
- BaseTask: new tasks_as_svg() method like tasks_as_dot() but returning SVG.
Release 20220805: Initial PyPI release.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cs_taskqueue-20250120.tar.gz.
File metadata
- Download URL: cs_taskqueue-20250120.tar.gz
- Upload date:
- Size: 19.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
29d862921adf2680b6f61f30f0be5ebf82cb4328afedbe728c7c992178dbe2e4
|
|
| MD5 |
64605c3352c78f6c72f5862f9783096d
|
|
| BLAKE2b-256 |
ea3b7ac4f5d455f9134a1f7207dbc99dcdd74b9f79abc1f9c70e3f447582cca2
|
File details
Details for the file cs_taskqueue-20250120-py2.py3-none-any.whl.
File metadata
- Download URL: cs_taskqueue-20250120-py2.py3-none-any.whl
- Upload date:
- Size: 17.4 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.0.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c8fe7182ce98ba01a2706c0332807f72456bdd037a465a4765440f9a841a30a8
|
|
| MD5 |
863b51c1a2ab4c0d2f7184a4d192e0f5
|
|
| BLAKE2b-256 |
52d28b76fd6aae0672ad0f4ce660f31d853035cd6ecc0d4220ea5591fbe487b6
|