Async Support

AsyncSQLPyHelper provides an async-native API for FastAPI, Starlette, and any other asyncio-based application.

Drivers used

Database      Async driver
----------    ----------------------------
SQLite        aiosqlite
PostgreSQL    asyncpg
MySQL         aiomysql
SQL Server    aioodbc
Oracle        python-oracledb (async mode)

Installation

Install with async extras:

pip install sqlpyhelper[async-postgres]   # PostgreSQL
pip install sqlpyhelper[async-mysql]      # MySQL
pip install sqlpyhelper[async-sqlite]     # SQLite
pip install sqlpyhelper[async-all]        # All async drivers

Basic usage

import asyncio
from sqlpyhelper.async_helper import AsyncSQLPyHelper

async def main():
    async with AsyncSQLPyHelper(db_type="sqlite", database="my.db") as db:
        await db.execute(
            "CREATE TABLE IF NOT EXISTS users (id INTEGER, name TEXT)"
        )
        await db.execute(
            "INSERT INTO users VALUES ($1, $2)", 1, "Alice"
        )
        rows = await db.fetch_all("SELECT * FROM users")
        print(rows)

asyncio.run(main())

Placeholders

Write every query with $1, $2, ... placeholders, whichever database you target. AsyncSQLPyHelper translates them automatically to the style each driver expects:

Database              Placeholder style
------------------    -----------------
PostgreSQL            $1, $2 (native)
SQLite                ?
MySQL / SQL Server    %s
Oracle                :1, :2

# Write once, works on all databases
await db.execute(
    "INSERT INTO users (id, name) VALUES ($1, $2)",
    1, "Alice"
)
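
To make the translation step concrete, here is a minimal sketch of how $n placeholders could be rewritten for each style in the table above. It is an illustration, not the library's implementation: the translate function, the _STYLE_BY_DB mapping, and the "mysql", "sqlserver", and "oracle" keys are assumptions. Note that ? and %s bind by position, so the arguments have to be re-sequenced when a query uses $2 before $1.

```python
import re

# Assumed db_type keys, mirroring the table above; not taken from the library.
_STYLE_BY_DB = {
    "postgres": "dollar",   # $1, $2 (left unchanged)
    "sqlite": "qmark",      # ?
    "mysql": "format",      # %s
    "sqlserver": "format",  # %s
    "oracle": "colon",      # :1, :2
}

_PLACEHOLDER = re.compile(r"\$(\d+)")


def translate(query, args, db_type):
    """Return (query, args) rewritten for the target placeholder style.

    Naive on purpose: a $1 inside a quoted SQL string literal would be
    rewritten too, and literal % characters are not escaped for %s drivers.
    """
    style = _STYLE_BY_DB[db_type]
    if style == "dollar":
        return query, tuple(args)
    if style == "colon":
        # :1, :2 are still numbered, so the argument order is unchanged.
        return _PLACEHOLDER.sub(r":\1", query), tuple(args)

    # ? and %s bind by order of appearance, so collect the arguments in the
    # order their placeholders occur in the query text.
    token = "?" if style == "qmark" else "%s"
    ordered = []

    def replace(match):
        ordered.append(args[int(match.group(1)) - 1])
        return token

    return _PLACEHOLDER.sub(replace, query), tuple(ordered)
```

For example, translate("SELECT * FROM t WHERE b = $2 AND a = $1", (1, 2), "mysql") yields the query with two %s markers and the arguments reordered to (2, 1).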

FastAPI integration

from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlpyhelper.async_helper import AsyncSQLPyHelper

db = AsyncSQLPyHelper(
    db_type="postgres",
    host="localhost",
    user="user",
    password="pass",
    database="mydb",
)

@asynccontextmanager
async def lifespan(app: FastAPI):
    await db.connect()
    yield
    await db.close()

app = FastAPI(lifespan=lifespan)

@app.get("/users")
async def get_users():
    return await db.fetch_all("SELECT * FROM users")

@app.post("/users")
async def create_user(name: str):
    await db.execute(
        "INSERT INTO users (name) VALUES ($1)", name
    )
    return {"status": "created"}

Connection pooling

For high-traffic applications, call setup_pool() instead of opening a single connection. Pooling is supported for PostgreSQL and MySQL only:

from sqlpyhelper.async_helper import AsyncSQLPyHelper

db = AsyncSQLPyHelper(
    db_type="postgres",
    host="localhost",
    user="user",
    password="pass",
    database="mydb",
)

async def startup():
    await db.setup_pool(min_size=2, max_size=20)

async def shutdown():
    await db.close_pool()
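
The startup and shutdown functions above map naturally onto the lifespan pattern from the FastAPI section. The sketch below shows one way to pair them so that close_pool() always runs, even if the application exits with an error. make_pool_lifespan and _RecordingDB are hypothetical names introduced for illustration; the stand-in class exists only so the example runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager


def make_pool_lifespan(db, min_size=2, max_size=20):
    """Build a lifespan callable that owns the pool for the app's lifetime.

    Hypothetical helper: `db` is any object exposing the setup_pool() and
    close_pool() coroutines described in this section.
    """

    @asynccontextmanager
    async def lifespan(app):
        await db.setup_pool(min_size=min_size, max_size=max_size)
        try:
            yield
        finally:
            # Runs on normal shutdown and when the app exits with an error.
            await db.close_pool()

    return lifespan


class _RecordingDB:
    """Stand-in that records calls instead of talking to a database."""

    def __init__(self):
        self.calls = []

    async def setup_pool(self, min_size=1, max_size=10):
        self.calls.append(("setup_pool", min_size, max_size))

    async def close_pool(self):
        self.calls.append(("close_pool",))


async def demo():
    db = _RecordingDB()
    lifespan = make_pool_lifespan(db, min_size=2, max_size=20)
    async with lifespan(None):  # a framework would pass the app instance
        pass
    return db.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real helper instance, the returned callable could be passed as FastAPI(lifespan=make_pool_lifespan(db)).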

Transactions

async with AsyncSQLPyHelper(db_type="sqlite", database="my.db") as db:
    await db.begin_transaction()
    try:
        await db.execute("INSERT INTO users VALUES ($1, $2)", 1, "Alice")
        await db.execute("INSERT INTO orders VALUES ($1, $2)", 1, "Laptop")
        await db.commit_transaction()
    except Exception:
        await db.rollback_transaction()
        raise
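
The begin/commit/rollback pattern above can be factored into a small async context manager so that each call site does not repeat the try/except. This is a sketch under stated assumptions: atomic is a hypothetical helper, not part of the library (and distinct from the transaction() context manager mentioned in the API reference), and _RecordingDB is a stand-in used only to make the example runnable.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def atomic(db):
    """Hypothetical helper: commit on success, roll back on any failure."""
    await db.begin_transaction()
    try:
        yield db
    except BaseException:
        # BaseException also covers task cancellation (asyncio.CancelledError).
        await db.rollback_transaction()
        raise
    else:
        await db.commit_transaction()


class _RecordingDB:
    """Stand-in that records calls instead of talking to a database."""

    def __init__(self):
        self.calls = []

    async def begin_transaction(self):
        self.calls.append("begin")

    async def commit_transaction(self):
        self.calls.append("commit")

    async def rollback_transaction(self):
        self.calls.append("rollback")

    async def execute(self, query, *args):
        self.calls.append("execute")


async def demo(fail):
    db = _RecordingDB()
    try:
        async with atomic(db):
            await db.execute("INSERT INTO users VALUES ($1, $2)", 1, "Alice")
            if fail:
                raise RuntimeError("simulated failure")
    except RuntimeError:
        pass  # the error still propagates out of atomic(); swallowed for the demo
    return db.calls


if __name__ == "__main__":
    print(asyncio.run(demo(fail=False)))
    print(asyncio.run(demo(fail=True)))
```

One design difference from the snippet above: the commit happens outside the try block, so a failed commit is not followed by a rollback attempt.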

Bulk inserts

async with AsyncSQLPyHelper(db_type="postgres", ...) as db:
    await db.execute_many(
        "INSERT INTO users (id, name) VALUES ($1, $2)",
        [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
    )
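
For very large inputs it can help to send rows to execute_many() in fixed-size batches rather than in one call, which keeps each parameter list bounded. The sketch below assumes only the execute_many(query, args_list) signature documented in the API reference; batched, insert_in_batches, and the _RecordingDB stand-in are hypothetical names, and the default batch size is arbitrary.

```python
import asyncio
from itertools import islice


def batched(rows, size):
    """Yield successive lists of at most `size` rows from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while chunk := list(islice(iterator, size)):
        yield chunk


async def insert_in_batches(db, query, rows, size=1000):
    """Call db.execute_many() once per batch; return the number of rows sent."""
    sent = 0
    for chunk in batched(rows, size):
        await db.execute_many(query, chunk)
        sent += len(chunk)
    return sent


class _RecordingDB:
    """Stand-in that records batch sizes instead of talking to a database."""

    def __init__(self):
        self.batch_sizes = []

    async def execute_many(self, query, args_list):
        self.batch_sizes.append(len(args_list))


async def demo():
    db = _RecordingDB()
    rows = [(i, f"user{i}") for i in range(5)]
    sent = await insert_in_batches(
        db, "INSERT INTO users (id, name) VALUES ($1, $2)", rows, size=2
    )
    return sent, db.batch_sizes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Whether the batches should share a single transaction depends on the application; this sketch leaves that decision to the caller.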

API reference

class sqlpyhelper.async_helper.AsyncSQLPyHelper(db_type: str | None = None, host: str | None = None, user: str | None = None, password: str | None = None, database: str | None = None, driver: str | None = None, port: str | None = None, oracle_sid: str | None = None)

Bases: object

Async-native database helper with a unified API across SQLite, PostgreSQL, MySQL, SQL Server, and Oracle.

Use as an async context manager:

async with AsyncSQLPyHelper(db_type="postgres", ...) as db:
    rows = await db.fetch_all("SELECT * FROM users")

Or manage the connection lifecycle manually:

db = AsyncSQLPyHelper(db_type="sqlite", database="my.db")
await db.connect()
try:
    rows = await db.fetch_all("SELECT * FROM users")
finally:
    await db.close()

async begin_transaction() -> None

Begin an explicit transaction.

For PostgreSQL, use the transaction() context manager instead, which is the idiomatic asyncpg approach.

Raises:

AsyncQueryError – If the transaction cannot be started.

async close() -> None

Close the database connection.

async close_pool() -> None

Close the async connection pool.

async commit_transaction() -> None

Commit the current transaction.

async connect() -> None

Open the database connection.

async execute(query: str, *args: Any) -> None

Execute a SQL statement (INSERT, UPDATE, DELETE, DDL).

Use $1, $2, … for parameterised values:

await db.execute(
    "INSERT INTO users (id, name) VALUES ($1, $2)",
    1, "Alice"
)

Parameters:
  • query – SQL query string using $1, $2 placeholders.

  • *args – Query parameters.

Raises:

AsyncQueryError – If the query fails.

async execute_many(query: str, args_list: list[tuple]) -> None

Execute a SQL statement multiple times with different parameters. Efficient for bulk inserts:

await db.execute_many(
    "INSERT INTO users (id, name) VALUES ($1, $2)",
    [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
)

Parameters:
  • query – SQL query string using $1, $2 placeholders.

  • args_list – List of parameter tuples.

Raises:

AsyncQueryError – If the operation fails.

async fetch_all(query: str, *args: Any) -> list[Any]

Execute a SELECT query and return all rows.

Parameters:
  • query – SQL query string using $1, $2 placeholders.

  • *args – Query parameters.

Returns:

A list of rows (empty list if no rows matched).

Raises:

AsyncQueryError – If the query fails.

async fetch_one(query: str, *args: Any) -> Any | None

Execute a SELECT query and return a single row, or None.

Parameters:
  • query – SQL query string using $1, $2 placeholders.

  • *args – Query parameters.

Returns:

A single row, or None if no rows matched.

Raises:

AsyncQueryError – If the query fails.

async fetch_val(query: str, *args: Any) -> Any | None

Execute a SELECT query and return a single scalar value.

Useful for COUNT, SUM, or any query returning one value:

count = await db.fetch_val("SELECT COUNT(*) FROM users")

Parameters:
  • query – SQL query string using $1, $2 placeholders.

  • *args – Query parameters.

Returns:

A single scalar value, or None.

Raises:

AsyncQueryError – If the query fails.

async rollback_transaction() -> None

Roll back the current transaction.

async setup_pool(min_size: int = 1, max_size: int = 10) -> None

Set up an async connection pool.

Supported for PostgreSQL and MySQL only. After calling this, use get_connection_from_pool() to acquire connections.

Parameters:
  • min_size – Minimum number of connections in the pool.

  • max_size – Maximum number of connections in the pool.

Raises:

AsyncConnectionError – If pool setup fails or db_type does not support pooling.

class sqlpyhelper.async_helper.AsyncConnectionError

Bases: Exception

Raised when an async database connection fails.

class sqlpyhelper.async_helper.AsyncQueryError

Bases: Exception

Raised when an async query fails.