Skip to main content

A python package to query data via amazon athena and bring it into a pandas df using aws-wrangler.

Project description

pydbtools

A package that is used to run SQL queries speficially configured for the Analytical Platform. This packages uses AWS Wrangler's Athena module but adds additional functionality (like Jinja templating, creating temporary tables) and alters some configuration to our specification.

Installation

Requires a pip release above 20.

## To install from pypi
pip install pydbtools

## Or install from git with a specific release
pip install "pydbtools @ git+https://github.com/moj-analytical-services/pydbtools@v4.0.1"

Quickstart guide

The examples directory contains more detailed notebooks demonstrating the use of this library, many of which are borrowed from the mojap-aws-tools-demo repo.

Read an SQL Athena query into a pandas dataframe

import pydbtools as pydb
df = pydb.read_sql_query("SELECT * from a_database.table LIMIT 10")

Run a query in Athena

response = pydb.start_query_execution_and_wait("CREATE DATABASE IF NOT EXISTS my_test_database")

Create a temporary table to do further separate SQL queries on later

pydb.create_temp_table("SELECT a_col, count(*) as n FROM a_database.table GROUP BY a_col", table_name="temp_table_1")
df = pydb.read_sql_query("SELECT * FROM __temp__.temp_table_1 WHERE n < 10")

pydb.dataframe_to_temp_table(my_dataframe, "my_table")
df = pydb.read_sql_query("select * from __temp__.my_table where year = 2022")

Introduction

This package is a wrapper for awswrangler that which presets/defines some of the input parameters to the athena module functions to align with our platform setup. See the awswrangler API reference documentation for Athena to see what functions you can call from pydbtools.

The function parameters that are locked down / altered by pydbtools are:

  • boto3_session: This is auto generated by pydbtools (in order to grab the user credentials from the sts client - this is needed for the R version of this package which calls this package under the hood. In short forcing refreshed credentials are needed in R as boto3 credentials timeout and do not refresh when using reticulate, though this does not apply to the latest version of the platform currently being rolled out.)
  • s3_output: The S3 path where database queries are written to. This is defined by pydbtools based on the IAM user/role calling the query (ensures that each role can only read/write to a S3 path only they can access).
  • database: Will either be set to None or __temp__ depending on other user parameters (if ctas_approach=True). __temp__ is an alias to an autogenerated temp database name which is generated from pydbtools again based on the IAM user/role. References to this temporary database can be referenced by the keyword __temp__ in SQL queries see additional functionality to awswrangler section.
  • sql: We allows reference to the database name __temp__ which is an alias to a user specific temporary database. When a function call has an SQL parameter the SQL is checked with an SQL parser and then any reference to __temp__ as a database is replaced with the actual database name which is autogenerated. This replacement only occurs for SELECT queries.
  • pyarrow_additional_kwargs: This is set to {"coerce_int96_timestamp_unit": "ms", "timestamp_as_object": True} by default. Doing this solves this awswrangler issue)

Additional Functionality

As well as acting as a wrapper function for awswrangler this package also allows you to do the following:

Run query and wait for a response

This function essentially calls two functions from awswrangler.athena. First start_query_execution followed by wait_query.

import pydbtools as pydb

response = pydb.start_query_execution_and_wait("SELECT * from a_database.table LIMIT 10")

Create Temporary Tables

You can use the create_temp_table function to write SQL to create a store a temporary table that sits in your __temp__ database.

import pydbtools as pydb

pydb.create_temp_table("SELECT * from a_database.table LIMIT 10", table_name="temp_table_1")
df = pydb.read_sql_query("SELECT * from __temp__.temp_table_1")
df.head()

See the example notebook for a more detailed example.

Create databases and tables

import pydbtools as pydb
import pandas as pd

pydb.create_database("my_db")
pydb.file_to_table(
    "local_file_path/data.csv", 
    database="my_db",
    table="my_table",
    location="s3://my_s3_location/my_table"
)
pydb.dataframe_to_table(
    my_dataframe, 
    database="my_db",
    table="my_other_table",
    location="s3://my_s3_location/my_other_table"
)
pydb.create_table(
    "select * from my_db.my_other_table where month = 'March'",
    database="my_db",
    table="my_march_table",
    location="s3://my_s3_location/my_other_table"
)

See the notebook on MoJAP tools for more details.

Run SQL from a string of statements or a file

It wil often be more convenient to write your SQL in an editor with language support rather than as a Python string. You can create temporary tables within SQL using the syntax below.

import pydbtools as pydb

sql = """
create temp table A as (
    select * from database.table1
    where year = 2021
);

create temp table B as (
    select * from database.table2
    where amount > 10
);

select * from __temp__.A
left join __temp__.B
on A.id = B.id;
"""

with open("queries.sql", "w") as f:
    f.write(sql)
    
