Arrow -> PostgreSQL encoder

Project description

pgpq

Convert PyArrow RecordBatches to Postgres' native binary format.
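
Under the hood the encoder produces raw bytes in PostgreSQL's COPY ... WITH (FORMAT BINARY) wire format, so the output can be written to any sink, not only a live connection. A minimal sketch of the core API, assuming (as the psycopg example below suggests) that each write_* call returns bytes:

import pyarrow as pa
from pgpq import ArrowToPostgresBinaryEncoder

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["n"])
encoder = ArrowToPostgresBinaryEncoder(batch.schema)
# a complete COPY BINARY payload: header, then each encoded batch, then the trailer
payload = encoder.write_header() + encoder.write_batch(batch) + encoder.finish()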

Usage

Copying a dataset to PostgreSQL using psycopg

"""Example for README.md"""
from tempfile import mkdtemp
import psycopg
import pyarrow.dataset as ds
import requests
from pgpq import ArrowToPostgresBinaryEncoder

# let's get some example data
tmpdir = mkdtemp()
with open(f"{tmpdir}/yellow_tripdata_2023-01.parquet", mode="wb") as f:
    resp = requests.get(
        "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
    )
    resp.raise_for_status()
    f.write(resp.content)

# load an arrow dataset
# arrow can load datasets from partitioned parquet files locally or in S3/GCS
# it handles buffering, matching globs, etc.
dataset = ds.dataset(tmpdir)

# create an encoder object which will do the encoding
# and give us the expected Postgres table schema
encoder = ArrowToPostgresBinaryEncoder(dataset.schema)
# get the expected Postgres destination schema
# note that this is _not_ the same as the incoming arrow schema
# and not necessarily the schema of your permanent table
# instead, it's the schema of the data that will be sent over the wire,
# which, for example, has no timezones on any timestamps
pg_schema = encoder.schema()
# assemble ddl for a temporary table
# it's often a good idea to bulk load into a temp table to:
# (1) Avoid indexes
# (2) Stay in-memory as long as possible
# (3) Be more flexible with types
#     (you can't load a SMALLINT into a BIGINT column without casting)
cols = [f'"{col_name}" {col.data_type.ddl()}' for col_name, col in pg_schema.columns]
ddl = f"CREATE TEMP TABLE data ({','.join(cols)})"

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)  # type: ignore
        with cursor.copy("COPY data FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            for batch in dataset.to_batches():
                copy.write(encoder.write_batch(batch))
            copy.write(encoder.finish())
        # load into your actual table, possibly doing type casts
        # cursor.execute("INSERT INTO \"table\" SELECT * FROM data")
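
That final INSERT is where any casts belong. A sketch of what it might look like, meant to run right after the COPY block above (still inside the same `with conn.cursor() as cursor:` block, since temp tables are session-local); the permanent table `trips` and the cast on `passenger_count` are hypothetical names for illustration:

# plain copy from the temp table into the permanent table:
cursor.execute('INSERT INTO trips SELECT * FROM data')
# or, with an explicit cast for a column whose permanent type is wider:
# cursor.execute('INSERT INTO trips ("passenger_count") SELECT "passenger_count"::BIGINT FROM data')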

Defining field encoders

"""Showcase defining encoders for fields."""
import pgpq
import psycopg
import pyarrow as pa
from pgpq import encoders
from pgpq import schema


data = [
    pa.array([1, 2, 3, 4]),
    pa.array(['{"age": 33, "name": "alice"}', '{"age": 24, "name": "bob"}', "{}", "null"]),
]
arrow_schema = pa.schema([("id", pa.int64()), ("properties", pa.string())])
record_batch = pa.RecordBatch.from_arrays(data, schema=arrow_schema)

encoder = pgpq.ArrowToPostgresBinaryEncoder(record_batch.schema)
pg_schema_with_text_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_text_properties.columns
] == [("id", "INT8"), ("properties", "TEXT")]

# To support a different PostgreSQL schema, we change the default encoders generated by pgpq:
# * 'id' encoded as INT8 (BIGINT).
# * 'properties' encoded as JSONB.
field_encoders = {
    "id": encoders.Int64EncoderBuilder(pa.field("id", pa.int64())),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_with_jsonb_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_with_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]

ddl = """
CREATE TABLE id_properties (
    id INT8, -- Alternative: BIGINT
    properties JSONB
)
"""

# Without the right encoder, PostgreSQL will report a binary-format error when
# executing the following COPY: it expects properties to be encoded as JSONB, not TEXT.
with psycopg.connect("postgres://posthog:posthog@localhost:5432/posthog") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)

        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())

# The 'id' field matches our schema, so we can use the default encoder for it,
# but we still need to encode properties as JSONB.
# `infer_encoder` returns the default encoder for a given field.
field_encoders = {
    "id": pgpq.ArrowToPostgresBinaryEncoder.infer_encoder(record_batch.field("id")),
    "properties": encoders.StringEncoderBuilder.new_with_output(
        pa.field("properties", pa.string()), schema.Jsonb()
    ),
}
encoder = pgpq.ArrowToPostgresBinaryEncoder.new_with_encoders(record_batch.schema, field_encoders)
pg_schema_inferred_id_and_jsonb_properties = encoder.schema()

assert [
    (col_name, col.data_type.ddl())
    for col_name, col in pg_schema_inferred_id_and_jsonb_properties.columns
] == [("id", "INT8"), ("properties", "JSONB")]

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        # id_properties was already created above, so no DDL is needed this time
        with cursor.copy("COPY id_properties FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            copy.write(encoder.write_batch(record_batch))
            copy.write(encoder.finish())
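
To check the result, you can read the JSONB column back with PostgreSQL's JSON operators. A minimal sketch (`->>` extracts a field as text):

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT id, properties->>'name' FROM id_properties ORDER BY id")
        # rows with id 1 and 2 carry names; the '{}' and JSON null rows yield NULL
        print(cursor.fetchall())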

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgpq-0.11.0.tar.gz (64.5 kB)

Uploaded: Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

pgpq-0.11.0-cp39-abi3-win_amd64.whl (613.4 kB)

Uploaded: CPython 3.9+, Windows x86-64

pgpq-0.11.0-cp39-abi3-win32.whl (549.2 kB)

Uploaded: CPython 3.9+, Windows x86

pgpq-0.11.0-cp39-abi3-musllinux_1_2_x86_64.whl (906.9 kB)

Uploaded: CPython 3.9+, musllinux: musl 1.2+ x86-64

pgpq-0.11.0-cp39-abi3-musllinux_1_2_i686.whl (951.8 kB)

Uploaded: CPython 3.9+, musllinux: musl 1.2+ i686

pgpq-0.11.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (733.2 kB)

Uploaded: CPython 3.9+, manylinux: glibc 2.17+ x86-64

pgpq-0.11.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl (766.3 kB)

Uploaded: CPython 3.9+, manylinux: glibc 2.5+ i686

pgpq-0.11.0-cp39-abi3-macosx_10_12_x86_64.whl (672.7 kB)

Uploaded: CPython 3.9+, macOS 10.12+ x86-64

pgpq-0.11.0-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl (1.3 MB)

Uploaded: CPython 3.9+, macOS 10.12+ universal2 (ARM64, x86-64), macOS 10.12+ x86-64, macOS 11.0+ ARM64

File details

Details for the file pgpq-0.11.0.tar.gz.

File metadata

  • Download URL: pgpq-0.11.0.tar.gz
  • Size: 64.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.0.tar.gz
  • SHA256: c0f059a668ff24fcba266fabc1c29eb4018cea89721b772e50af8a2e6f22fb06
  • MD5: b14c24e95707d959219fd2886f519204
  • BLAKE2b-256: 1aaffe21053eb4879ac310676f0df93f4c54d27bfa4682a91ab34f677511a59e
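
For example, to verify a downloaded sdist against the SHA256 digest above, a minimal standard-library sketch:

import hashlib

with open("pgpq-0.11.0.tar.gz", "rb") as f:
    digest = hashlib.sha256(f.read()).hexdigest()
# compare against the published digest before trusting the file
assert digest == "c0f059a668ff24fcba266fabc1c29eb4018cea89721b772e50af8a2e6f22fb06"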


File details

Details for the file pgpq-0.11.0-cp39-abi3-win_amd64.whl.

File metadata

  • Download URL: pgpq-0.11.0-cp39-abi3-win_amd64.whl
  • Size: 613.4 kB
  • Tags: CPython 3.9+, Windows x86-64
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-win_amd64.whl
  • SHA256: 58f80a497ba2c14ca4543558f26ccc6979dda9b399fc4a8026fe2b540e608996
  • MD5: 0298122d024ea773e0e7b6331fd8dfe8
  • BLAKE2b-256: a4f405a7632e0c87e2ef3757f5ccf27bd511a566746d7a4ca7e8434845022b3f


File details

Details for the file pgpq-0.11.0-cp39-abi3-win32.whl.

File metadata

  • Download URL: pgpq-0.11.0-cp39-abi3-win32.whl
  • Size: 549.2 kB
  • Tags: CPython 3.9+, Windows x86
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-win32.whl
  • SHA256: fde96b45615867a6d9fdcbf5d682f9546924a5b2805e1ef5b174fc7611a4029f
  • MD5: 2d2e287dcec2b7859169675ceda29f75
  • BLAKE2b-256: 59efd0c73405b0569577c736da0c3fcc0ccf986d4957eea4f72710c921e832b7


File details

Details for the file pgpq-0.11.0-cp39-abi3-musllinux_1_2_x86_64.whl.

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-musllinux_1_2_x86_64.whl
  • SHA256: 5de5c7368b83a289e1dd77678054dbf8c6744c5bf81dfbcce804b9bbac2cb651
  • MD5: 30f799765c5a1c26f8893e1f1dbd4c3c
  • BLAKE2b-256: 437b0ca36c6e60700c303a7b15ecb61e9dc268fe3d7b7268b8f91789a536dbb5


File details

Details for the file pgpq-0.11.0-cp39-abi3-musllinux_1_2_i686.whl.

File metadata

  • Download URL: pgpq-0.11.0-cp39-abi3-musllinux_1_2_i686.whl
  • Size: 951.8 kB
  • Tags: CPython 3.9+, musllinux: musl 1.2+ i686
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-musllinux_1_2_i686.whl
  • SHA256: e1cdbd752fadb74d3c05cca95d949fa3fb1627dbbe6e1ffc2aa44b9bde7d7cb1
  • MD5: 93ae06054bc50421624d0c8d3711a74f
  • BLAKE2b-256: 898decf97b297c5c4f7f2f81874d0b4fb2f5eb9d621f4425524dd88770a803b5


File details

Details for the file pgpq-0.11.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
  • SHA256: 848680d83dd41ff6fbfda3a822b694988f8c994923150a511cd553cd7b8b3bea
  • MD5: d805b7bf2b0484cb10bac045c03692bb
  • BLAKE2b-256: 6628aff26645d6fdb8f1228629238a29673dffd639772a116720af7f4f7bdeab


File details

Details for the file pgpq-0.11.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl.

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl
  • SHA256: 65cc67b71d93f901cb603df665009ed714efff8eabe87d934d7f1b7d4af010d1
  • MD5: c5a0aacded0470c95895af7a9be45c9d
  • BLAKE2b-256: 90d875a31443af9d27c0663066073ddedb07327200024facbc4e7fd2a3e97172


File details

Details for the file pgpq-0.11.0-cp39-abi3-macosx_10_12_x86_64.whl.

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-macosx_10_12_x86_64.whl
  • SHA256: 05be4342c6d3fbe510dabe347c35c4d670342680d3df048a851a420be8c09945
  • MD5: e8e7f71f3673bcbcc182a09485f99612
  • BLAKE2b-256: 86997e6e51049709fe4ebe7489c617c3c30a7241895ac49e85f25a9d8c7d55e9


File details

Details for the file pgpq-0.11.0-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl.

File hashes

Hashes for pgpq-0.11.0-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl
  • SHA256: 6f3e93bd2c7d90414b9721ddad2791a0b8b1c896a0befc43495e4ff5d4b04242
  • MD5: 6f28202aa1d946793aacbf907013d81f
  • BLAKE2b-256: 7804b7702bfcc5683cbecfe44d61f78329d6eecee459b4b349d7bb2ffa384ff8

