Skip to main content

HTTP database access.

Project description

Server Usage

import asyncio
import peewee_async
import peewee_asyncext
import peewee
import aiodata.server as aiodata
import aiohttp.web
import functools

# get event loop
loop = asyncio.get_event_loop()

# create an asyncio-compatible peewee database
database = peewee_asyncext.PostgresqlExtDatabase('test')

# used for async query executions
objects = peewee_async.Manager(database)

# build a model
class Users(peewee.Model):

  name = peewee.TextField(null = True)

  tag = peewee.IntegerField(null = True)

  hobby = peewee.TextField(null = True)

  class Meta:

    database = database

    primary_key = peewee.CompositeKey('name', 'tag')

# ensure it exists
Users.create_table()
# ...do other stuff

# will create a function checking the 'Authorization'
# header of each request for this token and validating;
# raises an aiohttp.web.HTTPUnauthorized error upon failure
authorize = aiodata.utils.authorizer('super secret')

# heartbeat
interval = 30

# create the client;
# for no auth, simply pass `lambda request: None`
client = aiodata.Client(authorize, interval)

# utils.fail is a convenience function for error raising with json data;
# this is the error handlers will raise for bad requests, passing an
# arbitrary amount of positional arguments, starting with a code (str)
# representing the error, ex: error('database error', output)
fail = functools.partial(aiodata.utils.fail, aiohttp.web.HTTPBadRequest)

# make the application
app = aiohttp.web.Application()

# will create handlers and then dispatchers for the client;
# also saves information related to the smooth execution of
# websocket connections utilized by the peers - important!
# yields route assets in the form of (method, path, handler)
routes = aiodata.utils.setup(client, objects, Users, authorize, fail)

# iterate through
for route in routes:

  # and add them to our router
  app.router.add_route(*route)

# also add some route for the websocket connection
app.router.add_route('GET', '/state', client.connect)

# disable sync
database.set_allow_sync(False)

# run the app localy
aiohttp.web.run_app(app)

Requests should be in the form of METHOD */model with json [keys, data].

import requests

url = 'http://localhost:8080/users'

keys = ['cat', 40]

data = {'hobby': 'meowing'}

body = [keys, data]

headers = {'Authorization': 'super secret'}

response = requests.post(url, json = body, headers = headers)

# will be an array of dicts as rows with all their columns;
# every successful request returns all the rows affected
entries = response.json()

print(entries)

Client Usage

import asyncio
import aiohttp
import aiodata.client as aiodata

# get event loop
loop = asyncio.get_event_loop()

# create the session
session = aiohttp.ClientSession(loop = loop)

# beep boop
host = 'http://localhost:8080'

# create a function injecting the headers
# with an 'Authorization' key against this token
authorize = aiodata.utils.authorizer('super secret')

# create the client;
# for no auth, simply pass `lambda kwargs: None`
client = aiodata.Client(session, host, authorize)

# listen for rows being updated
@client.track('update', 'users')
async def listener(entries):

  for before, after in entries:

    for key, value in before.items():

      check = after[key]

      if value == check:

        continue

      print('changed', key, 'from', value, 'to', check)

# demonstration
async def execute():

  # will connect to websocket,
  # receive info, build, fill cache,
  # listen for database interactions
  await client.start('/state')

  # cache can be poked like so;
  # entries are simple objects,
  # tables are overwritten dicts
  entry = client.cache.users.get('cat', 40)

  # see before
  print(entry.hobby)

  # change the tag (primary) and hobby
  # make a request, patching all rows with 'cat' as name;
  # the positional nature of the keys implies entries
  # can't be bulk edited by leading primary keys alone
  entries = await client.update('cat', tag = 80, hobby = 'barking')

  print(entries)

  await client.close()

  await session.close()

coroutine = execute()

loop.run_until_complete(coroutine)

Installing

python3 -m pip install aiodata

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiodata-1.4.6.tar.gz (11.3 kB view hashes)

Uploaded Source

Built Distribution

aiodata-1.4.6-py3-none-any.whl (14.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page