Async Support
AsyncSQLPyHelper provides an async-native API for use with FastAPI,
Starlette, and any other asyncio-based application.
Drivers used
| Database | Async driver |
|---|---|
| SQLite | |
| PostgreSQL | |
| MySQL | |
| SQL Server | |
| Oracle | |
Installation
Install with async extras:
# Quote the requirement so shells such as zsh do not expand the brackets
pip install "sqlpyhelper[async-postgres]"  # PostgreSQL
pip install "sqlpyhelper[async-mysql]"     # MySQL
pip install "sqlpyhelper[async-sqlite]"    # SQLite
pip install "sqlpyhelper[async-all]"       # All async drivers
Basic usage
import asyncio

from sqlpyhelper.async_helper import AsyncSQLPyHelper

async def main():
    # The context manager opens the connection and closes it on exit
    async with AsyncSQLPyHelper(db_type="sqlite", database="my.db") as db:
        await db.execute(
            "CREATE TABLE IF NOT EXISTS users (id INTEGER, name TEXT)"
        )
        await db.execute(
            "INSERT INTO users VALUES ($1, $2)", 1, "Alice"
        )
        rows = await db.fetch_all("SELECT * FROM users")
        print(rows)

asyncio.run(main())
Placeholders
All queries use $1, $2, ... style placeholders regardless of database.
AsyncSQLPyHelper translates them automatically to the correct style
for each driver:
| Database | Placeholder style |
|---|---|
| PostgreSQL | |
| SQLite | |
| MySQL / SQL Server | |
| Oracle | |
# Write once, works on all databases
await db.execute(
    "INSERT INTO users (id, name) VALUES ($1, $2)",
    1, "Alice"
)
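To make the idea of placeholder translation concrete, here is a minimal sketch of how `$1, $2, ...` markers could be rewritten for a driver that expects positional `?` markers (the qmark style from PEP 249) or numbered `:1, :2` markers. The helpers `to_qmark` and `to_numbered` are hypothetical names used for illustration only; they are not part of sqlpyhelper, and the library's actual translation logic may differ. The sketch also does not skip `$n` sequences that appear inside SQL string literals.

```python
import re

# Matches $1, $2, ... (illustrative helper, not sqlpyhelper's implementation)
_PLACEHOLDER = re.compile(r"\$(\d+)")


def to_qmark(query: str, args: tuple) -> tuple[str, tuple]:
    """Rewrite $n placeholders as '?' and reorder args to match."""
    ordered = []

    def replace(match: re.Match) -> str:
        index = int(match.group(1)) - 1  # $1 refers to args[0]
        ordered.append(args[index])
        return "?"

    return _PLACEHOLDER.sub(replace, query), tuple(ordered)


def to_numbered(query: str, prefix: str = ":") -> str:
    """Rewrite $n as :n; numbered styles keep the original argument order."""
    return _PLACEHOLDER.sub(lambda m: f"{prefix}{m.group(1)}", query)


sql, params = to_qmark(
    "SELECT * FROM users WHERE id = $2 OR name = $1", ("Alice", 1)
)
print(sql)     # SELECT * FROM users WHERE id = ? OR name = ?
print(params)  # (1, 'Alice')
```

Because `?` markers are purely positional, the arguments have to be reordered (and repeated, if a `$n` appears twice) to follow the order in which the markers occur in the query. Numbered styles avoid that step.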
FastAPI integration
from contextlib import asynccontextmanager

from fastapi import FastAPI
from sqlpyhelper.async_helper import AsyncSQLPyHelper

db = AsyncSQLPyHelper(
    db_type="postgres",
    host="localhost",
    user="user",
    password="pass",
    database="mydb",
)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Connect once at startup, close once at shutdown
    await db.connect()
    yield
    await db.close()

app = FastAPI(lifespan=lifespan)

@app.get("/users")
async def get_users():
    return await db.fetch_all("SELECT * FROM users")

@app.post("/users")
async def create_user(name: str):
    await db.execute(
        "INSERT INTO users (name) VALUES ($1)", name
    )
    return {"status": "created"}
Connection pooling
For high-traffic applications, call setup_pool() to create a connection
pool instead of relying on a single connection. Pooling is supported for
PostgreSQL and MySQL only:
from sqlpyhelper.async_helper import AsyncSQLPyHelper

db = AsyncSQLPyHelper(
    db_type="postgres",
    host="localhost",
    user="user",
    password="pass",
    database="mydb",
)

async def startup():
    await db.setup_pool(min_size=2, max_size=20)

async def shutdown():
    await db.close_pool()
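The startup and shutdown hooks above can also be paired in a single async context manager, so the pool is closed even if the application fails while running. The following is a sketch under stated assumptions: `StubDB` is a stand-in that only mimics the `setup_pool()` and `close_pool()` methods so the snippet runs without a database, and `pooled` is a hypothetical helper name, not something sqlpyhelper provides.

```python
import asyncio
from contextlib import asynccontextmanager


class StubDB:
    """Stand-in for AsyncSQLPyHelper so the sketch runs without a database."""

    def __init__(self) -> None:
        self.pool_open = False

    async def setup_pool(self, min_size: int = 1, max_size: int = 10) -> None:
        self.pool_open = True

    async def close_pool(self) -> None:
        self.pool_open = False


@asynccontextmanager
async def pooled(db, min_size: int = 2, max_size: int = 20):
    """Open the pool on entry and close it on exit, even after an error."""
    await db.setup_pool(min_size=min_size, max_size=max_size)
    try:
        yield db
    finally:
        await db.close_pool()


async def main() -> tuple[bool, bool]:
    db = StubDB()
    async with pooled(db) as active:
        open_inside = active.pool_open  # pool is available in this block
    return open_inside, db.pool_open


print(asyncio.run(main()))  # (True, False)
```

With a real helper instance in place of `StubDB`, the body of `pooled` is the same shape a FastAPI `lifespan` function would take.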
Transactions
async with AsyncSQLPyHelper(db_type="sqlite", database="my.db") as db:
    await db.begin_transaction()
    try:
        await db.execute("INSERT INTO users VALUES ($1, $2)", 1, "Alice")
        await db.execute("INSERT INTO orders VALUES ($1, $2)", 1, "Laptop")
        await db.commit_transaction()
    except Exception:
        # Undo both inserts if either statement (or the commit) fails
        await db.rollback_transaction()
        raise
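The begin/commit/rollback pattern above can be wrapped in a small async context manager so that call sites cannot forget the rollback branch. This is a sketch, not a library feature: `in_transaction` is a hypothetical helper, and `RecordingDB` is a stand-in that records which of the three transaction methods were called so the example runs without a database.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingDB:
    """Stand-in for AsyncSQLPyHelper that records which methods were called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def begin_transaction(self) -> None:
        self.calls.append("begin")

    async def commit_transaction(self) -> None:
        self.calls.append("commit")

    async def rollback_transaction(self) -> None:
        self.calls.append("rollback")


@asynccontextmanager
async def in_transaction(db):
    """Commit if the body succeeds; roll back and re-raise if it fails."""
    await db.begin_transaction()
    try:
        yield db
        await db.commit_transaction()
    except Exception:
        await db.rollback_transaction()
        raise


async def demo() -> tuple[list[str], list[str]]:
    ok, failed = RecordingDB(), RecordingDB()
    async with in_transaction(ok):
        pass  # statements would run here
    try:
        async with in_transaction(failed):
            raise ValueError("simulated failure")
    except ValueError:
        pass  # the error still reaches the caller after the rollback
    return ok.calls, failed.calls


print(asyncio.run(demo()))  # (['begin', 'commit'], ['begin', 'rollback'])
```

The commit sits inside the `try` block on purpose, mirroring the original snippet: a failure during commit also triggers the rollback.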
Bulk inserts
async with AsyncSQLPyHelper(db_type="postgres", ...) as db:
    await db.execute_many(
        "INSERT INTO users (id, name) VALUES ($1, $2)",
        [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
    )
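For very large inputs it may be worth passing rows to execute_many() in fixed-size batches rather than as one huge list. Whether this helps depends on the driver, so treat the following as an optional sketch. `in_batches` is a hypothetical helper built only on the standard library; the `execute_many()` call is shown as a comment because it needs a live connection.

```python
from itertools import islice
from typing import Iterable, Iterator


def in_batches(rows: Iterable[tuple], size: int) -> Iterator[list[tuple]]:
    """Yield successive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    # islice pulls up to `size` items; an empty list ends the loop
    while batch := list(islice(iterator, size)):
        yield batch


rows = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
for batch in in_batches(rows, 2):
    # With a real connection:
    # await db.execute_many("INSERT INTO users (id, name) VALUES ($1, $2)", batch)
    print(batch)
```

Because the helper accepts any iterable, rows can also be streamed from a generator or a file reader without loading everything into memory first.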
API reference
- class sqlpyhelper.async_helper.AsyncSQLPyHelper(db_type: str | None = None, host: str | None = None, user: str | None = None, password: str | None = None, database: str | None = None, driver: str | None = None, port: str | None = None, oracle_sid: str | None = None)
Bases: object
Async-native database helper with a unified API across SQLite, PostgreSQL, MySQL, SQL Server, and Oracle.
Use as an async context manager:
async with AsyncSQLPyHelper(db_type="postgres", ...) as db:
    rows = await db.fetch_all("SELECT * FROM users")
Or manage the connection lifecycle manually:
db = AsyncSQLPyHelper(db_type="sqlite", database="my.db")
await db.connect()
try:
    rows = await db.fetch_all("SELECT * FROM users")
finally:
    await db.close()
- async begin_transaction() -> None
Begin an explicit transaction.
For PostgreSQL, use the transaction() context manager instead, which is the idiomatic asyncpg approach.
- Raises:
AsyncQueryError – If the transaction cannot be started.
- async execute(query: str, *args: Any) -> None
Execute a SQL statement (INSERT, UPDATE, DELETE, DDL).
Use $1, $2, … for parameterised values:
await db.execute(
    "INSERT INTO users (id, name) VALUES ($1, $2)",
    1, "Alice"
)
- Parameters:
query – SQL query string using $1, $2 placeholders.
*args – Query parameters.
- Raises:
AsyncQueryError – If the query fails.
- async execute_many(query: str, args_list: list[tuple]) -> None
Execute a SQL statement multiple times with different parameters. Efficient for bulk inserts:
await db.execute_many(
    "INSERT INTO users (id, name) VALUES ($1, $2)",
    [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
)
- Parameters:
query – SQL query string using $1, $2 placeholders.
args_list – List of parameter tuples.
- Raises:
AsyncQueryError – If the operation fails.
- async fetch_all(query: str, *args: Any) -> list[Any]
Execute a SELECT query and return all rows.
- Parameters:
query – SQL query string using $1, $2 placeholders.
*args – Query parameters.
- Returns:
A list of rows (empty list if no rows matched).
- Raises:
AsyncQueryError – If the query fails.
- async fetch_one(query: str, *args: Any) -> Any | None
Execute a SELECT query and return a single row, or None.
- Parameters:
query – SQL query string using $1, $2 placeholders.
*args – Query parameters.
- Returns:
A single row, or None if no rows matched.
- Raises:
AsyncQueryError – If the query fails.
- async fetch_val(query: str, *args: Any) -> Any | None
Execute a SELECT query and return a single scalar value.
Useful for COUNT, SUM, or any query returning one value:
count = await db.fetch_val("SELECT COUNT(*) FROM users")
- Parameters:
query – SQL query string using $1, $2 placeholders.
*args – Query parameters.
- Returns:
A single scalar value, or None.
- Raises:
AsyncQueryError – If the query fails.
- async setup_pool(min_size: int = 1, max_size: int = 10) -> None
Set up an async connection pool.
Supported for PostgreSQL and MySQL only. After calling this, use get_connection_from_pool() to acquire connections.
- Parameters:
min_size – Minimum number of connections in the pool.
max_size – Maximum number of connections in the pool.
- Raises:
AsyncConnectionError – If pool setup fails or db_type does not support pooling.