
Size and time aware deduplicated asynchronous buffer.


Stdbuf



Inspired by the ClickHouse Buffer table engine and used for the same purpose: accumulating items in memory and flushing them in batches.
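To make "size and time aware" and "deduplicated" concrete, here is a minimal sketch of the idea in plain asyncio. It is not stdbuf's implementation: the class `TinyBuffer` and its parameters `max_size` and `max_time` are hypothetical names chosen for illustration, and unlike a production buffer it does not block producers when it is full.

```python
import asyncio


class TinyBuffer:
    """Toy size- and time-aware deduplicating buffer (illustration only)."""

    def __init__(self, max_size: int, max_time: float) -> None:
        self.max_size = max_size
        self.max_time = max_time
        # Dict keys drop duplicates and preserve insertion order.
        self._items: dict = {}
        self._full = asyncio.Event()

    async def put(self, item) -> None:
        self._items[item] = None
        if len(self._items) >= self.max_size:
            self._full.set()

    async def get(self) -> list:
        try:
            # Wake up early if the size limit is reached.
            await asyncio.wait_for(self._full.wait(), timeout=self.max_time)
        except asyncio.TimeoutError:
            pass  # Time limit reached: return whatever has accumulated.
        batch = list(self._items)[: self.max_size]
        for item in batch:
            del self._items[item]
        if len(self._items) < self.max_size:
            self._full.clear()
        return batch


async def demo() -> None:
    buf = TinyBuffer(max_size=3, max_time=0.2)
    for item in (1, 1, 2):  # the duplicate 1 is stored once
        await buf.put(item)
    print(await buf.get())  # time limit: prints [1, 2]
    for item in (3, 4, 5, 6):
        await buf.put(item)
    print(await buf.get())  # size limit: prints [3, 4, 5] without waiting


if __name__ == "__main__":
    asyncio.run(demo())
```

The consumer therefore sees batches of bounded size at a bounded interval, which is the property the usage example below asserts for Stdbuf itself.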

Usage

import asyncio
import time

from stdbuf import Stdbuf


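async def read(buf: Stdbuf):
    # Producer: despite the name, this coroutine feeds the buffer,
    # putting one item every 0.5 seconds.
    for i in range(16):
        await buf.put(i)
        await asyncio.sleep(0.5)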


async def write(buf: Stdbuf):
    # Consumer: drains the buffer in batches until cancelled.
    while True:
        start = time.perf_counter()
        # get() returns after at most 2 seconds,
        # or earlier if the buffer fills up.
        data = await buf.get()
        elapsed = time.perf_counter() - start
        assert len(data) <= 4
        assert elapsed <= 2 + 1e-2


async def main():
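    # Going by the assertions in write(): 4 is the size limit and 2 is the
    # time limit in seconds; True presumably enables deduplication.
    with Stdbuf(4, 2, True) as buf: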
        done, pending = await asyncio.wait({
            asyncio.create_task(read(buf)),
            asyncio.create_task(write(buf)),
        },
            return_when=asyncio.FIRST_COMPLETED,
        )
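        for task in pending:
            task.cancel()
        # Let the cancelled tasks finish before leaving the block.
        await asyncio.gather(*pending, return_exceptions=True)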


if __name__ == "__main__":
    asyncio.run(main())

Source Distribution

stdbuf-1.0.2.tar.gz (4.2 kB)
