Build and run queries against data

DataFusion in Python

This is a Python library that binds to DataFusion, the Apache Arrow in-memory query engine.

Like PySpark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, Parquet files, or CSV files, run it in a multi-threaded environment, and get the result back in Python.

It also allows you to use UDFs and UDAFs for complex operations.

The major advantage of this library over other execution engines is zero-copy data exchange between Python and the execution engine: using UDFs and UDAFs and collecting results to Python incur no copying cost. The only overhead is acquiring the GIL while those operations run.

Its query engine, DataFusion, is written in Rust, a language that provides strong guarantees about thread safety and memory safety.

Technically, zero-copy is achieved via the Arrow C Data Interface.

How to use it

Simple usage:

import datafusion
from datafusion import col
import pyarrow

# create a context
ctx = datafusion.SessionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
    col("a") + col("b"),
    col("a") - col("b"),
)

# execute and collect the first (and only) batch
result = df.collect()[0]

assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])

UDFs

import datafusion
import pyarrow
from datafusion import col, udf

def is_null(array: pyarrow.Array) -> pyarrow.Array:
    return array.is_null()

is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')

# create a context
ctx = datafusion.SessionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

df = df.select(is_null_arr(col("a")))

result = df.collect()[0]

assert result.column(0) == pyarrow.array([False] * 3)

UDAFs

import pyarrow
import pyarrow.compute
import datafusion
from datafusion import udaf, Accumulator
from datafusion import col


class MyAccumulator(Accumulator):
    """
    Interface of a user-defined accumulation.
    """
    def __init__(self):
        self._sum = pyarrow.scalar(0.0)

    def update(self, values: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())

    def merge(self, states: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())

    def state(self) -> pyarrow.Array:
        return pyarrow.array([self._sum.as_py()])

    def evaluate(self) -> pyarrow.Scalar:
        return self._sum

# create a context
ctx = datafusion.SessionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')

df = df.aggregate(
    [],
    [my_udaf(col("a"))]
)

result = df.collect()[0]

assert result.column(0) == pyarrow.array([6.0])
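
To make the roles of update, state, merge, and evaluate more concrete, here is a plain-Python sketch that does not use datafusion or pyarrow. It assumes one plausible way an engine might drive an accumulator: one accumulator per partition consumes batches through update, exposes its partial result through state, and a final accumulator combines the partial results through merge. The SumAccumulator class and the aggregate driver are hypothetical stand-ins, not part of the library.

```python
from typing import List


class SumAccumulator:
    """Plain-Python stand-in for an accumulator (not the datafusion class)."""

    def __init__(self) -> None:
        self._sum = 0.0

    def update(self, values: List[float]) -> None:
        # fold one batch of raw input values into the running sum
        self._sum += sum(values)

    def state(self) -> List[float]:
        # expose the partial result so another accumulator can merge it
        return [self._sum]

    def merge(self, states: List[float]) -> None:
        # fold partial sums produced by other accumulators into this one
        self._sum += sum(states)

    def evaluate(self) -> float:
        # return the final aggregated value
        return self._sum


def aggregate(partitions: List[List[List[float]]]) -> float:
    """Hypothetical driver: one accumulator per partition, then a final merge."""
    partials = []
    for batches in partitions:
        acc = SumAccumulator()
        for batch in batches:
            acc.update(batch)
        partials.append(acc.state())

    final = SumAccumulator()
    for partial in partials:
        final.merge(partial)
    return final.evaluate()


# two partitions: the first holds one batch [1.0, 2.0], the second one batch [3.0]
total = aggregate([[[1.0, 2.0]], [[3.0]]])
```

Here update and merge happen to perform the same arithmetic because the aggregate is a sum. For other aggregates, such as a mean, the state would need to carry more than one value.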

How to install (from pip)

pip install datafusion
# or
python -m pip install datafusion

You can verify the installation by running:

>>> import datafusion
>>> datafusion.__version__
'0.6.0'
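
The same check can be done from a script without importing the package. The sketch below uses only the standard library (importlib.metadata, available in Python 3.8+); the helper name installed_version is an illustrative choice, and the version it reports depends on the release you installed.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("datafusion")
print(version if version is not None else "datafusion is not installed")
```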

How to develop

This assumes that you have Rust and Cargo installed. We use the workflow recommended by PyO3 and maturin.

Bootstrap:

# fetch this repo
git clone git@github.com:apache/arrow-datafusion-python.git
# prepare development environment (used to build wheel / install in development)
python3 -m venv venv
# activate the venv
source venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install dependencies (for Python 3.8+)
python -m pip install -r requirements-310.txt

The tests rely on test data in git submodules.

git submodule init
git submodule update

Whenever the Rust code changes (through your own changes or via git pull), rebuild and rerun the tests:

# make sure you activate the venv using "source venv/bin/activate" first
maturin develop
python -m pytest

How to update dependencies

To change test dependencies, edit requirements.in and run:

# install pip-tools (this only needs to be done once), also consider running in venv
python -m pip install pip-tools
python -m piptools compile --generate-hashes -o requirements-310.txt

To update dependencies, run with -U:

python -m piptools compile -U --generate-hashes -o requirements-310.txt

More details here

