PgBeam Docs

psycopg

Connect Python applications using psycopg to PgBeam for connection pooling, caching, and global routing. Includes SQLAlchemy integration.

Connect your Python application to PgBeam by updating the connection string. This guide covers psycopg 3, psycopg connection pools, and SQLAlchemy integration.

Setup

Set the connection string

Environment variable
export DATABASE_URL=postgresql://user:pass@abc.aws.pgbeam.app:5432/mydb

Connect with psycopg

main.py
import os
import psycopg

conn = psycopg.connect(os.environ["DATABASE_URL"])

with conn.cursor() as cur:
    cur.execute("SELECT 'hello from pgbeam'")
    print(cur.fetchone())

conn.close()

Use psycopg's built-in pool with a small pool size:

Using ConnectionPool
from psycopg_pool import ConnectionPool
import os

pool = ConnectionPool(
    conninfo=os.environ["DATABASE_URL"],
    min_size=1,
    max_size=5,  # PgBeam handles upstream pooling — keep this low
)

with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        print(cur.fetchone())

Pool sizing

PgBeam handles upstream connection pooling, so keep your application-side pool small:

Deployment typeRecommended max_size
Single process (Gunicorn)5-10
Multiple workers2-3 per worker
Serverless (Lambda)1-2

With PgBeam in transaction pool mode, each psycopg connection only holds an upstream connection during active transactions. A small pool handles high concurrency efficiently.

SQLAlchemy integration

psycopg works as the default PostgreSQL driver for SQLAlchemy. Update your engine configuration to point at PgBeam:

SQLAlchemy 2.x with psycopg 3
import os
import re
from sqlalchemy import create_engine

# postgresql+psycopg:// uses psycopg 3 as the driver
url = re.sub(r"^postgres(ql)?://", "postgresql+psycopg://", os.environ["DATABASE_URL"])
engine = create_engine(url, pool_size=5, max_overflow=0)
SQLAlchemy 1.x
import os
from sqlalchemy import create_engine

engine = create_engine(os.environ["DATABASE_URL"], pool_size=5, max_overflow=0)

Set max_overflow=0 to prevent SQLAlchemy from creating connections beyond the pool size. PgBeam handles overflow at the proxy level.

Caching

Automatic caching via cache rules

For ORM queries (SQLAlchemy, Django ORM), PgBeam tracks the SQL shapes automatically. Enable caching for specific shapes through Cache Rules in the dashboard.

SQL annotations for fine-grained control

Cache control with psycopg
# Cache for 5 minutes
cur.execute("/* @pgbeam:cache maxAge=300 */ SELECT * FROM categories")

# Disable caching for a specific query
cur.execute("/* @pgbeam:cache noCache */ SELECT balance FROM accounts WHERE id = %s", [account_id])

Read replicas

Route reads to replicas with the /* @pgbeam:replica */ annotation:

Replica routing with psycopg
# Route to a read replica
cur.execute("/* @pgbeam:replica */ SELECT * FROM products WHERE active = true")

# Combine with caching
cur.execute(
    "/* @pgbeam:replica */ /* @pgbeam:cache maxAge=600 */ SELECT * FROM categories"
)

Standard ORM queries always go to the primary. For replica routing with SQLAlchemy, use text() or raw SQL execution.

See Read Replicas for replica setup and routing details.

Error handling

psycopg maps PostgreSQL SQLSTATE codes to specific exception classes:

Handling PgBeam errors
import psycopg

try:
    cur.execute("SELECT 1")
except psycopg.errors.TooManyConnections:
    # SQLSTATE 53300 — connection limit exceeded
    # Reduce pool size or upgrade plan
    pass
except psycopg.errors.ConfigurationLimitExceeded:
    # SQLSTATE 53400 — query rate limit exceeded
    # Enable caching or upgrade plan
    pass
except psycopg.OperationalError as e:
    if "circuit breaker" in str(e):
        # SQLSTATE 08006 — upstream unavailable
        # Retry with backoff
        pass

See Error Codes for the full reference.

Django integration

Django uses psycopg as its default PostgreSQL backend. Update DATABASES in settings.py:

settings.py
import dj_database_url
import os

DATABASES = {
    "default": dj_database_url.config(
        default=os.environ["DATABASE_URL"],
        conn_max_age=600,
    )
}

# Reduce Django's connection pool
DATABASES["default"]["CONN_MAX_AGE"] = 600
DATABASES["default"]["OPTIONS"] = {
    "pool": {
        "min_size": 1,
        "max_size": 5,
    }
}

Migrations

Run migrations directly against your origin database:

# Alembic
DATABASE_URL="postgresql://user:pass@db.example.com:5432/mydb" alembic upgrade head

# Django
DATABASE_URL="postgresql://user:pass@db.example.com:5432/mydb" python manage.py migrate

Debugging

Enable debug mode to see cache and routing details:

cur.execute("SET pgbeam.debug = on")
cur.execute("SELECT * FROM users WHERE id = %s", [user_id])
# NOTICE: pgbeam: cache=hit age=12s ttl=60s swr=30s

Common issues

IssueCauseFix
"too many connections"Pool too largeSet max_size=5 in ConnectionPool
OperationalError on connectCold start after inactivityNormal — first connection is slower
Stale data after writesCache returning old resultsAdjust TTL or use noCache annotation

Further reading

On this page