Easily mock a datalake so you can test your PySpark data application
pyspark-data-mocker
pyspark-data-mocker is a testing tool that eases the burden of setting up a datalake, so you can easily test the behavior of your data application. It also configures the Spark session, optimizing it for testing purposes.
Install
pip install pyspark-data-mocker
Usage
pyspark-data-mocker scans the directory you provide, looking for files that can be interpreted as tables, and loads them into the datalake. The datalake will contain one database per folder inside the root directory. For example, let's take a look at the basic_datalake directory:
$ tree tests/data/basic_datalake -n --charset=ascii # byexample: +rm=~
tests/data/basic_datalake
|-- bar
| |-- courses.csv
| `-- students.csv
`-- foo
`-- exams.csv
~
2 directories, 3 files
This file hierarchy is preserved in the resulting datalake: each sub-folder becomes a Spark database, and each file is loaded as a table named after the file. How can we load them using pyspark-data-mocker? Really simple!
>>> from pyspark_data_mocker import DataLakeBuilder
>>> builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake") # byexample: +timeout=20 +pass
And that's it! You now have, in that execution context, a datalake with the structure defined in the basic_datalake folder. Let's take a closer look by running some queries.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> spark.sql("SHOW DATABASES").show()
+---------+
|namespace|
+---------+
| bar|
| default|
| foo|
+---------+
We have the default database (which comes for free when instantiating Spark), and one database for each folder inside tests/data/basic_datalake: bar and foo.
>>> spark.sql("SHOW TABLES IN bar").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| bar| courses| false|
| bar| students| false|
+---------+---------+-----------+
>>> spark.sql("SELECT * FROM bar.courses").show()
+---+------------+
| id| course_name|
+---+------------+
| 1|Algorithms 1|
| 2|Algorithms 2|
| 3| Calculus 1|
+---+------------+
>>> spark.table("bar.students").show()
+---+----------+---------+--------------------+------+
| id|first_name|last_name| email|gender|
+---+----------+---------+--------------------+------+
| 1| Shirleen| Dunford|sdunford0@amazona...|Female|
| 2| Niko| Puckrin|npuckrin1@shinyst...| Male|
| 3| Sergei| Barukh|sbarukh2@bizjourn...| Male|
| 4| Sal| Maidens|smaidens3@senate.gov| Male|
| 5| Cooper|MacGuffie| cmacguffie4@ibm.com| Male|
+---+----------+---------+--------------------+------+
Note how each table is already filled with the data from its CSV file! The tool supports several file formats: csv, parquet and json. The application infers which format to use by looking at the file extension.
>>> spark.sql("SHOW TABLES IN foo").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| foo| exams| false|
+---------+---------+-----------+
>>> spark.table("foo.exams").show()
+---+----------+---------+----------+----+
| id|student_id|course_id| date|note|
+---+----------+---------+----------+----+
| 1| 1| 1|2022-05-01| 9|
| 2| 2| 1|2022-05-08| 7|
| 3| 3| 1|2022-06-17| 4|
| 4| 1| 3|2023-05-12| 9|
| 5| 2| 3|2023-05-12| 10|
| 6| 3| 3|2022-12-07| 7|
| 7| 4| 3|2022-12-07| 4|
| 8| 5| 3|2022-12-07| 2|
| 9| 1| 2|2023-05-01| 5|
| 10| 2| 2|2023-05-07| 8|
+---+----------+---------+----------+----+
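Since the mocked tables behave like any other Spark tables, they can back real unit tests. Below is a minimal sketch: passing_exams is a hypothetical piece of application code (not part of this package); only load_from_dir and cleanup come from pyspark-data-mocker.

from pyspark.sql import SparkSession, functions as F

from pyspark_data_mocker import DataLakeBuilder


def passing_exams(spark):
    # Hypothetical code under test: an exam passes with a note of 4 or more
    return spark.table("foo.exams").where(F.col("note") >= 4)


def test_passing_exams():
    builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake")
    try:
        spark = SparkSession.builder.getOrCreate()
        assert passing_exams(spark).count() == 9  # 9 of the 10 mocked exams pass
    finally:
        builder.cleanup()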
Cleanup
You can easily clean the datalake by using the cleanup function:
>>> builder.cleanup()
>>> spark.sql("SHOW DATABASES").show()
+---------+
|namespace|
+---------+
| default|
+---------+
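In a pytest suite, the load and cleanup calls fit naturally into a fixture so every test starts from a fresh datalake. A minimal sketch (the fixture and test names below are my own):

import pytest
from pyspark.sql import SparkSession

from pyspark_data_mocker import DataLakeBuilder


@pytest.fixture
def datalake():
    builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake")
    yield builder
    builder.cleanup()  # drop the mocked databases once the test is done


def test_courses_are_loaded(datalake):
    spark = SparkSession.builder.getOrCreate()
    assert spark.table("bar.courses").count() == 3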
Configuration
pyspark-data-mocker ships with a default Spark configuration that optimizes test execution.
>>> spark_conf = spark.conf
>>> spark_conf.get("spark.app.name")
'test'
>>> spark_conf.get("spark.master")
'local[1]'
>>> spark_conf.get("spark.sql.warehouse.dir")
'/tmp/tmp<...>/spark_warehouse'
>>> spark_conf.get("spark.sql.shuffle.partitions")
'1'
>>> spark_conf.get("spark.ui.showConsoleProgress")
'false'
>>> spark_conf.get("spark.ui.enabled")
'false'
>>> spark_conf.get("spark.ui.dagGraph.retainedRootRDDs")
'1'
>>> spark_conf.get("spark.ui.retainedJobs")
'1'
>>> spark_conf.get("spark.ui.retainedStages")
'1'
>>> spark_conf.get("spark.ui.retainedTasks")
'1'
>>> spark_conf.get("spark.sql.ui.retainedExecutions")
'1'
>>> spark_conf.get("spark.worker.ui.retainedExecutors")
'1'
>>> spark_conf.get("spark.worker.ui.retainedDrivers")
'1'
>>> spark_conf.get("spark.sql.catalogImplementation")
'in-memory'
To better understand what these configurations mean and why they are set this way, take a look at Sergey Ivanychev's excellent research on "Faster PySpark Unit Test".
Some of these configurations can be overridden by providing a YAML config file. For example:
$ cat /tmp/custom_config.yaml
app_name: test_complete
number_of_cores: 4
enable_hive: True
warehouse_dir: /tmp/full_delta_lake
delta_configuration:
  scala_version: '2.12'
  delta_version: '2.0.2'
  snapshot_partitions: 2
  log_cache_size: 3
Let's digest each value and what it controls:
| config name | type | description | default value |
|---|---|---|---|
| number_of_cores | INTEGER | Changes the amount of CPU cores the Spark session will use | 1 |
| enable_hive | BOOL | Enables the usage of Apache Hive's catalog | false |
| warehouse_dir | STRING | If set, creates a persistent directory where the warehouse will live. By default pyspark_data_mocker uses a temporary directory that exists as long as the builder instance exists | tempfile.TemporaryDirectory() |
| delta_configuration | DELTA_CONFIG | If set, enables the Delta Lake framework | None |
Among the things you can change when enabling Delta capabilities are:
| config name | type | description |
|---|---|---|
| scala_version | STRING | Version of Scala the Spark session will use. Take into consideration that the Scala version MUST be compatible with the delta-core version used |
| delta_version | STRING | Version of delta-core used. The version to use highly depends on the pyspark version |
| snapshot_partitions | INTEGER | Tells Delta how the partitions should be done |
| log_cache_size | INTEGER | Limits the Delta log cache |
For the delta configuration, take into consideration that ALL values must be explicitly set: none of them has a default value.
To use a custom configuration, you can pass an optional string or pathlib.Path argument to load_from_dir.
>>> builder = DataLakeBuilder.load_from_dir("./tests/data/basic_datalake", "/tmp/custom_config.yaml") # byexample: +timeout=20
<...>
>>> spark_conf = SparkSession.builder.getOrCreate().conf
>>> spark_conf.get("spark.app.name")
'test_complete'
>>> spark_conf.get("spark.master")
'local[4]'
>>> spark_conf.get("spark.sql.warehouse.dir")
'/tmp/full_delta_lake/spark_warehouse'
>>> spark_conf.get("spark.jars.packages")
'io.delta:delta-core_2.12:2.0.2'
>>> spark_conf.get("spark.sql.extensions")
'io.delta.sql.DeltaSparkSessionExtension'
>>> spark_conf.get("spark.databricks.delta.snapshotPartitions")
'2'
>>> spark_conf.get("spark.sql.catalog.spark_catalog")
'org.apache.spark.sql.delta.catalog.DeltaCatalog'
>>> spark_conf.get("spark.sql.catalogImplementation")
'hive'
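The configuration argument can also be given as a pathlib.Path instead of a plain string; the following sketch is equivalent to the call above:

import pathlib

from pyspark_data_mocker import DataLakeBuilder

# Same datalake and custom configuration as above, passing the config file as a Path
builder = DataLakeBuilder.load_from_dir(
    "./tests/data/basic_datalake",
    pathlib.Path("/tmp/custom_config.yaml"),
)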