Taskiq pipelines for task chaining.

Project description

Pipelines for taskiq

Taskiq pipelines take the fire-and-forget approach to its limit.

Imagine you have some really heavy functions and you want to call them sequentially, one after another, but you don't want to wait for them to complete. taskiq-pipelines solves this for you.

Installation

You can install it from PyPI:

pip install taskiq-pipelines

After installation, you need to add our super clever middleware to your broker.

This middleware decides what to do next after the current step is completed.

from taskiq_pipelines.middleware import PipelineMiddleware

my_super_broker = ...


my_super_broker.add_middlewares(
    [
        PipelineMiddleware(),
    ]
)

Also, note that your broker MUST use a result_backend that can be read by all your workers. Pipelines work with InMemoryBroker, so feel free to use it in local development.

Example

For this example, I'm going to use a single script file.

import asyncio
from typing import Any, List
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq_pipelines import PipelineMiddleware, Pipeline

broker = InMemoryBroker()
broker.add_middlewares([PipelineMiddleware()])


@broker.task
def add_one(value: int) -> int:
    return value + 1


@broker.task
def repeat(value: Any, reps: int) -> List[Any]:
    return [value] * reps


@broker.task
def check(value: int) -> bool:
    return value >= 0


async def main():
    pipe = (
        Pipeline(
            broker,
            add_one,  # First of all we call add_one function.
        )
        # 2
        .call_next(repeat, reps=4)  #  Here we repeat our value 4 times
        # [2, 2, 2, 2]
        .map(add_one)  # Here we execute given function for each value.
        # [3, 3, 3, 3]
        .filter(check)  # Here we filter some values.
        # But since our filter filters out all numbers less than zero,
        # our value won't change.
        # [3, 3, 3, 3]
    )
    task = await pipe.kiq(1)
    result = await task.wait_result()
    print("Calculated value:", result.return_value)


if __name__ == "__main__":
    asyncio.run(main())

If you run this example, it prints this:

$ python script.py
Calculated value: [3, 3, 3, 3]

Let's talk about this example. Two notable things here:

  1. We must add PipelineMiddleware to the list of our middlewares.
  2. Only tasks can be used as steps of a pipeline. If you want to execute an ordinary Python function, you must wrap it in a task.

The pipeline itself is just a convenient wrapper over a list of steps. A constructed pipeline has the same semantics as an ordinary task, and you can add steps manually. However, all steps of the pipeline must implement the taskiq_pipelines.abc.AbstractStep class.

Pipelines can be serialized to strings with the dumps method, and you can load them back with the Pipeline.loads method. This means you can share pipelines you want to execute as simple strings.
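The idea of sharing a pipeline as a string can be illustrated with plain JSON. This is only a conceptual sketch: the actual wire format used by dumps/loads is internal to taskiq-pipelines and the field names below are made up.

```python
import json

# Conceptual sketch: a pipeline as a serializable list of step
# descriptions. The real taskiq-pipelines format differs; these
# dicts and keys are illustrative only.
steps = [
    {"task_name": "add_one", "kwargs": {}},
    {"task_name": "repeat", "kwargs": {"reps": 4}},
]

serialized = json.dumps(steps)     # share this string anywhere
restored = json.loads(serialized)  # load it back on another host
assert restored == steps
```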

The pipeline assigns a task_id to each task when you call kiq, and executes every step with its pre-calculated task_id, so you know all task ids right after the kiq call.
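Pre-assigning every task id before anything runs can be sketched like this. The function name and data shapes here are illustrative, not the library's API:

```python
import uuid

def assign_task_ids(steps):
    """Give every step a task_id up front, before execution starts."""
    return [{"step": step, "task_id": uuid.uuid4().hex} for step in steps]

# All ids are known immediately after a kiq-style submission:
plan = assign_task_ids(["add_one", "repeat", "map:add_one", "filter:check"])
known_ids = [entry["task_id"] for entry in plan]
assert len(known_ids) == 4
```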

How does it work?

After you call the kiq method of the pipeline, it pre-calculates all task_ids, serializes itself, and adds the serialized string to the labels of the first task in the chain.

All the magic happens in the middleware. After a task is executed and its result is saved, the middleware deserializes the pipeline and calculates its next move. And that's the trick. You can get more information from the source code of each pipeline step.
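The mechanism can be sketched in plain Python. Everything below is illustrative: the real logic lives inside taskiq-pipelines' middleware and steps, and the JSON representation here is invented for the sketch.

```python
import json

# Illustrative sketch of the middleware's decision logic -- not the
# actual taskiq-pipelines code. The serialized pipeline travels in the
# task's labels; after each result is saved, the middleware restores
# the pipeline and decides the next move.

def next_move(serialized_pipeline: str, current_index: int, result):
    """Deserialize the pipeline and return the next step to run, if any."""
    steps = json.loads(serialized_pipeline)
    next_index = current_index + 1
    if next_index >= len(steps):
        return None  # pipeline is finished
    # In reality this would enqueue the next task with its
    # pre-calculated task_id; here we just return its description.
    return steps[next_index]

pipeline = json.dumps(["add_one", "repeat", "check"])
assert next_move(pipeline, 0, result=2) == "repeat"
assert next_move(pipeline, 2, result=[3, 3]) is None
```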

Available steps

We have a few steps available for chaining calls:

  1. Sequential
  2. Mapper
  3. Filter

Sequential steps

This type of step is just an ordinary call of the function. If you haven't specified the param_name argument, the result of the previous step is passed as the first argument of the function. If you did specify param_name, the result of the previous step can be found in the keyword arguments under the name you specified.

You can add sequential steps with .call_next method of the pipeline.

If you don't want to pass the result of the previous step to the next one, you can use .call_after method of the pipeline.
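The difference between the default positional passing and param_name can be sketched in plain Python. This helper is illustrative only, not the library's internals:

```python
def run_step(func, previous_result, param_name=None, **kwargs):
    """Pass the previous step's result positionally, or as a keyword.

    Illustrative sketch: mimics how a sequential step feeds the
    previous result into the next function.
    """
    if param_name is None:
        # Default: previous result becomes the first positional argument.
        return func(previous_result, **kwargs)
    # With param_name: previous result arrives as a keyword argument.
    kwargs[param_name] = previous_result
    return func(**kwargs)

def repeat(value, reps):
    return [value] * reps

assert run_step(repeat, 2, reps=3) == [2, 2, 2]
assert run_step(repeat, 2, param_name="value", reps=3) == [2, 2, 2]
```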

Mapper step

This step runs the specified task for each item of the previous task's result, spawning multiple tasks. Note that the result of the previous task must be iterable; otherwise the pipeline is marked as failed.

After execution you'll have the mapped list. You can add mappers by calling the .map method of the pipeline.
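The mapper's semantics can be mimicked with asyncio, where each item spawns its own coroutine (a stand-in for the worker tasks the real step would enqueue):

```python
import asyncio

async def add_one(value: int) -> int:
    return value + 1

async def run_mapper_step(task, items):
    """Mimic the mapper step: spawn one task per item, gather the results.

    Illustrative sketch: the real step distributes these across workers.
    """
    return await asyncio.gather(*(task(item) for item in items))

result = asyncio.run(run_mapper_step(add_one, [3, 3, 3, 3]))
assert result == [4, 4, 4, 4]
```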

Filter step

This step runs the specified task for each item of the previous task's result. As with the mapper, the result of the previous task must be iterable; otherwise the pipeline is marked as failed.

If the called task returns True for an element, that element is added to the final list.

After execution you'll get a list with the filtered results. You can add filters by calling the .filter method of the pipeline.
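Like the mapper, the filter step's semantics can be mimicked with asyncio. Again, this is only a sketch of the behavior, not the library's implementation:

```python
import asyncio

async def check(value: int) -> bool:
    return value >= 0

async def run_filter_step(predicate, items):
    """Mimic the filter step: run the predicate task for every item,
    keep only the items whose task returned True."""
    flags = await asyncio.gather(*(predicate(item) for item in items))
    return [item for item, keep in zip(items, flags) if keep]

result = asyncio.run(run_filter_step(check, [3, -1, 0, -5]))
assert result == [3, 0]
```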
