Make ML or generic pipeline more streamlined
Streamlined
Making running scripts more streamlined!
Streamlined allows you to declare a pipeline using a declarative syntax.
Install
Streamlined can be installed by running:
pip install streamlined
QuickStart
Create a logger that logs messages less severe than INFO to stdout and all other messages to stderr.
Alternatively, you can use a customized logger as long as it has a log method.
from streamlined.utils import create_logger, conditional_stream_mixin
logger = create_logger(name="pipeline", mixins=[conditional_stream_mixin])
Define the Pipeline configuration
import logging
from streamlined import Pipeline
from streamlined.constants import *
pipeline = Pipeline({
    NAME: "adding two numbers",
    ARGUMENTS: [
        {
            NAME: "x",
            VALUE: lambda: int(input('x = ')),
            LOG: {
                VALUE: lambda _value_: f"x is set to {_value_}",
                LEVEL: logging.INFO,
                LOGGER: logger
            }
        },
        {
            NAME: "y",
            VALUE: lambda: int(input('y = ')),
            LOG: {
                VALUE: lambda _value_: f"y is set to {_value_}",
                LEVEL: logging.INFO,
                LOGGER: logger
            }
        }
    ],
    RUNSTAGES: [
        {
            NAME: "compute sum",
            ACTION: lambda x, y: x + y,
            LOG: {
                VALUE: lambda _value_: f"x + y = {_value_}",
                LEVEL: logging.INFO,
                LOGGER: logger
            }
        }
    ]
})
Run the Pipeline
pipeline.run()
Components
Notations
- bolded field name implies a required field
- bolded exposed magic value implies an argument value that is available to current scope and all enclosing scopes.
Argument
The argument component defines an in-scope argument that execution components can use through dependency injection.
For example, suppose x is set to 1 and y is set to x + 1 in the arguments section of the pipeline scope. Any execution component can then access x and y by declaring them as function parameters.
pipeline = Pipeline({
    NAME: "adding two numbers",
    ARGUMENTS: [
        {
            NAME: "x",
            VALUE: 1
        },
        {
            NAME: "y",
            VALUE: lambda x: x + 1
        }
    ],
    RUNSTAGES: [
        {
            NAME: "compute sum",
            ACTION: lambda x, y: x + y
        }
    ]
})
Argument definition precedence:
- Arguments in a larger scope are defined earlier than arguments in a smaller scope. For example, an argument in a runstep can reference an argument in the enclosing runstage in its definition, but not the reverse.
- Arguments that appear earlier in the list are defined earlier than arguments that appear later. For example, if x and y are the first and second items in the argument list, y can reference x, but not the reverse.
Argument naming conventions:
- Argument names are encouraged to be unique to avoid argument shadowing. When multiple arguments share the same name, the argument value in the nearest scope is used. For example, if x is defined in the pipeline to be 1 and in runstage foo to be -1, referencing x in a runstep inside foo resolves to -1, while referencing it in runstage bar resolves to 1.
- An argument name should follow the Python variable naming convention when it needs to be referenced in execution components. Explicit retrieval is possible if a variable is named differently, like "Menu Items", but it is not as straightforward as dependency injection.
- If an argument is only executed for its side effect, its name is encouraged to be "_".
Syntax
ARGUMENTS: [
    {
        name: ...,
        value: ...,
        logging: ...,
        cleanup: ...,
        validator: ...
    },
    ...
]
| Field Name | Field Value | Exposed Magic Value |
|---|---|---|
| name | | _name_ |
| value | | _value_ |
| logging | See Logging Component | |
| cleanup | See Cleanup Component | |
| validator | See Validator Component | |
Cleanup
The cleanup component is exactly the same as the execution component except that it is executed last. This makes it well suited to cleanup actions such as closing a file or ending a database connection.
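One way to picture "executed last" is a try/finally wrapper. The sketch below is not the library's implementation: run_with_cleanup is a hypothetical helper, and running the cleanup even when the action raises is an assumption that the description above does not confirm.

```python
def run_with_cleanup(action, cleanup):
    """Run action, then cleanup, and return the action's result.

    Assumption (not confirmed by the library docs): cleanup also runs
    when the action raises.
    """
    try:
        return action()
    finally:
        cleanup()


events = []
result = run_with_cleanup(
    # list.append returns None, so `or "value"` supplies the return value
    action=lambda: events.append("action") or "value",
    cleanup=lambda: events.append("cleanup"),
)
```

After this runs, events records the action before the cleanup, and result holds the action's return value.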
Syntax
CLEANUP: <action>
| Field Name | Field Value |
|---|---|
| action | |
Execution
The execution component is pivotal in a pipeline definition because it can produce a new value from already-defined values.
The value of an executed action can be any Callable, such as a lambda or a function. If this callable has any parameters, their values are resolved at invocation time.
Dependency injection succeeds if and only if each parameter name matches the name of an in-scope declared argument.
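Name-based injection of this kind can be sketched with the standard library alone. The helper below is hypothetical (invoke is not part of streamlined) and only illustrates the matching rule: each parameter name is looked up among the declared arguments, and the call fails if any name is missing.

```python
import inspect


def invoke(action, scope):
    """Call action, filling each parameter from scope by parameter name."""
    params = inspect.signature(action).parameters
    missing = [name for name in params if name not in scope]
    if missing:
        raise KeyError(f"no in-scope argument named: {', '.join(missing)}")
    return action(**{name: scope[name] for name in params})


scope = {"x": 1}
# y is declared after x, so its definition may depend on x
scope["y"] = invoke(lambda x: x + 1, scope)
total = invoke(lambda x, y: x + y, scope)

try:
    invoke(lambda z: z, scope)
    injection_failed = False
except KeyError:
    injection_failed = True
```

Here total is computed from x and y purely by parameter name, while the lambda asking for the undeclared name z is rejected.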
Possible ways of Argument Declaration:
- Through argument component (most frequent)
- Through automatically exposed magic values.
- Through explicitly bound arguments -- calls of bindone, bind, run.
An argument is in scope if and only if it is defined in the current scope or any enclosing scope. For example, if x is referenced in a runstep execution component, the applicable scopes are this runstep scope, the enclosing runstage scope, and the enclosing pipeline scope (the global scope).
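The nearest-scope lookup described here behaves much like collections.ChainMap from the standard library. The following is a sketch under that analogy, not the library's own scoping code; the scope names (foo, bar) are illustrative.

```python
from collections import ChainMap

# Pipeline (global) scope defines x = 1.
pipeline_scope = ChainMap({"x": 1})

# Runstage foo shadows x; runstage bar defines nothing of its own.
foo_scope = pipeline_scope.new_child({"x": -1})
bar_scope = pipeline_scope.new_child()

# A runstep inside foo sees foo's scope first, then the pipeline's.
foo_step_scope = foo_scope.new_child()


def resolve(scope, name):
    """Return the value bound to name in the nearest enclosing scope."""
    return scope[name]


x_in_foo_step = resolve(foo_step_scope, "x")
x_in_bar = resolve(bar_scope, "x")
```

The runstep inside foo resolves x to the shadowing value from foo, while bar falls through to the pipeline's value.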
Syntax
ACTION: <action>
| Field Name | Field Value |
|---|---|
| action | |
Logging
The logging component is responsible for logging the running details of the enclosing component.
If a logger is not specified, logging.getLogger() is used to retrieve a default logger. Passing in a customized logger that fits your needs is encouraged, however. The passed-in logger should have a log(level, msg) method.
streamlined.utils.log also exposes some utility methods to quickly create loggers. create_logger takes a name, level, and mixins to create a logger. If mixins are not passed, the current logger class is used to create a logger with the specified name and level. create_async_logger takes the same arguments and creates a multithreading-compatible equivalent.
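Because the only stated requirement on a custom logger is a log(level, msg) method, any object with that method should qualify. A minimal sketch, assuming nothing beyond that requirement (ListLogger is a hypothetical class, not something the library provides):

```python
import logging


class ListLogger:
    """Hypothetical logger that keeps messages in memory instead of printing."""

    def __init__(self, threshold=logging.INFO):
        self.threshold = threshold
        self.records = []

    def log(self, level, msg):
        # Drop messages below the threshold, like a standard logger would.
        if level >= self.threshold:
            self.records.append((logging.getLevelName(level), msg))


logger = ListLogger()
logger.log(logging.DEBUG, "ignored")
logger.log(logging.INFO, "x is set to 1")
```

An in-memory logger like this can be handy in tests, where asserting on captured messages is easier than parsing stdout.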
Syntax
- Full syntax
  LOG: { VALUE: ..., LEVEL: ..., LOGGER: ... }
- Only specify the log message
  LOG: ...
| Field Name | Field Value |
|---|---|
| value | |
| level | |
| logger | |
Pipeline
A pipeline component is the topmost level of configuration. For example, arguments defined at this scope can be referenced in all other scopes. A pipeline is composed of a list of runstages, and the return value of the pipeline component consists of the return values of its runstages.
In addition, _pipeline_ is exposed as a magic property that references the current pipeline. To explicitly bind an argument at the global level, use bindone(name, value).
Skip
Skip is a special field in the pipeline configuration (it is also present in the runstage and runstep components) that controls conditional execution of the enclosing component.
It can be configured in any of the following ways:
- A boolean flag:
  "skip": True or "skip": False
- An execution component that evaluates to a boolean flag:
  "skip": lambda: True
- A dictionary where value determines whether the enclosing component should be skipped and action specifies an action to execute instead if the enclosing component is skipped:
  "skip": { "value": True, "action": lambda: print('skipped') }
- When skip is not specified, it defaults to
  "skip": False
Syntax
{
    NAME: ...,
    TAG: ...,
    ARGUMENTS: ...,
    RUNSTAGES: ...,
    CLEANUP: ...,
    VALIDATOR: ...,
    SKIP: ...
}
| Field Name | Field Value | Exposed Magic Value |
|---|---|---|
| name | | _name_ |
| tags | | _tags_ |
| arguments | See Argument Component | |
| runstages | See Runstage Component | |
| validator | See Validator Component | |
| skip | See Skip field | |
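The accepted forms of the skip field (boolean, callable, dictionary, or absent) can be normalized into one decision. The sketch below is illustrative only: should_skip and run_component are hypothetical helpers, and returning None for a skipped component without a replacement action is an assumption.

```python
def should_skip(skip=False):
    """Return (skipped, replacement_action) for any accepted skip form."""
    replacement = None
    if isinstance(skip, dict):
        replacement = skip.get("action")
        skip = skip.get("value", False)
    if callable(skip):
        # An execution component that evaluates to a boolean flag
        skip = skip()
    return bool(skip), replacement


def run_component(action, skip=False):
    """Run action unless skipped; run the replacement action if one is given."""
    skipped, replacement = should_skip(skip)
    if not skipped:
        return action()
    # Assumption: a skipped component without a replacement yields None.
    return replacement() if replacement is not None else None


default_result = run_component(lambda: "ran")
flag_result = run_component(lambda: "ran", skip=True)
callable_result = run_component(lambda: "ran", skip=lambda: False)
dict_result = run_component(
    lambda: "ran", skip={"value": True, "action": lambda: "skipped"}
)
```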
Runstage
A runstage is the intermediate level between pipeline and runstep: a pipeline is composed of a list of runstages, while a runstage is composed of a list of runsteps. In other words, a runstage represents a grouping of runsteps.
Arguments defined in a runstage are available through dependency injection to all enclosed runsteps.
A runstage exposes a magic property _runsteps_ that represents the enclosed runsteps. It can be used to explicitly bind an argument at the runsteps level. For example, if the first runstage binds an argument through lambda _runsteps_: _runsteps_.bindone('x', 1), later runstages can reference x through dependency injection.
A runstage also has a special action field. When this field is not specified, the default action is to run the enclosed runsteps in order (equivalent to calling _runsteps_.run()) and collect all return values as a list. If an action is specified, that action is responsible for running the runsteps explicitly if necessary.
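The difference between the default action and a custom action can be sketched in plain Python. This is a simplified model, not the library's code: run_runstage is hypothetical, runsteps are modeled as zero-argument callables, and the custom action receives a run_all callable standing in for _runsteps_.run().

```python
def run_runstage(runsteps, action=None):
    """Run a runstage modeled as a list of zero-argument callables."""

    def run_all():
        # Default behavior: run every runstep in order, collect return values.
        return [step() for step in runsteps]

    if action is None:
        return run_all()
    # A custom action decides whether and when the runsteps run.
    return action(run_all)


steps = [lambda: 1, lambda: 2]
default_value = run_runstage(steps)
custom_value = run_runstage(steps, action=lambda run_all: sum(run_all()))
ignored_steps = run_runstage(steps, action=lambda run_all: "steps not run")
```

The last call shows why a custom action is "responsible for running runsteps": if it never calls run_all, the runsteps simply do not execute.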
Syntax
RUNSTAGES: [
    {
        NAME: ...,
        TAG: ...,
        ARGUMENTS: ...,
        RUNSTEPS: ...,
        ACTION: ...,
        LOG: ...,
        CLEANUP: ...,
        VALIDATOR: ...,
        SKIP: ...
    },
    ...
]
| Field Name | Field Value | Exposed Magic Value |
|---|---|---|
| name | | _name_ |
| tags | | _tags_ |
| arguments | See Argument Component | |
| runsteps | See Runstep Component | _runsteps_ |
| action | See Execution Component | |
| log | See Log Component | |
| cleanup | See Cleanup Component | |
| validator | See Validator Component | |
| skip | See Skip field | |
Runstep
A runstep is the lowest-level running unit of a pipeline. Ideally, it represents a trivial task such as running a shell script. This task should be defined in the action field.
Syntax
RUNSTEPS: [
    {
        NAME: ...,
        TAG: ...,
        ARGUMENTS: ...,
        ACTION: ...,
        LOG: ...,
        CLEANUP: ...,
        VALIDATOR: ...,
        SKIP: ...
    },
    ...
]
| Field Name | Field Value | Exposed Magic Value |
|---|---|---|
| name | | _name_ |
| tags | | _tags_ |
| arguments | See Argument Component | |
| action | See Execution Component | |
| log | See Log Component | |
| cleanup | See Cleanup Component | |
| validator | See Validator Component | |
| skip | See Skip field | |
Validator
The validator component enables validation before or after execution of the enclosing component's action. If validation fails, pipeline execution fails immediately because a validation exception is thrown.
A common use case, when the enclosing component's action creates a new file, is to validate that the file does not exist before the action executes and that it exists afterwards.
A validator component is composed of a before validation stage, an after validation stage, or both. Each validation stage is in turn composed of a predicate that evaluates to a boolean and a log field, which is a dictionary from True or False to a logging component configuration.
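The file-creation use case can be sketched with before and after predicates wrapped around an action. This is a standalone illustration, not the library's validator: run_validated, ValidationError, and create_file are hypothetical names, and the log field is omitted.

```python
import os
import tempfile


class ValidationError(Exception):
    """Raised when a before or after predicate evaluates to False."""


def run_validated(action, before=None, after=None):
    """Run action, checking optional predicates before and after it."""
    if before is not None and not before():
        raise ValidationError("before-validation failed")
    result = action()
    if after is not None and not after():
        raise ValidationError("after-validation failed")
    return result


def create_file(path):
    with open(path, "w") as handle:
        handle.write("done")
    return path


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "output.txt")
    created = run_validated(
        lambda: create_file(target),
        before=lambda: not os.path.exists(target),  # must not exist yet
        after=lambda: os.path.exists(target),       # must exist afterwards
    )
    try:
        # Running again fails fast: the file now exists before the action.
        run_validated(
            lambda: create_file(target),
            before=lambda: not os.path.exists(target),
        )
        second_run_failed = False
    except ValidationError:
        second_run_failed = True
```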
Syntax
- Full syntax
  VALIDATOR: {
      VALIDATION_BEFORE_STAGE: {
          ACTION: ...,
          LOG: { True: ..., False: ... },
      },
      VALIDATION_AFTER_STAGE: {
          ACTION: ...,
          LOG: { True: ..., False: ... },
      },
  }
- Specify only the before validation stage
  VALIDATOR: {
      VALIDATION_BEFORE_STAGE: {
          ACTION: ...,
          LOG: { True: ..., False: ... },
      }
  }
- Specify only the after validation stage
  VALIDATOR: {
      VALIDATION_AFTER_STAGE: {
          ACTION: ...,
          LOG: { True: ..., False: ... },
      }
  }
  This can be further simplified to
  VALIDATOR: { ACTION: ..., LOG: { True: ..., False: ... } }
| Field Name | Field Value |
|---|---|
| before | |
| after | |
There are several variants of the validation stage configuration:
- Full syntax
  { ACTION: ..., LOG: { True: ..., False: ... } }
- Use the default log message
  { ACTION: ... }
| Field Name | Field Value |
|---|---|
| True | See Logging Component |
| False | See Logging Component |
Utilities
This section covers some utilities exposed by the streamlined library. All of these utilities live under the streamlined.utils package.
Argument Parser/Loader
- streamlined.utils.ArgumentParser is a utility built on top of argparse to parse command line arguments iteratively. See utils/argument_parser.py for more details.
- streamlined.utils.ArgumentLoader allows specifying the definition of an argument parser inside a dataclass definition, through the metadata property of each dataclass field.
  It supports
  - creating an argument parser based on the defined dataclass fields
  - creating an instance from arguments using a provided argument parser
  - creating an instance from arguments directly (the argument parser is created from the configuration in the defined dataclass fields)
  from dataclasses import InitVar, dataclass, field
  from streamlined.utils import ArgumentLoader

  @dataclass
  class DatabaseConfig(ArgumentLoader):
      username: str = field(
          metadata={"name": ["-u", "--username"], "help": "supply username", "default": "admin"}
      )
      password: str = field(
          metadata={"name": ["-p"], "help": "supply password", "dest": "password"}
      )
      database: InitVar[str] = field(
          metadata={"help": "supply value for database", "choices": ["mysql", "sqlite", "mongodb"]}
      )

      def __post_init__(self, database):
          pass
  After invoking DatabaseConfig.from_arguments(<args>), an instance of DatabaseConfig is created with all values loaded from the parsed arguments.
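To make the idea of driving argparse from dataclass field metadata concrete, here is a standard-library-only sketch. It does not reproduce ArgumentLoader: build_parser and from_arguments are hypothetical free functions, the port field is invented for illustration, and InitVar handling is left out.

```python
import argparse
from dataclasses import dataclass, field, fields


def build_parser(cls):
    """Create an ArgumentParser with one option per dataclass field."""
    parser = argparse.ArgumentParser()
    for f in fields(cls):
        options = dict(f.metadata)
        # "name" holds the flags; fall back to --<field name>.
        flags = options.pop("name", [f"--{f.name}"])
        options.setdefault("dest", f.name)
        options.setdefault("type", f.type)
        parser.add_argument(*flags, **options)
    return parser


def from_arguments(cls, args):
    """Parse args and build an instance of cls from the result."""
    namespace = build_parser(cls).parse_args(args)
    return cls(**vars(namespace))


@dataclass
class DatabaseConfig:
    username: str = field(
        metadata={"name": ["-u", "--username"], "help": "supply username", "default": "admin"}
    )
    port: int = field(metadata={"help": "supply port", "default": 5432})


config = from_arguments(DatabaseConfig, ["--port", "3306"])
```

The omitted username falls back to the default declared in its metadata, and the port string is converted using the field's annotated type.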
Configuration Parser/Loader
- streamlined.utils.ConfigurationParser is a derived class of configparser.ConfigParser that provides the following additional functionality:
  - CLASSMETHOD add a section -- append_section
  - CLASSMETHOD remove a section -- remove_section
  - get a configuration option and cast it to a specified type -- get_with_type
- streamlined.utils.ConfigurationLoader allows loading a configuration file into a dataclass. It can be seen as a trait to be derived by the desired dataclass:
  from dataclasses import dataclass
  from streamlined.utils import ConfigurationLoader

  @dataclass
  class FooConfig(ConfigurationLoader):
      bar: str
  After extending ConfigurationLoader, FooConfig can invoke from_config_file(<config_filepath>, <section>) to create an instance of FooConfig with all values loaded according to their annotation types.
  ConfigurationLoader is able to handle ClassVar and InitVar as expected.
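Loading values "according to their annotation types" can be approximated with configparser's typed getters. The sketch below is an assumption-laden stand-in for ConfigurationLoader: load_section is a hypothetical function, the configuration is read from a string rather than a file, the extra fields are invented for illustration, and ClassVar and InitVar are not handled.

```python
import configparser
from dataclasses import dataclass, fields


def load_section(cls, parser, section):
    """Build an instance of cls from one section, casting by annotation."""
    getters = {
        int: parser.getint,
        float: parser.getfloat,
        bool: parser.getboolean,
    }
    values = {}
    for f in fields(cls):
        # Fall back to the plain string getter for unlisted types.
        getter = getters.get(f.type, parser.get)
        values[f.name] = getter(section, f.name)
    return cls(**values)


@dataclass
class FooConfig:
    bar: str
    retries: int
    verbose: bool


parser = configparser.ConfigParser()
parser.read_string("[foo]\nbar = hello\nretries = 3\nverbose = yes\n")
config = load_section(FooConfig, parser, "foo")
```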
Concurrency