A framework for content-based routing of records in a DynamoDB stream to the callable that should handle them
dynamodb-stream-router
WARNING - Version 0.0.6 is a breaking change from version 0.0.5. Please review the documentation before upgrading
Provides a framework for mapping records in a DynamoDB stream to callables based on the event name (INSERT, MODIFY, REMOVE) and content.
Installation
pip install dynamodb-stream-router
Routing
- Routes are determined by examining the `eventName` of the DynamoDB stream `Record` and through a `condition` which examines the contents of the `Record`.
- Conditions can either be a callable that takes a `RouteRecord` and returns `True` or `False`, or a string expression that will be parsed into a callable. See below for the expression language.
- Routes have a priority, which is honored in ascending order. If multiple matching routes have the same priority, they are executed in random order (concurrently, if an `executor` is provided).
- Routes are matched based on a `RouteRecord`, a helper class that (lazily) deserializes the DynamoDB item structure (used in `Keys`, `NewImage` and `OldImage`) into Python types, exactly as the boto3 DynamoDB Table resource does.
- Route handling functions take a `RouteRecord` and can return anything. The return value is not used by the framework.
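The matching and priority rules above can be sketched in plain Python. This is a minimal illustration, not the library's implementation: `Route`, `dispatch`, and the handler names are hypothetical, conditions here receive the raw record dict instead of a `RouteRecord`, and handlers run sequentially with no executor.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class Route:
    # Hypothetical stand-in for a registered route; not the library's class.
    operation: str  # "INSERT", "MODIFY", or "REMOVE"
    condition: Callable[[Dict[str, Any]], bool]
    handler: Callable[[Dict[str, Any]], Any]
    priority: int = 0


def dispatch(record: Dict[str, Any], routes: List[Route]) -> List[str]:
    """Call every matching handler, lowest priority number first."""
    by_priority: Dict[int, List[Route]] = defaultdict(list)
    for route in routes:
        if route.operation == record["eventName"] and route.condition(record):
            by_priority[route.priority].append(route)

    called = []
    for priority in sorted(by_priority):
        # Routes sharing a priority have no guaranteed order among themselves.
        for route in by_priority[priority]:
            route.handler(record)  # the return value is ignored
            called.append(route.handler.__name__)
    return called


def audit(record): ...
def notify(record): ...
def skipped(record): ...


routes = [
    Route("INSERT", lambda r: True, notify, priority=1),
    Route("INSERT", lambda r: r["dynamodb"]["NewImage"]["foo"] == {"S": "bar"}, audit, priority=0),
    Route("REMOVE", lambda r: True, skipped, priority=0),
]
record = {"eventName": "INSERT", "dynamodb": {"NewImage": {"foo": {"S": "bar"}}}}
order = dispatch(record, routes)
```

Here `audit` runs before `notify` because its priority number is lower, and `skipped` never runs because its operation does not match the record's event name.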
Expressions
Keywords and types:
| Type | Description | Example |
|---|---|---|
| VALUE | A quoted string (single or double quote), integer, or float representing a literal value | 'foo', 1, 3.8 |
| $OLD | A reference to RouteRecord.old_image | $OLD.foo |
| $NEW | A reference to RouteRecord.new_image | $NEW.foo |
| PATH | A path starting from a root of $OLD or $NEW. Can be specified using dot syntax or Python-style keys. When using dot references, paths must conform to Python's restrictions. | $OLD.foo, $NEW.foo.bar, $OLD["foo"] |
| INDEX | An integer used as an index into a list or set | $OLD.foo[0] |
Operators:
| Symbol | Action |
|---|---|
| & | Logical AND |
| \| | Logical OR |
| () | Statement grouping |
| == | Equality |
| != | Inequality |
| > | Greater than |
| >= | Greater than or equal to |
| < | Less than |
| <= | Less than or equal to |
| =~ | Regex comparison: PATH =~ 'regex', where 'regex' is a quoted VALUE |
Comparison operators, except for regex comparison, can compare PATH to VALUE, PATH to PATH, or even VALUE to VALUE.
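Since a condition may be either a string expression or a callable, it can help to see the two side by side. The sketch below pairs an expression with a hand-written callable intended to mirror it. `FakeRecord` is a hypothetical stand-in for `RouteRecord`, the defaults used for missing keys are this sketch's choice, and treating `=~` like `re.search` is an assumption the source does not confirm.

```python
import re
from dataclasses import dataclass, field


@dataclass
class FakeRecord:
    # Hypothetical stand-in exposing the two attributes that expressions refer to.
    old_image: dict = field(default_factory=dict)
    new_image: dict = field(default_factory=dict)


# Expression form:
#   "$NEW.status == 'active' & ($NEW.count > 3 | $OLD.name =~ '^tmp')"
def condition(record: FakeRecord) -> bool:
    """Hand-written callable intended to mirror the expression above."""
    new, old = record.new_image, record.old_image
    return new.get("status") == "active" and (
        new.get("count", 0) > 3
        # Assumption: =~ behaves like re.search; the source does not say.
        or re.search(r"^tmp", old.get("name", "")) is not None
    )


matches = condition(
    FakeRecord(old_image={"name": "tmp-1"}, new_image={"status": "active", "count": 1})
)
rejected = condition(FakeRecord(new_image={"status": "inactive", "count": 10}))
```

The parentheses in the expression play the same role as the parenthesized `or` clause in the callable: they group the two alternatives before the AND is applied.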
Functions:
| Function | Arguments | Description |
|---|---|---|
| has_changed(VALUE, VALUE) | VALUE - Comma separated list of quoted values | Tests $OLD and $NEW. Returns True if any listed key is present in one image but not the other, or is present in both with different values. |
| is_type(PATH, TYPE) | | Tests if PATH exists and the VALUE at PATH is of type TYPE. |
| attribute_exists(PATH) | PATH - The path to test | Returns True if the provided path exists |
| from_json(PATH) | PATH - The path to decode | Returns object decoded using simplejson.loads() |
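The `has_changed` rule in the table can be written out in a few lines. This is a sketch of the described behavior on plain dicts, not the library's code; `has_changed_sketch` and `_MISSING` are hypothetical names.

```python
from typing import Any, Mapping

_MISSING = object()  # distinguishes "key absent" from a stored None


def has_changed_sketch(old: Mapping[str, Any], new: Mapping[str, Any], *keys: str) -> bool:
    """True if any key is in only one image, or is in both with different values."""
    for key in keys:
        if old.get(key, _MISSING) != new.get(key, _MISSING):
            return True
    return False


old = {"foo": 1, "bar": "x"}
new = {"foo": 1, "baz": True}

unchanged = has_changed_sketch(old, new, "foo")
removed = has_changed_sketch(old, new, "foo", "bar")
added = has_changed_sketch(old, new, "baz")
absent_in_both = has_changed_sketch(old, new, "nope")
```

A key missing from both images counts as unchanged, and one changed key is enough for the whole call to return True.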
Examples
    from dynamodb_stream_router import on_insert, on_modify, on_remove, on_operations, Operation, route_records, RouteRecord

    # String condition: runs on INSERT when the new image has foo == 'bar' (priority 0).
    @on_insert("$NEW.foo == 'bar'", 0)
    def print_new_record(record: RouteRecord) -> None:
        print(record.new_image)

    # Callable condition: takes the RouteRecord and returns True or False.
    def test_old_foo(record: RouteRecord) -> bool:
        return record.old_image["foo"] == "bar2"

    @on_remove(test_old_foo, 0)
    def print_old_record(record: RouteRecord) -> None:
        print(record.old_image)

    # Runs on MODIFY when foo changed and still exists in the new image (priority 1).
    @on_modify("has_changed('foo') & attribute_exists($NEW.foo)", 1)
    def print_changed_foo(record: RouteRecord) -> None:
        print(f'{record.old_image.get("foo")} -> {record.new_image.get("foo")}')

    # One handler registered for several operations at once.
    @on_operations({Operation.INSERT, Operation.MODIFY, Operation.REMOVE}, 1)
    def hello_world(record: RouteRecord) -> str:
        return "Hello, DB STREAM"

    def lambda_handler(event, context):
        # Route every record in the stream batch to its matching handlers.
        route_records(event["Records"])
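For trying the handler locally, a hand-built event in the DynamoDB Streams shape is usually enough. The helper below is a hypothetical test aid, not part of the library, and it assumes that only `eventName` and the `dynamodb` section matter for routing; the framework may read other record fields that are not covered here.

```python
def make_stream_event(event_name, keys, new_image=None, old_image=None):
    """Build a one-record event in the DynamoDB Streams shape (hypothetical helper)."""
    dynamodb = {"Keys": keys, "StreamViewType": "NEW_AND_OLD_IMAGES"}
    if new_image is not None:
        dynamodb["NewImage"] = new_image
    if old_image is not None:
        dynamodb["OldImage"] = old_image
    return {
        "Records": [
            {
                "eventName": event_name,
                "eventSource": "aws:dynamodb",
                "dynamodb": dynamodb,
            }
        ]
    }


# Attribute values use the DynamoDB wire format, e.g. {"S": "bar"} for a string.
event = make_stream_event(
    "INSERT",
    keys={"id": {"S": "1"}},
    new_image={"id": {"S": "1"}, "foo": {"S": "bar"}},
)
```

Passing `event` as the first argument to the `lambda_handler` from the example above, with `None` as the context, would be the natural next step.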