Skip to main content

Parallel Stream Task for Python

Project description

Streamtask

Streamtask is a lightweight python parallel framework for parallelizing the computationally intensive pipelines. It is similar to Map/Reduce, while it is more lightweight. It parallelizes each module in the pipeline with a given processing number to make it possible to leverage the different speeds in different modules. It improves the performance especially there are some heavy I/O operations in the pipeline.

Example

Suppose we want to process the data in a pipline with 3 blocks, f1, f2 and f3. We can use the following code to parallelize the processing. We can also directly add a data list or give a file name by using add_data.

def f1(total):
    import time
    for i in range(total):
        time.sleep(0.002)
        yield i * 2

def f2(n, add, third = 0.01):
    time.sleep(0.02)
    return n + add + third

def f3_the_final(n):
    time.sleep(0.03)
    return n + 1

if __name__ == "__main__":
    total = 100000
    stk = StreamTask(total = total)
    stk.add_data(data = range(total), batch_size=10) # Also support directly stream reading file
    #stk.add_module(f1, 1, total = total)
    stk.add_module(f2, 2, args = [0.5], third = 0.02)
    stk.add_module(f3_the_final, 2)
    stk.run(parallel = True)
    stk.join()
    res = stk.get_results()
    print(stk.get_results())
stream_reader (1/1):  79%|██████████████████████████████████████████████▋            | 7923/10000 [00:05<00:01, 1567.68it/s]
f2 (2/2):  29%|████████████████████▉                                                   | 1927/6635 [00:04<00:09, 476.72it/s]
f3_the_final (2/2): 100%|█████████████████████████████████████████████████████████████▉| 1927/1928 [00:04<00:00, 476.55it/s]

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamtask-0.0.9.tar.gz (4.8 kB view hashes)

Uploaded Source

Built Distribution

streamtask-0.0.9-py3-none-any.whl (5.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page