with open("queries.sql", "r") as f:
    df = pydb.read_sql_queries(f.read())

Multiple SELECT queries can be returned as a generator of dataframes using read_sql_queries_gen.

See the notebook on creating temporary tables with SQL and the notebook on database administration with SQL for more detailed examples.

Additionally you can use Jinja templating to inject arguments into your SQL.

sql_template = """
SELECT *
FROM {{ db_name }}.{{ table }}
"""
sql = pydb.render_sql_template(sql_template, {"db_name": db_name, "table": "department"})
pydb.read_sql_query(sql)

with open("tempfile.sql", "w") as f:
    f.write("SELECT * FROM {{ db_name }}.{{ table_name }}")
sql = pydb.get_sql_from_file("tempfile.sql", jinja_args={"db_name": db_name, "table_name": "department"})
pydb.read_sql_query(sql)
"""

See the notebook on SQL templating for more details.

Delete databases, tables and partitions together with the data on S3

import pydbtools as pydb

pydb.delete_partitions_and_data(database='my_database', table='my_table', expression='year = 2020 or year = 2021')
pydb.delete_table_and_data(database='my_database', table='my_table')
pydb.delete_database('my_database')

# These can be used for temporary databases and tables.
pydb.delete_table_and_data(database='__temp__', table='my_temp_table')

For more details see the notebook on deletions.

Usage / Examples

Simple

import pydbtools as pydb

# Run a query using pydbtools
response = pydb.start_query_execution_and_wait("CREATE DATABASE IF NOT EXISTS my_test_database")

# Read data from an athena query directly into pandas
pydb.read_sql("SELECT * from a_database.table LIMIT 10")

# Create a temp table to do further seperate SQL queries later on
pydb.create_temp_table("SELECT a_col, count(*) as n FROM a_database.table GROUP BY a_col", table_name="temp_table_1")
df = pydb.read_sql_query("SELECT * FROM __temp__.temp_table_1 WHERE n < 10")

More advanced usage

Get the actual name for your temp database, create your temp db then delete it using awswrangler (note: awswrangler will raise an error if the database does not exist)

import awswrangler as wr
import pydbtools as pydb

user_id, out_path = pydb.get_user_id_and_table_dir()
temp_db_name = pydb.get_database_name_from_userid(user_id)
print(temp_db_name)
pydb.create_temp_table()
print(wr.catalog.delete_database(name=temp_db_name))

DEPRECATED

Functions

The functions:

  • pydbtools.get_athena_query_response
  • pydbtools.read_sql

Are now deprecated and calls to these functions will raise an warning. They have been replaced by pydbtools.start_query_execution_and_wait and pydbtools.read_sql_query.

Notes:

  • Amazon Athena using a flavour of SQL called presto docs can be found here
  • To query a date column in Athena you need to specify that your value is a date e.g. SELECT * FROM db.table WHERE date_col > date '2018-12-31'
  • To query a datetime or timestamp column in Athena you need to specify that your value is a timestamp e.g. SELECT * FROM db.table WHERE datetime_col > timestamp '2018-12-31 23:59:59'
  • Note dates and datetimes formatting used above. See more specifics around date and datetimes here
  • To specify a string in the sql query always use '' not "". Using ""'s means that you are referencing a database, table or col, etc.

See changelog for release changes.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pydbtools-5.5.2.tar.gz (16.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pydbtools-5.5.2-py3-none-any.whl (13.9 kB view details)

Uploaded Python 3

File details

Details for the file pydbtools-5.5.2.tar.gz.

File metadata

  • Download URL: pydbtools-5.5.2.tar.gz
  • Upload date:
  • Size: 16.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.3.2 CPython/3.9.16 Linux/5.15.0-1033-azure

File hashes

Hashes for pydbtools-5.5.2.tar.gz
Algorithm Hash digest
SHA256 7329bfb3ddb3185eff4cbfa2bfd600b9fd3f3341b90000d444dd88a4e34955a7
MD5 722b96b372f36fdfce8e8607c20f16b0
BLAKE2b-256 2a405dec6471ddc3dbc72659c3cf979ab54e92bbbee83b2da466c78f31d9de7f

See more details on using hashes here.

File details

Details for the file pydbtools-5.5.2-py3-none-any.whl.

File metadata

  • Download URL: pydbtools-5.5.2-py3-none-any.whl
  • Upload date:
  • Size: 13.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.3.2 CPython/3.9.16 Linux/5.15.0-1033-azure

File hashes

Hashes for pydbtools-5.5.2-py3-none-any.whl
Algorithm Hash digest
SHA256 102387b0545ceb9abd381af140e3c32941d3b8c9968ed2183f42a13e789f84ff
MD5 1a8d4ee6d1de88ea889bcd6b8917590c
BLAKE2b-256 aa6b16a84bd858f98a7dc628e543297462a7d2280cb8a61c469c21db2e41b0ff

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page