Event-driven data pipelines
Project description
Introduction
The primary use cases of eventkit are
to send events between loosely coupled components;
to compose all kinds of event-driven data pipelines.
The interface is kept as Pythonic as possible, with familiar names from Python and its libraries where possible. For scheduling asyncio is used and there is seamless integration with it.
See the examples and the introduction notebook to get a true feel for the possibilities.
Installation
pip3 install eventkit
Python version 3.6 or higher is required.
Examples
Create an event and connect two listeners
import eventkit as ev
def f(a, b):
print(a * b)
def g(a, b):
print(a / b)
event = ev.Event()
event += f
event += g
event.emit(10, 5)
Create a simple pipeline
import eventkit as ev
event = (
ev.Sequence('abcde')
.map(str.upper)
.enumerate()
)
print(event.run()) # in Jupyter: await event.list()
Output:
[(0, 'A'), (1, 'B'), (2, 'C'), (3, 'D'), (4, 'E')]
Create a pipeline to get a running average and standard deviation
import random
import eventkit as ev
source = ev.Range(1000).map(lambda i: random.gauss(0, 1))
event = source.array(500)[ev.ArrayMean, ev.ArrayStd].zip()
print(event.last().run()) # in Jupyter: await event.last()
Output:
[(0.00790957852672618, 1.0345673260655333)]
Combine async iterators together
import asyncio
import eventkit as ev
async def ait(r):
for i in r:
await asyncio.sleep(0.1)
yield i
async def main():
async for t in ev.Zip(ait('XYZ'), ait('123')):
print(t)
asyncio.get_event_loop().run_until_complete(main()) # in Jupyter: await main()
Output:
('X', '1')
('Y', '2')
('Z', '3')
Real-time video analysis pipeline
self.video = VideoStream(conf.CAM_ID)
scene = self.video | FaceTracker | SceneAnalyzer
lastScene = scene.aiter(skip_to_last=True)
async for frame, persons in lastScene:
...
Distributed computing
The distex library provides a poolmap extension method to put multiple cores or machines to use:
from distex import Pool
import eventkit as ev
import bz2
pool = Pool()
# await pool # un-comment in Jupyter
data = [b'A' * 1000000] * 1000
pipe = ev.Sequence(data).poolmap(pool, bz2.compress).map(len).mean().last()
print(pipe.run()) # in Jupyter: print(await pipe)
pool.shutdown()
Inspired by:
Documentation
The complete API documentation.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file eventkit-1.0.3.tar.gz.
File metadata
- Download URL: eventkit-1.0.3.tar.gz
- Upload date:
- Size: 28.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.12.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
99497f6f3c638a50ff7616f2f8cd887b18bbff3765dc1bd8681554db1467c933
|
|
| MD5 |
12271e13bcbdf4b08056959463103708
|
|
| BLAKE2b-256 |
161e0fac4e45d71ace143a2673ec642701c3cd16f833a0e77a57fa6a40472696
|
File details
Details for the file eventkit-1.0.3-py3-none-any.whl.
File metadata
- Download URL: eventkit-1.0.3-py3-none-any.whl
- Upload date:
- Size: 31.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.12.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0e199527a89aff9d195b9671ad45d2cc9f79ecda0900de8ecfb4c864d67ad6a2
|
|
| MD5 |
4def44b13603d87f8df009011bc4230f
|
|
| BLAKE2b-256 |
93d97497d650b69b420e1a913329a843e16c715dac883750679240ef00a921e2
|