agave

Agave is a library for building REST APIs using a Blueprint pattern, with support for both AWS Chalice and FastAPI frameworks. It simplifies the creation of JSON-based endpoints for querying, modifying, and creating resources.

Installation

Choose the installation option based on your framework:

Chalice Installation

pip install agave[chalice]

FastAPI Installation

pip install agave[fastapi]

SQS task support (FastAPI-based apps only):

pip install agave[fastapi,tasks]

Models

AsyncDocument for FastAPI

When using FastAPI, models should inherit from mongoengine_plus.aio.AsyncDocument to enable async MongoDB operations:

from mongoengine import StringField, DateTimeField
from mongoengine_plus.aio import AsyncDocument
from mongoengine_plus.models import BaseModel

class Account(BaseModel, AsyncDocument):
    name = StringField(required=True)
    user_id = StringField(required=True)
    # ...other fields
    
    # Use async methods:
    # await account.async_save()
    # await Account.objects.async_get(id=id)
    # await Account.objects.filter(...).async_to_list()

Document for Chalice

For Chalice, use the standard MongoEngine Document class:

from mongoengine import Document, StringField, DateTimeField

class Account(Document):
    name = StringField(required=True)
    user_id = StringField(required=True)
    # ...other fields
    
    # Use sync methods:
    # account.save()
    # Account.objects.get(id=id)

Usage

Chalice Example

Create a REST API blueprint as follows:

import datetime as dt
from chalice import Response
from agave.chalice import RestApiBlueprint

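# AccountModel, AccountQuery, AccountRequest, AccountUpdateRequest and
# generic_query are assumed to be defined or imported elsewhere in your project.
app = RestApiBlueprint()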

# The @app.resource decorator automatically creates these endpoints:
# - GET /accounts             => Query with filters
# - GET /accounts/{id}        => Get account by ID
# 
# Additional endpoints are created only if you define the corresponding methods:
# - POST /accounts            => created if 'create' method is defined
# - PATCH /accounts/{id}      => created if 'update' method is defined
# - DELETE /accounts/{id}     => created if 'delete' method is defined
@app.resource('/accounts')
class Account:
    model = AccountModel
    query_validator = AccountQuery
    update_validator = AccountUpdateRequest
    get_query_filter = generic_query

    # Optional: Define create method to enable POST endpoint
    @staticmethod
    @app.validate(AccountRequest)
    def create(request: AccountRequest) -> Response:
        account = AccountModel(
            name=request.name,
            user_id=app.current_user_id,
            platform_id=app.current_platform_id,
        )
        account.save()
        return Response(account.to_dict(), status_code=201)

    # Optional: Define update method to enable PATCH endpoint
    @staticmethod
    def update(
        account: AccountModel, request: AccountUpdateRequest
    ) -> Response:
        account.name = request.name
        account.save()
        return Response(account.to_dict(), status_code=200)

    # Optional: Define delete method to enable DELETE endpoint
    @staticmethod
    def delete(account: AccountModel) -> Response:
        account.deactivated_at = dt.datetime.utcnow().replace(microsecond=0)
        account.save()
        return Response(account.to_dict(), status_code=200)
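
The delete method above stamps deactivated_at with a UTC time truncated to whole seconds. As a side note, datetime.utcnow() is deprecated as of Python 3.12. The standard-library sketch below shows a timezone-aware way to get the same truncated timestamp, plus a naive variant for code that expects what utcnow() used to return. Whether your model field should store aware or naive datetimes is an assumption you need to check against your own setup.

```python
import datetime as dt

# Timezone-aware "now" in UTC, truncated to whole seconds.
aware = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)

# Dropping tzinfo gives a naive datetime, like the value utcnow() returns.
naive = aware.replace(tzinfo=None)

print(aware.isoformat())
print(naive.isoformat())
```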

FastAPI Example

import datetime as dt
from fastapi import Request
from fastapi.responses import JSONResponse as Response
from agave.fastapi import RestApiBlueprint

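# AccountModel, AccountQuery, AccountRequest, AccountUpdateRequest,
# AccountResponse and generic_query are assumed to be defined or imported
# elsewhere in your project.
app = RestApiBlueprint()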

# The @app.resource decorator automatically creates these endpoints:
# - GET /accounts             => Query with filters
# - GET /accounts/{id}        => Get account by ID
# 
# Additional endpoints are created only if you define the corresponding methods:
# - POST /accounts            => created if 'create' method is defined
# - PATCH /accounts/{id}      => created if 'update' method is defined
# - DELETE /accounts/{id}     => created if 'delete' method is defined
@app.resource('/accounts')
class Account:
    model = AccountModel  # AsyncDocument model
    query_validator = AccountQuery
    update_validator = AccountUpdateRequest
    get_query_filter = generic_query
    response_model = AccountResponse  # FastAPI specific

    # Optional: Define create method to enable POST endpoint
    @staticmethod
    async def create(request: AccountRequest) -> Response:
        """This is the description for OpenAPI documentation"""
        account = AccountModel(
            name=request.name,
            user_id=app.current_user_id,
            platform_id=app.current_platform_id,
        )
        await account.async_save()
        return Response(content=account.to_dict(), status_code=201)

    # Optional: Define update method to enable PATCH endpoint
    @staticmethod
    async def update(
        account: AccountModel,
        request: AccountUpdateRequest,
    ) -> Response:
        account.name = request.name
        await account.async_save()
        return Response(content=account.to_dict(), status_code=200)

    # Optional: Define delete method to enable DELETE endpoint
    @staticmethod
    async def delete(account: AccountModel, _: Request) -> Response:
        account.deactivated_at = dt.datetime.utcnow().replace(microsecond=0)
        await account.async_save()
        return Response(content=account.to_dict(), status_code=200)
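
Both examples refer to request and response models (AccountRequest, AccountUpdateRequest, AccountResponse) without defining them. The sketch below shows one way they might look as plain Pydantic models. The field names and types are assumptions inferred from the attributes used in the examples, not definitions shipped with agave. AccountQuery is left out on purpose, because this README does not show which base class a query validator needs.

```python
import datetime as dt
from typing import Optional

from pydantic import BaseModel, ValidationError


class AccountRequest(BaseModel):
    """Hypothetical body for POST /accounts."""

    name: str


class AccountUpdateRequest(BaseModel):
    """Hypothetical body for PATCH /accounts/{id}."""

    name: str


class AccountResponse(BaseModel):
    """Hypothetical response schema (used by the FastAPI example only)."""

    id: str
    name: str
    user_id: str
    platform_id: str
    deactivated_at: Optional[dt.datetime] = None


# A valid request body is parsed into a typed object.
request = AccountRequest(name="Savings")
print(request.name)

# A body that omits a required field is rejected.
try:
    AccountRequest()
except ValidationError:
    print("name is required")
```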

Async Tasks

Agave's SQS tasks support Pydantic model validation. When a JSON message arrives on the SQS queue, the task parses it and converts it into the Pydantic model named in the handler's type hint:

from pydantic import BaseModel
from agave.tasks.sqs_tasks import task

# Define Pydantic model for request validation
class UserData(BaseModel):
    name: str
    age: int
    email: str

QUEUE_URL = 'https://sqs.region.amazonaws.com/account/queue'
AWS_DEFAULT_REGION = 'us-east-1'

# Task with Pydantic model type hint
@task(
    queue_url=QUEUE_URL,
    region_name=AWS_DEFAULT_REGION,
    visibility_timeout=30,
    max_retries=10,
)
async def process_user(user: UserData):
    # user is already a validated UserData instance, not a dict
    print(f"Processing user {user.name} with email {user.email}")
    # Your processing logic here
    return {"success": True, "user_name": user.name}
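
To make the message shape concrete, the sketch below builds a JSON body whose keys match the UserData fields above, using only the standard library. The sample values are made up. Actually enqueuing the message is left as a comment because it needs AWS credentials; the boto3 call shown there is one common way to send it, not something this README prescribes.

```python
import json

# Hypothetical sample payload; the keys mirror the UserData fields above.
payload = {"name": "Frida", "age": 33, "email": "frida@example.com"}

# SQS message bodies are strings, so the payload is serialized to JSON.
message_body = json.dumps(payload)

# Sending is omitted because it requires AWS credentials. With a boto3 SQS
# client it would look roughly like:
#   sqs_client.send_message(QueueUrl=QUEUE_URL, MessageBody=message_body)

# Round-trip check: decoding the body yields the original fields.
decoded = json.loads(message_body)
print(decoded["name"], decoded["age"])
```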

Raise the RetryTask exception to have a task retried:

from agave.core.exc import RetryTask
from agave.tasks.sqs_tasks import task

@task(queue_url=QUEUE_URL, region_name=AWS_DEFAULT_REGION, max_retries=3)
async def process_with_retry(data: dict):
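    try:
        # Your processing logic (some_operation is a placeholder)
        result = await some_operation(data)
    except TemporaryError:  # placeholder for your own transient error type
        # Raising RetryTask makes the task retry automatically
        raise RetryTask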
    
    return result
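
The raise-to-retry control flow can be tried locally without AWS. The sketch below is a toy simulation and not agave's implementation: RetryTask is a local stand-in for agave.core.exc.RetryTask, TemporaryError and some_operation are hypothetical placeholders, and run_with_retries is an invented runner that simply calls the handler again. It assumes max_retries counts retries after the first attempt, which this README does not state.

```python
import asyncio


class RetryTask(Exception):
    """Local stand-in for agave.core.exc.RetryTask (illustration only)."""


class TemporaryError(Exception):
    """Hypothetical transient failure, e.g. a timeout from another service."""


attempts = 0


async def some_operation(data: dict) -> dict:
    """Hypothetical operation that fails twice, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise TemporaryError(f"attempt {attempts} failed")
    return {"ok": True, "attempts": attempts}


async def process_with_retry(data: dict) -> dict:
    try:
        return await some_operation(data)
    except TemporaryError:
        # Signal that this message should be processed again.
        raise RetryTask


async def run_with_retries(data: dict, max_retries: int) -> dict:
    """Toy runner: calls the handler again each time it raises RetryTask."""
    for _ in range(max_retries + 1):
        try:
            return await process_with_retry(data)
        except RetryTask:
            continue
    raise RuntimeError("retries exhausted")


result = asyncio.run(run_with_retries({"id": 1}, max_retries=3))
print(result)
```

In the real library the retry happens through the queue rather than an in-process loop, so treat this only as a picture of when the handler runs again.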

Running Tests

Run the tests using the following command:

make test
