Skip to main content

A framework for content-based routing of records in a Dynamodb Stream to the callable that should handle them

Project description

dynamodb-stream-router

WARNING - Version 0.0.6 is a breaking change from version 0.0.5. Please review the documentation before upgrading

Provies a framework for mapping records in a Dynamodb stream to callables based on the event name (MODIFY, INSERT, DELETE) and content.

Installation

pip install dynamodb-stream-router

Routing

  • Routes are determined by examining the eventName in the dynamodb section of the DynamoDB stream Record and through a condition which examines the contents of the Record.
  • Conditions can either be a callable that takes a RouteRecord and returns True or False, or it may be a string expression that will be parsed into a callable. See below for the expression language.
  • Routes have a priority, which is honored in an acending order. If multiple matching routes have the same priority, they will be executed in random order (concurrectly, if an executor is provided)
  • Routes are matched based on a RouteRecord, which is a helper class that (lazily) deserializers the DynamoDB item structure (used in Keys, NewImage and OldImage) into Python types, exactly in the same way that the boto3 dynamodb Table resource does.
  • Route handling functions take a RouteRecord and can return anything. The return value is not used by the framework.

Expressions

Keywords and types:

Type Description Example
VALUE A quoted string (single or double quote), integer, or float representing a literal value 'foo', 1, 3.8
$OLD A reference to RouteRecord.old_image $OLD.foo
$NEW A reference to RouteRecord.old_image $NEW.foo
PATH A path starting from a root of $OLD or $NEW. Can be specified using dot syntax or python style keys. When using dot reference paths must conform to python's restrictions. $OLD.foo, $NEW.foo.bar, $OLD["foo"]
INDEX An integer used as an index into a list or set $OLD.foo[0]

Operators:

Symbol Action
& Logical AND
| Logical OR
() Statement grouping
== Equality
!= Non equality
> Greater than
>= Greater than or equal to
< Less than
<= Less than or equal to
=~ Regex comparison PATH =~ 'regex' where 'regex' is a quoted VALUE

Comparison operators, except for regex comparison, can compare PATH to VALUE, PATH to PATH, or even VALUE to VALUE.

functions

Function Arguments Description
has_changed(VALUE, VALUE) VALUE - Comma separated list of quoted values Tests $OLD and $NEW. If value is in one and not the other, or in both and differs, the the function will return True. Returns True if any key meets conditions.
is_type(PATH, TYPE)
  • PATH - The path to test in the form of $OLD.foo.bar</li
  • TYPE - A Dynamodb type. Can be one of S, SS, B, BS, N, NS, L, M, or BOOL
Tests if PATH exists and the VALUE at PATH is of type TYPE.
attribute_exists(PATH) PATH - The path to test Returns True if the provided path exists
from_json(PATH) PATH - The path to decode Returns object decoded using simplejson.loads()

Examples

from dynamodb_stream_router import on_insert, on_modify, on_remove, on_operations, Operation, route_records, RouteRecord

@on_insert("$NEW.foo == 'bar'", 0)
def print_new_record(record: RouteRecord) -> None:
    print(record.new_image)

def test_old_foo(record: RouteRecord) -> bool:
    return record.old_image["foo"] == "bar2"

@on_remove(test_old_foo, 0)
def print_old_record(record: RouteRecord) -> None:
    print(record.old_record)

@on_modify("has_changed('foo') & attribute_exists($NEW.foo)", 1)
def print_changed_foo(record: RouteRecord) -> None:
    print(f'{record.old_image.get("foo")} -> {record.new_image.get("foo")}')

@on_operations({Operation.INSERT, Operation.MODIFY, Operation.REMOVE}, 1)
def hello_world(record: RouteRecord) -> str:
    return "Hello, DB STREAM"

def lambda_handler(event, context):
    route_records(event["Records"])

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dynamodb-stream-router-0.0.7.tar.gz (14.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dynamodb_stream_router-0.0.7-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file dynamodb-stream-router-0.0.7.tar.gz.

File metadata

  • Download URL: dynamodb-stream-router-0.0.7.tar.gz
  • Upload date:
  • Size: 14.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.6.0 importlib_metadata/4.8.2 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.8

File hashes

Hashes for dynamodb-stream-router-0.0.7.tar.gz
Algorithm Hash digest
SHA256 2b61ea31ba013d4b664ffd294340aba674353b1edb154f1d86d209ba95d5287d
MD5 ee0e269d6ba09def6b1c2e74aa073e97
BLAKE2b-256 a6feefee5bc81dce677e5a9fffc8541a3dfeda77cb39925f7900731fe3129b2c

See more details on using hashes here.

File details

Details for the file dynamodb_stream_router-0.0.7-py3-none-any.whl.

File metadata

  • Download URL: dynamodb_stream_router-0.0.7-py3-none-any.whl
  • Upload date:
  • Size: 13.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.6.0 importlib_metadata/4.8.2 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.8

File hashes

Hashes for dynamodb_stream_router-0.0.7-py3-none-any.whl
Algorithm Hash digest
SHA256 d7306ef4a3a56ab659f44e111b3f0f69ccf9c93e5b2dc9b19641869a666f7db5
MD5 7c51c04bec6ed6e1d0d4c6d8b3bc6fdf
BLAKE2b-256 cf32974ef6549af9178dda8f16f61d49539e026019d437866d09b1c1eaf8ea06

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page