blizz – be blizzful
effortless Python table schema management
If you are looking for…
📋 …a lightweight option to capture data schemas and field annotations for either Pandas or PySpark,
⭐ …which makes table definitions available not just at runtime, but already while developing code – with IDE type checking and autocomplete!
❄️ …which allows dynamic schema inheritance/reference to express several levels of derived data assets – as is common in stable data pipelines – tracing the full data field lineage across your project and boosting refactoring and debugging productivity,
🚀 …which offers pre-built PySpark & Pandas decorators for most common data checks & transformations – all based on metadata,
🏃 …which supports your workflow with bootstrapping tools to generate basic code and Sphinx documentation,
then blizz is for you!
Contents:
- Installation
- Usage Tutorial
Installation
You can install the latest stable version of blizz using pip:

```
pip install blizz
```
Note that either pandas or pyspark needs to be installed separately if you do not have it already.
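For example, to use blizz together with Pandas, you might install both in one go (no pinned versions implied here – pick what fits your project):

```
pip install blizz pandas
```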
If you have any issues, questions, or suggestions – do not hesitate to open a GitHub issue.
A test installation from the latest sources is possible as well:

```
pip install git+https://github.com/joerg-schneider/blizz/#egg=blizz
```
Usage Tutorial
The following is an introduction and basic tutorial to blizz. You can also find some example scripts in src/tutorial of this repository.
blizz primitives: Relation and Field
`blizz.Relation` and `blizz.Field` are the main primitives implementing blizz's behaviour. A relation – in the computer science sense – here refers to any table/dataframe/… one would like to define, made up of rows and columns, the latter defined by `blizz.Field`.
DataFrames from PySpark and Pandas are the supported means of capturing data instances to which a given Relation applies.
The following sections explain how to use `blizz.Relation` and `blizz.Field`.
Basic usage with Pandas
The following is a minimal example of creating a Relation with blizz:
- define your Relation as a subclass of blizz's `Relation` superclass
- add some field names as members
- add `load()` as a classmethod and implement it to return either a Spark or Pandas DataFrame
```python
from blizz import Relation, Field
import pandas as pd


class Iris(Relation):

    SEPAL_LENGTH = Field("sepal_length")
    SEPAL_WIDTH = Field("sepal_width")
    PETAL_LENGTH = Field("petal_length")
    PETAL_WIDTH = Field("petal_width")
    SPECIES = Field("species")

    @classmethod
    def load(cls) -> pd.DataFrame:
        return pd.read_csv(
            "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7"
            "/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv"
        )
```
Calling `load()` on the Relation, we can retrieve a DataFrame:

```python
iris_df = Iris.load()
```
Most importantly, using the Relation's schema, we can now access/modify `iris_df` by referencing the fields we like:

```python
print(iris_df[[Iris.SEPAL_LENGTH, Iris.SEPAL_WIDTH]])
```
A command like the one above references the member variables on Iris to access fields – these will also be autocompleted by your IDE, checked for existence (avoiding typos), and allow easy refactoring (e.g. if the column name changes in the source, just change it once in `Field(...)`!).
Using Field definitions inside of load()
Since `load()` is a classmethod, it is easily possible to refer to defined Fields inside it:
```python
from blizz import Relation, Field
import pandas as pd


class Iris(Relation):

    SEPAL_LENGTH = Field("sepal_length")
    SEPAL_WIDTH = Field("sepal_width")
    PETAL_LENGTH = Field("petal_length")
    PETAL_WIDTH = Field("petal_width")
    SPECIES = Field("species")

    # a newly defined field:
    SEPAL_PETAL_WIDTH = Field("sepal_petal_width")

    @classmethod
    def load(cls) -> pd.DataFrame:
        iris = pd.read_csv(
            "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7"
            "/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv"
        )
        iris[cls.SEPAL_PETAL_WIDTH] = iris[cls.SEPAL_WIDTH] + iris[cls.PETAL_WIDTH]
        return iris
```
In this example, one can see how to add another, derived field to the Relation, which is built at load time by leveraging existing Field references.
Defining Metadata properties on Field
`Field` is implemented in an elegant way: it cleanly folds down to a Python string when you use it as part of an expression (e.g. inside of square brackets for Pandas) – but it is actually not a plain string. In the previous examples, Fields have only been constructed using the default first argument, `name`.
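A quick way to observe this behaviour – as a sketch, assuming that `str()` on a `Field` yields its name (which is what lets expressions like `iris_df[[Iris.SEPAL_LENGTH]]` work):

```python
# assumption: str() on a Field returns the field's name
print(str(Iris.SEPAL_LENGTH))   # 'sepal_length'
print(type(Iris.SEPAL_LENGTH))  # blizz's Field type, not a plain str
```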
However, `Field` also supports a wide variety of useful declarations one might like to make (see API Reference on Relation and Field below!), such as defining datatypes, adding a description, or setting the Field's default value:
```python
class Iris(Relation):

    # setting some example additional Field attributes:

    # 1. most importantly, one can set a datatype to complete the
    #    Relation's schema definition in the classical sense:
    SEPAL_LENGTH = Field("sepal_length", datatype=float)
    # for use with Pandas, `datatype` accepts Python built-ins (e.g. float, object, int),
    # quoted names of Pandas datatypes, and also instances of numpy types, such as
    # numpy.int, numpy.number, etc.

    # 2. to capture metadata, and for documentation purposes, a description can be set:
    SEPAL_WIDTH = Field("sepal_width", description="The sepal width")

    # 3. `default` captures a default row value for a Field:
    PETAL_LENGTH = Field("petal_length", default=0.0)
    PETAL_WIDTH = Field("petal_width")

    # 4. the boolean flag `key` allows one to specify key fields:
    SPECIES = Field("species", key=True)

    @classmethod
    def load(cls) -> pd.DataFrame:
        iris = pd.read_csv(
            "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7"
            "/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv"
        )
        # using all defined key fields, one can run a simple deduplication:
        iris_dedup = iris.drop_duplicates(subset=cls.get_key_fields())
        return iris_dedup
```
Relation hierarchies
Since Fields are Python variables (class members of `Relation`), building connected Relations is very easy:
```python
# based on Iris, we can define a derived second Relation `IrisSepal`:
class IrisSepal(Relation):
    """All records of Iris, but filtered on a subset of Fields – just for the Sepal."""

    # we can simply reference the existing definitions that were made:
    SEPAL_LENGTH = Iris.SEPAL_LENGTH
    SEPAL_WIDTH = Iris.SEPAL_WIDTH
    SPECIES = Iris.SPECIES

    @classmethod
    def load(cls) -> pd.DataFrame:
        # we call Iris's load() method but filter it down to `IrisSepal`'s fields:
        return Iris.load()[cls.get_field_names()]
```
In the example above, the `load()` method additionally shows that each `Relation` offers helpful methods to interact with metadata – in this case, `get_field_names()` retrieves all defined fields as strings. See API Reference on Relation and Field for the full list of available methods.
Making use of metadata declarations: blizz.check & blizz.apply
Having structured metadata defined on a Relation's Fields can be very powerful for carrying out basic operations.
For this purpose, `blizz` provides two modules with Python function decorators meant to be added on top of the `load()` method:

- `blizz.check`: several utilities to check a loaded Relation against its definition
- `blizz.apply`: several utilities for frequently occurring transformations on the Relation's DataFrame

All of these utilities are implemented with equal functionality for Pandas and PySpark.
Consider this example, which takes care of schema checking (field names, field types, key uniqueness), default filling, renaming, and deduplication:
```python
from blizz import Relation, Field
import blizz.check, blizz.apply
import pandas as pd


class Iris(Relation):

    SEPAL_LENGTH = Field("sepal_length", datatype=float)
    SEPAL_WIDTH = Field("sepal_width", datatype=float)
    PETAL_LENGTH = Field("petal_length", default=0.0)
    PETAL_WIDTH = Field("petal_width", datatype=float)
    SPECIES = Field("species_renamed", datatype=object, key=True, source_name="species")

    @classmethod
    @blizz.check.types
    @blizz.check.fields
    @blizz.check.keys
    @blizz.apply.defaults(fill=PETAL_LENGTH)  # you can use Fields as arguments, too!
    @blizz.apply.deduplication
    @blizz.apply.renames  # renames should be applied first
    def load(cls) -> pd.DataFrame:
        return pd.read_csv(
            "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7"
            "/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv"
        )


print(Iris.load())
```
It is important to note that these decorator utilities execute after the `load()` function has finished, and that the order in which they are written matters. As an example, it makes more sense to execute `blizz.apply.deduplication` first and then check its result with `blizz.check.keys` (unless you already expect the data to be deduplicated – then leave out `blizz.apply.deduplication` entirely!).
In this use case, your decorators have to be defined in this order (the lowest one runs first):

```python
@classmethod
@blizz.check.keys
@blizz.apply.deduplication
def load(cls) -> pd.DataFrame:
    pass
```
In general, when you rename source fields, run `@blizz.apply.renames` first (i.e. place it lowest), so that subsequent checks/applies find the fields they are looking for.
It is possible to pass user-defined functions to run as checks/applies using the `blizz.check.func` and `blizz.apply.func` decorators – use the keyword argument `function` to pass in any callable with a signature of `Type[Relation], DataFrame -> DataFrame`. Refer to custom_decorators as an example script; a minimal sketch follows below.
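Here, `no_empty_frame` is a hypothetical user-defined check written for this sketch; only the `function` keyword and the callable signature are taken from the reference above:

```python
import pandas as pd
import blizz.check
from blizz import Relation, Field


def no_empty_frame(relation, df: pd.DataFrame) -> pd.DataFrame:
    # hypothetical user-defined check: fail loudly if load() returned no rows
    if len(df) == 0:
        raise ValueError(f"{relation.__name__} returned an empty DataFrame")
    return df


class Iris(Relation):

    SPECIES = Field("species", key=True)
    # ... further Fields as in the earlier examples ...

    @classmethod
    @blizz.check.func(function=no_empty_frame)
    def load(cls) -> pd.DataFrame:
        return pd.read_csv(
            "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7"
            "/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv"
        )
```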
Using blizz with PySpark
Using blizz with PySpark works very similarly to the examples shown above – the only differences lie in using different datatypes for the Field definitions (either instances of `pyspark.sql.types.DataType` or the PySpark datatype's `simpleString` representation, e.g. "int", "string", …), and of course in having to implement `load()` on each Relation to return a `pyspark.sql.DataFrame`:
```python
from pyspark import SparkFiles
from blizz import Relation, Field
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import blizz.check


class Iris(Relation):

    SEPAL_LENGTH = Field("sepal_length", datatype=DoubleType)
    SEPAL_WIDTH = Field("sepal_width", datatype="double")
    PETAL_LENGTH = Field("petal_length", datatype=DoubleType)
    PETAL_WIDTH = Field("petal_width", datatype=DoubleType)
    SPECIES = Field("species", datatype="string")

    @classmethod
    @blizz.check.types
    @blizz.check.fields
    def load(cls, spark_session) -> DataFrame:
        spark_session.sparkContext.addFile(
            "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7"
            "/raw/0e7a9b0a5d22642a06d3d5b9bcbad9890c8ee534/iris.csv"
        )
        df = spark_session.read.csv(
            SparkFiles.get("iris.csv"), inferSchema=True, header=True
        )
        return df


# set up a simple spark session:
spark = SparkSession.builder.getOrCreate()
# calling load(), we can retrieve a dataframe for the Relation:
iris_df = Iris.load(spark)
print(iris_df)
# using the Relation's schema, we can access/modify iris_df referencing fields we like:
iris_df.select(Iris.SEPAL_WIDTH, Iris.SEPAL_LENGTH).show()
spark.stop()
```
API Reference on Relation and Field
The `Field()` constructor accepts the following arguments (besides `name`, all optional):

- `name`: the target field name, referencing a field in the Relation as retrieved by `load()`. If you want to retrieve a field that is named differently in the source, capture its original name in `source_name`.
- `source_name`: the field name as in the source – useful for `load()` to retrieve the Relation before fields get renamed; such renaming can be done by `blizz.apply.renames`.
- `datatype`: the Field's datatype – for Pandas, this can be a Python built-in, a numpy type, or a string value (e.g. "int"). For PySpark, use `pyspark.sql.types.DataType` or the PySpark datatype's `simpleString` representation (e.g. "int", "string", …).
- `default`: allows one to define a literal as a default value on NULL.
- `description`: allows one to provide a string as a description/comment on the Field.
- `key`: a boolean flag indicating whether this field is part of the primary key.
- `mock`: allows one to define a callable mocking data for this Field – useful for testing Relations with synthetic data.

A combined sketch follows below.
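This sketch combines most of these arguments on a single Field; all argument names come from the list above, while the values are illustrative:

```python
from blizz import Relation, Field


class Iris(Relation):
    SPECIES = Field(
        name="species_renamed",   # target field name after loading
        source_name="species",    # name in the raw source; renamed by blizz.apply.renames
        datatype=object,          # Pandas datatype
        description="The iris species",
        default="unknown",        # literal used by blizz.apply.defaults to fill NULLs
        key=True,                 # part of the primary key
    )
```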
Each `Relation` defines the following useful methods on class level:

- `Relation.get_fields()`: returns all Fields as defined
- `Relation.get_field_names()`: returns all Field names as defined
- `Relation.get_types()`: returns all Field datatypes as a dict `Field -> Datatype`. Note that the datatypes here are as heterogeneous as you defined them, e.g. they can be strings or applicable classes such as `int`.
- `Relation.get_key_fields()`: returns all Fields defined as key
- `Relation.get_key_field_names()`: returns the names of all Fields defined as key
- `Relation.get_defaults()`: returns a dictionary mapping from each Field to its default value, where defined
- `Relation.get_default(<Field>)`: returns the default value for a Field
- `Relation.mock()`: to be overridden by each `Relation` – allows one to define a mocked version of `load()` to produce synthetic data for testing
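For illustration, here is roughly how a few of these accessors could be used with the Iris Relation defined earlier (the exact return representations depend on your Field definitions):

```python
print(Iris.get_field_names())      # e.g. ['sepal_length', 'sepal_width', ...]
print(Iris.get_key_field_names())  # e.g. ['species']
print(Iris.get_types())            # a dict mapping each Field to its datatype
print(Iris.get_default(Iris.PETAL_LENGTH))  # 0.0, as defined above
```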
Of course it is entirely possible to define a subclass of `Relation` for your own purposes, which handles common use cases through the class methods it adds. For instance, you might want to create a `SQLRelation` that bundles useful methods or definitions to quickly retrieve this kind of Relation from a SQL RDBMS.
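A rough sketch of that idea – `SQLRelation`, `connection_string`, and `table_name` are illustrative names invented here, not blizz API, and SQLAlchemy is just one possible way to connect:

```python
import pandas as pd
from sqlalchemy import create_engine
from blizz import Relation


class SQLRelation(Relation):
    # illustrative attributes for concrete subclasses to override:
    connection_string: str
    table_name: str

    @classmethod
    def load(cls) -> pd.DataFrame:
        # select exactly the Fields defined on the subclass
        query = f"SELECT {', '.join(cls.get_field_names())} FROM {cls.table_name}"
        return pd.read_sql(query, con=create_engine(cls.connection_string))
```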
API Reference on blizz.check & blizz.apply
`blizz.check` offers the following decorators for checking:

- `blizz.check.fields`: validates, on the DataFrame returned by `load()`, that it contains at least all fields (by name) as defined on the Relation
- `blizz.check.types`: validates that all fields with defined data types on the Relation match their data types in the DataFrame returned by `load()`
- `blizz.check.keys`: validates that the subset of all Fields marked as keys is unique
- `blizz.check.func`: runs the user-defined function passed as the `function` keyword parameter and checks whether it raises an exception. The function's signature should be `Type[Relation], DataFrame -> DataFrame`.

All `blizz.check` decorators accept the argument `on_fail`, which can be set to:

- `blizz.check.WARN`: raise a warning on failing the check
- `blizz.check.RAISE`: raise an exception on failing the check
`blizz.apply` offers the following decorators for transformations:

- `blizz.apply.deduplication`: deduplicates the DataFrame returned by `load()`, either by the Fields already marked as keys of the Relation or by the subset of Fields given as the argument `key: List[Field]` to the decorator (which takes precedence). Additionally, the decorator supports the arguments `sort_on: List[Field]` and `sort_order` (either `blizz.apply.ASC` or `blizz.apply.DESC`) – these control which records are preferably kept on duplication. E.g. when you find duplicates based on the key columns and want to keep the most recently updated row among them, you might use something like `sort_on=[UPDATE_TIMESTAMP], sort_order=blizz.apply.DESC` (see the sketch after this list).
- `blizz.apply.defaults`: fills defined defaults for NULL values in the DataFrame returned by `load()`, either using the value defaults defined on the Relation's Fields (where they exist) or as given with the argument `fill: List[Field]` to the decorator (which takes precedence).
- `blizz.apply.renames`: performs Field renames for columns in the DataFrame returned by `load()`, either using the defined pairs of `Field.source_name` -> `Field.name` (where they exist) or using the rename definitions given as the argument `columns: Dict[str, str]` to the decorator, which are added to the renames to perform.
- `blizz.apply.func`: runs the user-defined function passed as the `function` keyword parameter and returns the transformed data. The function's signature should be `Type[Relation], DataFrame -> DataFrame`.
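As a sketch of the deduplication arguments – the `Events` Relation and its `UPDATED` timestamp Field are invented for this example; the decorator arguments themselves are from the list above:

```python
import pandas as pd
import blizz.apply, blizz.check
from blizz import Relation, Field


class Events(Relation):

    EVENT_ID = Field("event_id", key=True)
    UPDATED = Field("updated")  # assumed update-timestamp column

    @classmethod
    @blizz.check.keys  # afterwards, verify the result is unique per key
    @blizz.apply.deduplication(sort_on=[UPDATED], sort_order=blizz.apply.DESC)
    def load(cls) -> pd.DataFrame:
        # raw, possibly duplicated data – the newest row per event_id is kept:
        return pd.DataFrame(
            {"event_id": [1, 1, 2], "updated": ["2024-01-01", "2024-02-01", "2024-01-15"]}
        )
```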
Bootstrapping – generate schema definitions from DataFrames
As you've seen above, blizz can be a powerful tool in your Python data pipelines. However, it needs properly defined Relations, which can be tedious to write at the start.
For this, blizz can bootstrap Relations from existing PySpark or Pandas DataFrames using `blizz.relation_from_dataframe()`:
```python
>>> blizz.relation_from_dataframe(df=departments_df, name="Departments")

import blizz.check
from blizz import Relation, Field
from pyspark.sql import DataFrame
from pyspark.sql.types import *


class Departments(Relation):
    """
    todo: describe relation Departments in this docstring
    """

    DEPARTMENT_ID = Field(name="Department_ID", datatype=StringType)
    DEPARTMENT_NAME = Field(name="Department_Name", datatype=StringType)
    DOE = Field(name="DOE", datatype=TimestampType)

    @classmethod
    @blizz.check.keys(on_fail=blizz.check.WARN)
    @blizz.check.types(on_fail=blizz.check.WARN)
    @blizz.check.fields(on_fail=blizz.check.WARN)
    def load(cls) -> DataFrame:
        """
        todo: describe relation Departments load process in this docstring
        """
        # todo: implement data source loading here
        pass
```
You can then simply copy, paste, and adjust this as needed.
Beyond: blizz Feature Library
EXPERIMENTAL – only supported with PySpark
On top of blizz's primitives, the plan is to build a lightweight Feature Library component for Machine Learning. An example of this experimental API is included as part of src/tutorial. Conceptually, the API allows one to express features programmatically, but in a well-structured way using lightweight classes. This then makes it possible to decouple feature generation from the pure feature definition.
`blizz` also has a CLI – serving as a client to instruct a PySpark process which features to build, all based on a defined blizz Feature Library.
In order to do this, one:

- defines a Feature Library in code, as done in src/tutorial/example_feature_library
- defines a feature list definition: a YAML file of all FeatureGroups/Features one wants to have calculated, which can also specify parameters for parameterizable features and the output storage format. An example is given in src/tutorial/ExampleFeatureList.yaml
- calls the `blizz` CLI with both and the output path: `blizz build src/tutorial/ExampleFeatureList.yaml src/tutorial/example_feature_library my_features`
This brings together the code-based definition of all data sources (via blizz primitives) and features (via blizz FeatureGroup & Feature) with the instruction – in the form of the YAML file – of what to calculate and how (parameterized features).
Why is this approach powerful?

- How to retrieve data sources, check them, and build features on top is captured purely in code. This means it is fully supported by IDE type checks, autocompletion, and refactoring abilities.
- Due to this code-centric means of definition, the whole end-to-end pipeline is easily managed and versioned – as a Python package.
- Expressing ML features in a structured way allows leveraging a common, powerful Spark-based factory to build them – whoever wants features only needs to call `blizz build ...` with their YAML build instructions, pointing at a blizz Feature Library codebase (which can be checked out at any desired version). As blizz pipelines are internally just nested function calls on top of `pyspark.sql`, they benefit heavily from both lazy evaluation and execution plan optimizations. This happens completely transparently, thanks to Spark.
- In this way, definitions to calculate features can be centralized and shared purely on the code level. Consumers of features, e.g. specialized ML models, just need to install `blizz`, write YAML build instructions, and point to the Feature Library – their model does not even need to be in Python.
Beyond: Generate Documentation
Thanks to its rich metadata about Relations, Fields, and Features, blizz makes the task of documentation easier for data practitioners. On top of the structured information serving as documentation, Python docstrings are used automatically, e.g. at class level underneath a `Relation`, when describing `Relation.load()`, and similarly for `FeatureGroup` and `Feature`.
The philosophy here is not to write documentation twice – e.g. once in code and once somewhere else – but rather to make it code-driven as well.
A nice (experimental) feature is the following, which creates a Sphinx project from a `blizz.FeatureGroup`'s metadata and serves it:

```
blizz docs src/tutorial/example_feature_library --serve
```