aws-kinesis-consumer

Consume an AWS Kinesis Data Stream to look over the records from a terminal.

Demo

$ aws-kinesis-consumer --stream-name MyStream

<shard_id=shardId-000000000000, records=1>
Record-001
<shard_id=shardId-000000000001, records=2>
Record-002
Record-003

Install

Python 3.6+ must already be installed. Then run:

pip install aws-kinesis-consumer

Usage

Connect to AWS

Like the AWS CLI, aws-kinesis-consumer reads the standard AWS credentials: either the AWS environment variables or the configuration stored by aws configure.

Recommended: when working from a local machine, run aws configure once to store the credentials and the default region:

# execute aws configure once...
$ aws configure
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [None]: eu-central-1

# ...then use aws-kinesis-consumer
$ aws-kinesis-consumer --stream-name MyStream

Alternatively, you can set the AWS environment variables manually:

# set the AWS environment variables...
$ export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
$ export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
$ export AWS_DEFAULT_REGION=eu-central-1

# ...then use aws-kinesis-consumer
$ aws-kinesis-consumer --stream-name MyStream

Arguments

| Argument        | Default    | Description |
|-----------------|------------|-------------|
| --stream-name   | (required) | Name of the AWS Kinesis stream. |
| --iterator-type | latest     | How to start consuming records from the stream: latest consumes only new records; trim-horizon consumes all the records still retained in the stream. |
| --endpoint      |            | Custom AWS endpoint URL used to call the AWS API. Can be used to select a region (e.g. https://kinesis.us-east-1.amazonaws.com/). |
| --help          |            | Shows the help message. |
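The arguments above can be mirrored with Python's argparse. This is a minimal sketch, not the project's actual parser; build_parser is a hypothetical helper, and the ITERATOR_TYPES mapping assumes that latest and trim-horizon correspond to the LATEST and TRIM_HORIZON shard iterator types of the Kinesis GetShardIterator API.

```python
import argparse

# Assumed mapping from the CLI's --iterator-type values to the
# ShardIteratorType values accepted by the Kinesis GetShardIterator API.
ITERATOR_TYPES = {
    "latest": "LATEST",              # only records added after the iterator is created
    "trim-horizon": "TRIM_HORIZON",  # oldest record still retained in the shard
}


def build_parser() -> argparse.ArgumentParser:
    """Build a parser mirroring the documented arguments (a sketch, not the real CLI)."""
    parser = argparse.ArgumentParser(prog="aws-kinesis-consumer")
    parser.add_argument("--stream-name", required=True,
                        help="Name of the AWS Kinesis stream.")
    parser.add_argument("--iterator-type", choices=sorted(ITERATOR_TYPES),
                        default="latest",
                        help="latest (new records only) or trim-horizon (all retained records).")
    parser.add_argument("--endpoint", default=None,
                        help="Custom AWS endpoint URL.")
    # argparse adds --help automatically.
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args(
        ["--stream-name", "MyStream", "--iterator-type", "trim-horizon"])
    print(args.stream_name, ITERATOR_TYPES[args.iterator_type], args.endpoint)
    # prints: MyStream TRIM_HORIZON None
```

Restricting --iterator-type with choices makes argparse reject any other value with a usage error, which matches the two values the table documents.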

FAQ

What is the motivation? What is the issue with the AWS CLI?

The AWS CLI can fetch records from Kinesis, but users have to list the shards, request a shard iterator for each shard, follow the next iterator returned by every call, pace their requests, and so on.

aws-kinesis-consumer, by contrast, fetches records using the stream name and nothing else, so no extra script is needed.
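The manual workflow described above can be sketched in Python. This is a minimal illustration, not the tool's implementation: consume_stream is a hypothetical helper, and client is assumed to behave like a boto3 Kinesis client (boto3.client("kinesis")), of which only list_shards, get_shard_iterator and get_records are used.

```python
import time
from typing import Any, Iterator


def consume_stream(client: Any, stream_name: str, iterator_type: str = "LATEST",
                   max_rounds: int = 1, delay_seconds: float = 1.0) -> Iterator[bytes]:
    """Yield record payloads from every shard of a Kinesis stream (hypothetical helper).

    `client` is assumed to behave like boto3.client("kinesis").
    """
    # 1. List every shard; ListShards is paginated through NextToken.
    shard_ids = []
    response = client.list_shards(StreamName=stream_name)
    while True:
        shard_ids.extend(shard["ShardId"] for shard in response["Shards"])
        token = response.get("NextToken")
        if not token:
            break
        # When NextToken is passed, StreamName must be omitted.
        response = client.list_shards(NextToken=token)

    # 2. Request one starting iterator per shard.
    iterators = {
        shard_id: client.get_shard_iterator(
            StreamName=stream_name, ShardId=shard_id, ShardIteratorType=iterator_type
        )["ShardIterator"]
        for shard_id in shard_ids
    }

    # 3. Poll each shard, following the NextShardIterator returned by the previous call.
    for round_number in range(max_rounds):
        for shard_id, iterator in list(iterators.items()):
            result = client.get_records(ShardIterator=iterator)
            for record in result["Records"]:
                yield record["Data"]
            next_iterator = result.get("NextShardIterator")
            if next_iterator:
                iterators[shard_id] = next_iterator
            else:
                del iterators[shard_id]  # the shard is closed and fully read
        # 4. Pause between rounds to stay under the per-shard GetRecords rate limit.
        if round_number < max_rounds - 1:
            time.sleep(delay_seconds)
```

For example, with boto3 installed: `for data in consume_stream(boto3.client("kinesis"), "MyStream", "TRIM_HORIZON", max_rounds=5): print(data)`. Passing the client in, rather than creating it inside the function, keeps the sketch testable against a stub.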

How do I consume a stream hosted in a different region?

The environment variable AWS_DEFAULT_REGION can be used to select any AWS region:

$ AWS_DEFAULT_REGION=eu-central-1 aws-kinesis-consumer --stream-name MyGermanStream
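The --endpoint argument offers another way to target a region. A minimal sketch, assuming the standard endpoint naming kinesis.REGION.amazonaws.com of the commercial aws partition (other partitions, such as the China regions, use different domains); kinesis_endpoint and consumer_command are hypothetical helpers.

```python
def kinesis_endpoint(region: str) -> str:
    """Return the public Kinesis endpoint URL for a region of the standard `aws` partition."""
    return f"https://kinesis.{region}.amazonaws.com/"


def consumer_command(stream_name: str, region: str) -> list:
    """Build an aws-kinesis-consumer command line that targets `region` via --endpoint."""
    return [
        "aws-kinesis-consumer",
        "--stream-name", stream_name,
        "--endpoint", kinesis_endpoint(region),
    ]


if __name__ == "__main__":
    print(" ".join(consumer_command("MyGermanStream", "eu-central-1")))
    # prints: aws-kinesis-consumer --stream-name MyGermanStream --endpoint https://kinesis.eu-central-1.amazonaws.com/
```

The returned list can be passed directly to subprocess.run, which avoids shell quoting issues.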

How do I filter the records?

The output of aws-kinesis-consumer can be piped into other commands, such as grep, or jq to filter JSON records:

# all the records
$ aws-kinesis-consumer --stream-name MyStream
{"name":"foo", "status":"ok"}
{"name":"bar", "status":"pending"}
{"name":"baz", "status":"error"}

# records containing the text "ba" (e.g. "bar" and "baz", but not "foo")
$ aws-kinesis-consumer --stream-name MyStream | grep "ba"
{"name":"bar", "status":"pending"}
{"name":"baz", "status":"error"}

# records where the JSON property "status" has the value "error"
$ aws-kinesis-consumer --stream-name MyStream | jq -c 'select(.status == "error")'
{"name":"baz","status":"error"}
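When jq is not available, a similar selection can be sketched in Python. select_by_field is a hypothetical helper, not part of aws-kinesis-consumer: it keeps the lines whose JSON object has the given field equal to the given value, and it skips lines that are not valid JSON objects, such as blank lines or any non-JSON text mixed into the output.

```python
import json
import sys
from typing import Iterable, Iterator


def select_by_field(lines: Iterable[str], field: str, value: str) -> Iterator[str]:
    """Yield the lines whose JSON object has `field` equal to `value`."""
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # not JSON (e.g. a blank line): skip it
        if isinstance(record, dict) and record.get(field) == value:
            yield line.rstrip("\n")


if __name__ == "__main__" and len(sys.argv) == 3:
    # e.g. aws-kinesis-consumer --stream-name MyStream | python3 select_field.py status error
    for match in select_by_field(sys.stdin, sys.argv[1], sys.argv[2]):
        print(match)
```

Unlike jq -c, this prints each matching line unchanged instead of re-serializing it.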

What are the required AWS permissions?

aws-kinesis-consumer requires the following AWS permissions:

  • kinesis:ListShards
  • kinesis:GetShardIterator
  • kinesis:GetRecords

The following example policy can be attached to an AWS user or an AWS role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListShards",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:REGION:ACCOUNT-ID:stream/STREAM-NAME"
            ]
        }
    ]
}
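To avoid hand-editing the placeholders, the policy can be generated per stream. A minimal sketch: kinesis_stream_arn and consumer_policy are hypothetical helpers, the ARN follows the REGION/ACCOUNT-ID/STREAM-NAME template of the policy above for the standard aws partition, and 123456789012 is a placeholder account ID.

```python
import json


def kinesis_stream_arn(region: str, account_id: str, stream_name: str) -> str:
    """Return the ARN of a Kinesis data stream (standard `aws` partition)."""
    return f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"


def consumer_policy(region: str, account_id: str, stream_name: str) -> dict:
    """Build the read-only policy above, scoped to a single stream."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:ListShards",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": [kinesis_stream_arn(region, account_id, stream_name)],
            }
        ],
    }


if __name__ == "__main__":
    # 123456789012 is a placeholder account ID.
    print(json.dumps(consumer_policy("eu-central-1", "123456789012", "MyStream"), indent=4))
```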

Special thanks

  • Thanks to the contributors of the kinesalite project, which makes testing and developing this project extremely easy and reliable!

Source Distribution

aws-kinesis-consumer-1.0.1.tar.gz (9.1 kB)
