A collection of transforms for the Apache Beam Python SDK.
A collection of assorted transforms that I use in my Apache Beam Python pipelines. Many are simple (or trivial) transforms. The most useful ones are those for reading from and writing to relational databases.
Installation
- Using pip
pip install beam-nuggets
- From source
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .
Usage
The example below shows how to use beam-nuggets' relational_db.Read transform to read from a PostgreSQL database table.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',
        host='localhost',
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name='months',
    )
    records | 'Writing to stdout' >> beam.Map(print)
The next example writes to a PostgreSQL table using beam-nuggets' relational_db.Write transform.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2},
]
source_config = relational_db.SourceConfiguration(
    drivername='postgresql',
    host='localhost',
    port=5432,
    username='postgres',
    password='password',
    database='calendar',
    create_if_missing=True  # create the database if not there
)
table_config = relational_db.TableConfiguration(
    name='months',
    create_if_missing=True,  # automatically create the table if not there
    primary_key_columns=['num']  # and use 'num' column as primary key
)
with beam.Pipeline(options=PipelineOptions()) as p:
    months = p | "Reading month records" >> beam.Create(records)
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
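To make the intent of the table options above more concrete, here is a minimal sketch of what "create the table if missing, with 'num' as primary key" amounts to conceptually. It uses only the standard-library sqlite3 module and an in-memory database. It is not how beam-nuggets implements relational_db.Write; the helper name write_and_read_back and the column types are assumptions made for illustration.

```python
import sqlite3

records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2},
]


def write_and_read_back(rows):
    """Hypothetical helper: create the table if missing, insert rows,
    then read them back as dictionaries."""
    conn = sqlite3.connect(':memory:')
    conn.row_factory = sqlite3.Row  # rows become addressable by column name
    try:
        # Rough equivalent of create_if_missing=True with
        # primary_key_columns=['num'] (column types are assumed here).
        conn.execute(
            'CREATE TABLE IF NOT EXISTS months ('
            'num INTEGER PRIMARY KEY, name TEXT)'
        )
        # Each dict supplies the named parameters for one inserted row.
        conn.executemany(
            'INSERT INTO months (num, name) VALUES (:num, :name)', rows
        )
        conn.commit()
        cursor = conn.execute('SELECT name, num FROM months ORDER BY num')
        return [dict(row) for row in cursor]
    finally:
        conn.close()


stored = write_and_read_back(records)
print(stored)
```

Because 'num' is the primary key in this sketch, inserting a second row with the same 'num' would raise sqlite3.IntegrityError; how the library itself treats duplicate keys is not covered here.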
Supported transforms
IO
- relational_db.Read for reading from relational database tables.
- relational_db.Write for writing to relational database tables.
The transforms above use SQLAlchemy to communicate with the database, so they can read from and write to any relational database supported by SQLAlchemy. The transforms are tested against PostgreSQL, MySQL and SQLite.
- csvio.Read for reading CSV files.
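Since the relational_db transforms rely on SQLAlchemy, the connection fields used in the examples (drivername, host, port, username, password, database) line up with the parts of an SQLAlchemy-style database URL, drivername://username:password@host:port/database. The sketch below only illustrates that mapping with the standard library; build_db_url is a hypothetical helper, not part of beam-nuggets or SQLAlchemy.

```python
from urllib.parse import quote_plus


def build_db_url(drivername, host=None, port=None, username=None,
                 password=None, database=None):
    """Hypothetical helper: assemble an SQLAlchemy-style database URL."""
    url = drivername + '://'
    if username:
        # Credentials are URL-escaped so special characters stay valid.
        url += quote_plus(username)
        if password:
            url += ':' + quote_plus(password)
        url += '@'
    if host:
        url += host
        if port:
            url += ':' + str(port)
    if database:
        url += '/' + database
    return url


postgres_url = build_db_url(
    'postgresql', host='localhost', port=5432,
    username='postgres', password='password', database='calendar',
)
# File-based databases such as SQLite need only a driver name and a path.
sqlite_url = build_db_url('sqlite', database='/tmp/months.sqlite')
print(postgres_url)
print(sqlite_url)
```

Switching to another supported database is then mostly a matter of changing the drivername and the connection fields.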
Others
- SelectFromNestedDict: selects a subset of fields from records formed of nested dictionaries.
- ParseJson
- AssignUniqueId
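As an illustration of what selecting a subset from nested dictionaries can look like, here is a plain-Python sketch. The function select_from_nested, the dotted-key convention and the sample record are assumptions made for this example; they are not taken from the SelectFromNestedDict implementation, whose actual parameters are described in the documentation.

```python
def select_from_nested(record, keys):
    """Hypothetical helper: pick values out of a nested dict.

    Each key is a dotted path such as 'address.city'. The result is a
    flat dict that maps every requested path to the value found there.
    """
    selected = {}
    for dotted_key in keys:
        value = record
        # Walk one level deeper for every path segment.
        for part in dotted_key.split('.'):
            value = value[part]
        selected[dotted_key] = value
    return selected


record = {
    'id': 7,
    'address': {'city': 'Stockholm', 'zip': '11122'},
    'contact': {'email': 'someone@example.com', 'phone': '000'},
}
subset = select_from_nested(record, ['id', 'address.city', 'contact.email'])
print(subset)
```

In a pipeline, a function like this could be applied to every element with beam.Map; a missing path raises KeyError in this sketch rather than being skipped.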
Documentation
See here.
Development
- Install
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
export BEAM_NUGGETS_ROOT=`pwd`
pip install -e .[dev]
- Make changes on separate branches
- Run tests
cd $BEAM_NUGGETS_ROOT
python -m unittest discover -v
- Generate docs
cd $BEAM_NUGGETS_ROOT
docs/generate_docs.sh
- Create a PR against master.
Backlog
- upload to pypi
- version docs?
- Summarize the investigation of using Source/Sink vs. ParDo (and GroupBy) for IO
- Example of how to run on GCP
- SQL query support in relational_db.Read
- more nuggets: WriteToCsv
- integration tests
- Failure handling in the DB IO transforms
- more nuggets: Elasticsearch, Mongo
- WriteToRelationalDB, logging
Licence
MIT