# psycopg
Connect Python applications using psycopg to PgBeam for connection pooling, caching, and global routing. Includes SQLAlchemy integration.
Connect your Python application to PgBeam by updating the connection string. This guide covers psycopg 3, psycopg connection pools, and SQLAlchemy integration.
## Setup
### Set the connection string
```shell
export DATABASE_URL=postgresql://user:pass@abc.aws.pgbeam.app:5432/mydb
```

### Connect with psycopg
```python
import os

import psycopg

conn = psycopg.connect(os.environ["DATABASE_URL"])

with conn.cursor() as cur:
    cur.execute("SELECT 'hello from pgbeam'")
    print(cur.fetchone())

conn.close()
```

### Set up connection pooling (recommended)
Use psycopg's connection pool, shipped in the separate `psycopg_pool` package, with a small pool size:
```python
import os

from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    conninfo=os.environ["DATABASE_URL"],
    min_size=1,
    max_size=5,  # PgBeam handles upstream pooling — keep this low
)

with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        print(cur.fetchone())
```

### Pool sizing
PgBeam handles upstream connection pooling, so keep your application-side pool small:
| Deployment type | Recommended `max_size` |
|---|---|
| Single process (Gunicorn) | 5-10 |
| Multiple workers | 2-3 per worker |
| Serverless (Lambda) | 1-2 |
With PgBeam in transaction pool mode, each psycopg connection only holds an upstream connection during active transactions. A small pool handles high concurrency efficiently.
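The "per worker" guidance above is just a division of a fixed app-wide budget. A minimal sketch with a hypothetical helper name (not a PgBeam API):

```python
def pool_max_size(total_budget: int, workers: int) -> int:
    """Split an application-wide connection budget across worker processes,
    keeping at least one connection per worker."""
    return max(1, total_budget // workers)

# 10 app-side connections shared by 4 Gunicorn workers -> 2 per worker
print(pool_max_size(10, 4))
```

Pass the result as `max_size` when constructing each worker's `ConnectionPool`.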
## SQLAlchemy integration
SQLAlchemy uses psycopg 3 as its PostgreSQL driver when the URL scheme is `postgresql+psycopg://`. Update your engine configuration to point at PgBeam:
```python
import os
import re

from sqlalchemy import create_engine

# postgresql+psycopg:// selects psycopg 3 as the driver
url = re.sub(r"^postgres(ql)?://", "postgresql+psycopg://", os.environ["DATABASE_URL"])
engine = create_engine(url, pool_size=5, max_overflow=0)
```

If your `DATABASE_URL` already uses the `postgresql+psycopg://` scheme, pass it directly:

```python
import os

from sqlalchemy import create_engine

engine = create_engine(os.environ["DATABASE_URL"], pool_size=5, max_overflow=0)
```

Set `max_overflow=0` to prevent SQLAlchemy from creating connections beyond the pool size. PgBeam handles overflow at the proxy level.
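If you rewrite URLs in more than one place, a small helper (hypothetical name) keeps the regex in one spot. Note it leaves an already-prefixed URL untouched:

```python
import re


def pgbeam_sqlalchemy_url(url: str) -> str:
    """Rewrite a postgres:// or postgresql:// URL to SQLAlchemy's
    psycopg 3 scheme; URLs already using it pass through unchanged."""
    return re.sub(r"^postgres(ql)?://", "postgresql+psycopg://", url)


print(pgbeam_sqlalchemy_url("postgres://user:pass@abc.aws.pgbeam.app:5432/mydb"))
# -> postgresql+psycopg://user:pass@abc.aws.pgbeam.app:5432/mydb
```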
## Caching
### Automatic caching via cache rules
For ORM queries (SQLAlchemy, Django ORM), PgBeam tracks the SQL shapes automatically. Enable caching for specific shapes through Cache Rules in the dashboard.
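To build intuition for what a "SQL shape" is: two queries that differ only in literal or bound values share a shape. The sketch below is an illustration only; PgBeam's actual normalization happens inside the proxy and may differ:

```python
import re


def rough_query_shape(sql: str) -> str:
    """Illustration only: collapse placeholders and literal values so
    queries that differ only in parameters group under one shape."""
    return re.sub(r"%s|\$\d+|'[^']*'|\b\d+\b", "?", sql)


print(rough_query_shape("SELECT * FROM users WHERE id = %s"))
print(rough_query_shape("SELECT * FROM users WHERE id = 42"))
# Both print: SELECT * FROM users WHERE id = ?
```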
### SQL annotations for fine-grained control
```python
# Cache for 5 minutes
cur.execute("/* @pgbeam:cache maxAge=300 */ SELECT * FROM categories")

# Disable caching for a specific query
cur.execute(
    "/* @pgbeam:cache noCache */ SELECT balance FROM accounts WHERE id = %s",
    [account_id],
)
```

## Read replicas
Route reads to replicas with the `/* @pgbeam:replica */` annotation:
```python
# Route to a read replica
cur.execute("/* @pgbeam:replica */ SELECT * FROM products WHERE active = true")

# Combine with caching
cur.execute(
    "/* @pgbeam:replica */ /* @pgbeam:cache maxAge=600 */ SELECT * FROM categories"
)
```

Standard ORM queries always go to the primary. For replica routing with SQLAlchemy, use `text()` or raw SQL execution.
See Read Replicas for replica setup and routing details.
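Because annotations are plain SQL comments, they compose with any client. A small hypothetical helper keeps them consistent across `cur.execute()` and SQLAlchemy `text()` calls:

```python
def annotate(sql: str, *annotations: str) -> str:
    """Prefix a query with PgBeam annotation comments.
    Hypothetical helper, not part of psycopg or PgBeam."""
    prefix = " ".join(f"/* @pgbeam:{a} */" for a in annotations)
    return f"{prefix} {sql}" if prefix else sql


print(annotate("SELECT * FROM categories", "replica", "cache maxAge=600"))
# -> /* @pgbeam:replica */ /* @pgbeam:cache maxAge=600 */ SELECT * FROM categories
```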
## Error handling
psycopg maps PostgreSQL SQLSTATE codes to specific exception classes:
```python
import psycopg

try:
    cur.execute("SELECT 1")
except psycopg.errors.TooManyConnections:
    # SQLSTATE 53300 — connection limit exceeded
    # Reduce pool size or upgrade plan
    pass
except psycopg.errors.ConfigurationLimitExceeded:
    # SQLSTATE 53400 — query rate limit exceeded
    # Enable caching or upgrade plan
    pass
except psycopg.OperationalError as e:
    if "circuit breaker" in str(e):
        # SQLSTATE 08006 — upstream unavailable
        # Retry with backoff
        pass
    else:
        raise  # re-raise unrelated operational errors
```

See Error Codes for the full reference.
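The "retry with backoff" suggested above can be sketched as a small helper (hypothetical, not part of psycopg):

```python
import random
import time


def with_retry(fn, retry_on=Exception, attempts=3, base_delay=0.2):
    """Call fn(), retrying on retry_on with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise
            # 2**attempt growth, with +/-50% jitter to avoid thundering herds
            time.sleep(base_delay * (2 ** attempt) * (0.5 + random.random() / 2))


# In production: with_retry(run_query, retry_on=psycopg.OperationalError)
```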
## Django integration
Django (4.2+) uses psycopg 3 through its default `postgresql` backend when the package is installed. Update `DATABASES` in `settings.py`:
```python
import os

import dj_database_url

DATABASES = {
    "default": dj_database_url.config(
        default=os.environ["DATABASE_URL"],
        conn_max_age=0,  # must be 0 when Django's connection pool is enabled
    )
}

# Django 5.1+: use psycopg's pool, kept small; PgBeam handles upstream pooling
DATABASES["default"]["OPTIONS"] = {
    "pool": {
        "min_size": 1,
        "max_size": 5,
    }
}
```

## Migrations
Run migrations directly against your origin database:
```shell
# Alembic
DATABASE_URL="postgresql://user:pass@db.example.com:5432/mydb" alembic upgrade head

# Django
DATABASE_URL="postgresql://user:pass@db.example.com:5432/mydb" python manage.py migrate
```

## Debugging
Enable debug mode to see cache and routing details:
```python
# psycopg 3 delivers NOTICEs to handlers rather than printing them
conn.add_notice_handler(lambda diag: print(diag.message_primary))

with conn.cursor() as cur:
    cur.execute("SET pgbeam.debug = on")
    cur.execute("SELECT * FROM users WHERE id = %s", [user_id])
# NOTICE: pgbeam: cache=hit age=12s ttl=60s swr=30s
```

## Common issues
| Issue | Cause | Fix |
|---|---|---|
| "too many connections" | Pool too large | Set `max_size=5` in `ConnectionPool` |
| `OperationalError` on connect | Cold start after inactivity | Normal — first connection is slower |
| Stale data after writes | Cache returning old results | Adjust TTL or use `noCache` annotation |
## Further reading
- Connection Pooling — Pool modes, sizing, and lifecycle
- Caching — TTL, SWR, cache layers, and cache rules
- Error Codes — SQLSTATE reference for PgBeam errors