Production Python 1: Setting up FastAPI to talk to a “real” database.
In which I talk about how to do real things in Python in real environments you might deploy to.
This is the first entry in my series on Production Python. Some of the later entries in this will be paid subscribers only, but I consider showing people how to connect to a database correctly a public service, so here goes.
One of the most frustrating things for me over the years whenever I look at a new framework is that they like to gloss over realistic database setups in favor of showing you how fast and easy the rest of the framework is. I get it, databases are boring, but most real apps with actual load on them, and connected to the actual internet use:
None of the “out of the box” documentation tells you how to use that setup correctly, what the pitfalls and advantages are, or really provides any opinion whatsoever about what a good setup looks like. If it were relatively easy to get a correct setup, that’d be one thing, but there are a million ways to get it wrong, and the bugs when you have it wrong are subtle. You get processes forgetting to close out connections, transactions that dangle at the end of requests, scenarios where you accidentally use stale data from the replica instead of updated data in your current database session, and other things that will cost you and your team months and dollars to debug. Oh and then there’s asyncio
and all the fun associated with that if you’re not used to it.
I’m sure other people have opinions on my setup, but what I’ve put together works and the rules for using it are clear and consistent. In this post I intend to walk you through it. This is a setup I’ve used with apps with tens of thousands of simultaneous users, so rest assured, it will scale. This setup uses:
Why do I care about the read replica?
The correct question is really “at what point” do I care about the read replica. If you have a small number of transactions per second, you don’t. If your writes vastly outnumber your reads, you don’t. But once your FastAPI serves a million plus API requests per second, it will be cheaper to offload read to the read replica than to scale up larger instances of RDS to handle the load on the writer alone at speed.
The read replica takes all your readonly queries and handles them without interrupting or locking (mostly). By doing so it gives you several times as much throughput from the same size instance than if you were using the writer alone.
So while you might not need the read replica at first, it’s not that much more work to set it up if you think you might eventually get there, and it’s much less debugging and refactoring to set it up correctly first and deal with the separation between transactions with at least one write and ones without.
It is more complex, but there’s a simple rule for when to use the writer vs. the reader. If my transaction has at least one write associated with it, I have to use the writer. Otherwise use the reader. Don’t mix the reader and writer in the same operation, at least not in SQLAlchemy. It’s technically feasible, but makes for crappy, unreadable, easy to screw up code that isn’t worth the microseconds of speedup you might get from it. Just use one or the other. If you eventually need the speedup, optimize the crap out of it then and drink something strong before you hit merge.
SQLAlchemy?
Okay, I assume you’ve heard of it, but if you wonder why that and not something else, SQLAlchemy is the one library that keeps me from moving off of Python more than any other feature. Python’s nice. FastAPI is amazing. But SQLAlchemy is absolutely unmatched in other languages (maybe aside from Clojure). Nothing else gives you as much control about how and whether you use the ORM, what kind of queries you can execute, and how safely you can compose and reuse them. It’s the best, hands down. The others aren’t even close (again, except for possibly Clojure), and I have used LOTS of database frameworks.
Why asyncpg?
You could drop all the async parts of this post and use psycopg2 and it would work just fine. I don’t necessarily even recommend against that. asyncpg
will allow your application to switch to doing something else if the attempt to grab a connection or a chunk of records would block. Whether that benefits you in practice or not is pretty subtle, but I’m using it here because FastAPI is async
-first and most of the coding styles you see that use it are async. Given that the thing most likely to block your API behind internet bandwidth is your database, it seems like a good choice to use an async connector. It does come with subtlety though, and if you’re convinced async programming isn’t worth the headache then I say don’t bother.
Using the same code for Local Dev and Prod
I presume that if you’re here you’ve already got an Aurora or RDS instance. You either have the Helm charts and Kube or have hand clicked that button or you have a friendly Devops Elf who has done all the relevant setup bits for you. Presumably they’ve also setup SecretsManager or some such, so I’ll at least cover how I connect to this here.
I also presume that you’re not using these production resources in development. Instead you’re using something brilliant like Localstack for your AWS and a vanilla PG docker image to handle it. If you’re using a cloud development environment, good for you and you can probably simplify this greatly, but I’m going to start as if you’re running everything on your laptop as I was.
I promise this is the ugliest code in the post, but you’re not going to bootstrap a Boto3 connection and get credentials out of secrets manager with some clean Pydantic magic.
import os
import json
import boto3
_secrets = {"DB_PASSWORD": os.environ.get("DB_PASSWORD", "postgres")}
if os.environ.get("AWS_USE_LOCALSTACK", "false") == "false":
print("Using AWS. Fetching secrets from secrets manager")
session = boto3.session.Session()
secret_client = session.client(
service_name="secretsmanager",
region_name=os.environ.get("AWS_REGION_NAME", "us-east-1"),
)
aurora_secret = secret_client\
.get_secret_value(SecretId=os.environ["DB_SECRET_ID"])
secrets = json.loads(aurora_secret["SecretString"])
else:
print("Not using secrets manager")
What it does is ask the environment if it’s running locally and should assume localstack, and if so all the credentials and such will be environment vars instead of secrets. If we’re running in AWS on the other hand, we’re using genuine AWS services and our creds should be in SecretsManager.
“Hey Jeff, why should I be using SecretsManager?” Well, since you asked, it’s the safest and most compliant place to put secrets like keys and passwords that your app will access in production. There’s a more technical explanation, but this is an article about databases, not secrets. Just please don’t hardcode your secrets in the config file and store them in GitHub. Knowing you’ve done so will make me balder than I already am, and you don’t want that, do you?
Now this way of doing it may not work for you. If you judge your setup more sensitive, then you may want to load your secrets from string at the time of connection and discard them as soon as the connection is made. If that’s the case, wrap the above in a function and call it when you’re connecting. You’ll also want to discard and destroy any settings object with those secrets rather than using it as a singleton. We’re going to skip all that because the extra work isn’t hard and isn’t hard to get right, but is rather a lot of extra boilerplate.
Connecting
First we have a relatively simple object that we can use in our app as a singleton. You could do this as a module instead and skip the “non pythonic” static class thingie (technical term), but I have it this way so I can control when it’s initialized via FastAPI’s lifecycle mechanism.
import logging
import os
from asyncio import current_task
from sqlalchemy import AsyncAdaptedQueuePool
from sqlalchemy.ext.asyncio import (
async_sessionmaker,
create_async_engine,
create_async_pool_from_url,
AsyncSession,
async_scoped_session,
)
from ..config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class DB:
AsyncWriterLocal = None # this is an AsyncSessionMaker object
AsyncReaderLocal = None # this is an AsyncSessionMaker object
@classmethod
async def get(cls):
if cls.AsyncWriterLocal is None:
writer_pool = create_async_pool_from_url(
settings.sqlalchemy_db_uri,
pool_pre_ping=True,
pool_size=settings.sqlalchemy_pool_size,
max_overflow=settings.sqlalchemy_pool_overflow_size,
# keep statements balanced with a round-robin connection
# pool
poolclass=AsyncAdaptedQueuePool,
)
async_writer = create_async_engine(
settings.sqlalchemy_db_uri,
pool=writer_pool,
echo=settings.sqlalchemy_echo_sql,
execution_options={"isolation_level": "READ COMMITTED"},
)
cls.AsyncWriterLocal = async_sessionmaker(
bind=async_writer,
autoflush=True,
expire_on_commit=False,
future=True,
)
reader_pool = create_async_pool_from_url(
settings.sqlalchemy_db_reader_uri,
pool_pre_ping=True,
pool_size=settings.sqlalchemy_pool_size,
max_overflow=settings.sqlalchemy_pool_overflow_size,
poolclass=AsyncAdaptedQueuePool,
)
async_reader = create_async_engine(
settings.sqlalchemy_db_reader_uri,
pool=reader_pool,
execution_options={
"isolation_level": "REPEATABLE READ",
"postgresql_readonly": True,
"postgresql_deferrable": True,
},
echo=settings.sqlalchemy_echo_sql,
)
cls.AsyncReaderLocal = async_sessionmaker(
bind=async_reader,
autoflush=False,
autobegin=False,
future=True,
)
return cls
@classmethod
async def dispose(cls):
"""Close connections cleanly on shutdown"""
await cls.AsyncReaderLocal.kw["bind"].dispose()
await cls.AsyncWriterLocal.kw["bind"].dispose()
What am I looking at?
This class has round robin connection pooling built in so that you balance your reads and writes across the available compute. It creates a reader that will break if you try to write to it The read replica on AWS will throw an exception if you try to write to it, but if you’re using a single PG instance for local dev, the "postgresql_readonly": True
in the async reader execution options is your guard against fat-fingering a write to the reader and not catching it until it’s in prod.
Let me take a second to explain the stuff that’s kind of custom about these options:
isolation_level
: I find these isolation levels are best for performance on my setup, especially given that I also usealembic
for online migrations. From experience, I can say that using the wrong isolation level can cause your whole application to halt and catch fire for the duration of your database upgrade. Don’t do that unless you just like receiving angry calls from your VP of Engineering. And if I’m your VP of Engineering, have mercy on me.I don’t do
expire_on_commit
, because while it might be more “hygienic” to do so, I inevitably get worse performance in my API for not a lot of benefit. YMMV.The other custom bits I pull from
settings
are flags that SRE typically adjusts at will to prevent the API from overrunning compute or cost bounds. I won’t bore you with those details because you probably don’t care.
Now, the method get
creates the sessionmakers on the first attempt to create a database context (the database context is next) and keeps them there until they are dispose
d. That method grabs the underlying connection object, absurdly called the “bind”, and releases it back to the database, which is super-important when you’re running a hundred or so pods of API containers behind your load balancer.
Why do I do it this way?
I’ve glossed over some details. The point of this object is to make sure that the database connection resources are initialized and disposed of at the right time to prevent your API from:
Using a connection or state object in an
asyncio
-unsafe or thread-unsafe way.Allocating connections unnecessarily by initializing before they’re needed.
Failing to release connections in a timely or orderly fashion when an API process shuts down.
Failing to commit or rollback transactions on shutdown, potentially leading to dangling locks that destroy your performance without explanation.
How is this used?
FastAPI has a special function called lifespan
that you can attach lifespan events to. It’s a context manager defined in a single function with a bare yield
keyword between setup and teardown. Below, during setup we do a get
to create the objects. Then during teardown, we do a dispose
to make sure the transactions are closed and the connections cleaned up and released before the worker process dies.
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = get_settings()
log.info(f"Environment is {settings.environment}")
await DB.get()
log.info("Ready.")
yield
# Clean up and release the resources
await DB.dispose()
log.info("Database connections closed")
Functionally, this grabs our settings object from the config
module, which determines if we were using SecretsManager for our creds, fetches them if that’s the case, and then uses them in the service of DB.get() or falls back to envvars if this is local development.
Using the Connection: The Context object
I realize there’s something called the request context already in FastAPI. This is not that and you can scream at me for naming it Context, but I never use the request context in practice so your screams will fall on deaf ears. This is the "database context." it's like the request context, but more than one can be created per request.
The reader is begun before the writer, meaning that the reader will only fetch records that were committed before the context was begun. This ensures better transactional consistency than other ways I've tried, and makes for clean sessions and simple rules for reading code.
Do not reuse this object for more than one with
block, as the sessions are closed on exit. I’ll provide an example of using this object in the next section, but for now let’s see the class and then break it down:
class Context:
def __init__(self, begin_writer=True, begin_reader=True, **kwargs):
self.writer: AsyncSession | None = None
self.reader: AsyncSession | None = None
self.begin_writer: bool = begin_writer
self.begin_reader: bool = begin_reader
self.request_time = None
self.extra = kwargs
def __call__(self, ...): # NOTE TODO BELOW
x = Context(self.begin_writer, self.begin_reader, **self.extra)
# helpful for establishing order if you need to debug
x.request_time = now("utc")
x.writer = None
x.reader = None
# TODO this lets you add extra things to the context in the
# dependency injection stage.
# x.extra.update(...)
return x
async def __getitem__(self, key, default=None):
return self.extra.get(key, default)
async def __aenter__(self):
# grab Sessions and start the transaction.
db = await DB.get()
self.writer = async_scoped_session(
db.AsyncWriterLocal,
scopefunc=current_task,
)()
self.reader = async_scoped_session(
db.AsyncReaderLocal,
scopefunc=current_task,
)()
if self.begin_reader:
await self.reader.begin()
if self.begin_writer:
await self.writer.begin()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
# commit or rollback
if exc_type is not None:
await self.writer.rollback()
await self.reader.rollback()
elif self.begin_writer:
await self.writer.commit()
await self.writer.close() # close the session out so the connection gets returned to the pool
if self.begin_reader:
await self.reader.rollback()
await self.reader.close() # close the session out so the connection gets returned to the pool
if os.environ.get(
"PYTEST_CURRENT_TEST"
): # prevents ioloop leakage during pytest.
await self.writer.bind.dispose()
await self.reader.bind.dispose()
return False
This defines our context class as an “async contextmanager”. The contextmanager abstraction in Python is excellent for handling things like database connections and other finite resources as it assures that teardown will happen as soon as you exit the block, even in exceptional cases. We take advantage of that functionality by defining __aexit__
here.
The Context class is also meant to be used with FastAPI’s dependency injection. More on that in a bit, but I mention it because it’s why we have both a __call__
and an __init__
.
This one deserves a bit more detailed of a breakdown than the DB class.
The __init__
function sets up a clean slate and determines how the Context will be used. For FastAPI dependency injection, this lets us define a ReadonlyContext and a Context instance separately, making it clearer when defining our endpoints whether they write data or not.
The __call__
function gets called in the case of dependency injection, and note there’s a bit here where you’d fill in your own items. It creates a new instance with __init__
and returns it, creating a new context that will be used during your API call. You can add “extra” context by adding parameters to the call function and FastAPI will source those bits from parameters, path or otherwise, with the same name in the API route declaration.
The __aenter__
happens when you actually use the context object in an async with
statement. The most important non-obvious bits in here are
self.writer = async_scoped_session(
db.AsyncWriterLocal,
scopefunc=current_task,
)()
self.reader = async_scoped_session(
db.AsyncReaderLocal,
scopefunc=current_task,
)()
This uses the current asyncio task as the scope for the session, preventing you from accidentally using the same session across tasks, which is a great way to end up with dangling transactions. Trust me from experience, not doing this is dangerous.
Finally __aexit__
tears down the context and returns connections to the pool. If we have an exception, we rollback both transactions and we can assume the connection will return back to the pool and we don’t need .close()
. Adding close will simply throw another exception if the original exception was a closed connection. Otherwise we rollback the reader (for consistency) and commit the writer, then return them both to the pool.
Using the Context
In my api.py
I setup dependency injection thusly:
RWCtx = Annotated[Context, Depends(Context())]
ReadonlyCtx = Annotated[Context, Depends(Context(begin_writer=False))]
Now it can be used in an API call:
@app.route("/{object_id}/delete") # if object_id is an arg of `__call__` it will be part of the context object.
async def delete_item(ctx: RWCtx):
try:
async with ctx:
cmd = DeleteItem()
await cmd.cmd(ctx)
except ObjectNotFound as e:
raise HTTPException(status_code=404, detail=error_detail(e))
or explicitly like this:
async def some_query():
reader = Context(begin_writer=False)
async with reader() as ctx:
value = await ctx.reader.scalar(select(...))
return value
In either case, this will setup and teardown the connection correctly, and let us separate our queries by reader and writer. Note that in the first version, I do not call ctx, and in the second version I do. The first version was already __call__
-ed by FastAPI during dependency injection. The second version I created explicitly, so I have to call it and use the return value in the with statement. Technically with this simplified version I suppose you don’t need to do that, but I always have extra things I keep in my context like the current user, the object id referenced in the API call, etc, and so I would add those into the call to reader() here if this were a real call.
When should I use ctx.reader
?
If your API endpoint is read only, then you should definitely use the reader.
If your API call mixes reads and writes, but there’s a readonly portion where you need a few bits of data, but you don’t need that data to be part of the session state, then you may want to use the reader.
I use the reader in this case when I’m pulling data from a high-write table that I’m not writing to in the call, or when I need to make several queries in succession for some reason. I don’t use the reader and writer on the same table unless there’s some rare case where I explicitly want the “before state” for that table regardless of what I add in the writer’s session. If I do that, I document it explicitly because it’s really easy to screw up and pull stale data into a transaction.
When should I break things up into separate with
blocks?
A separate with block is a separate transaction. Again, with this Context class, the reader
is always explicitly begun before the writer, so it will always have data pre-write, even if you explicitly call commit()
on the writer. This is by design. So if you want to read something you just wrote, either do that on the writer (recommended) or do it in a separate with
block.
Also, it’s better to keep transactions short in high-frequency API endpoints. The read-replica helps with performance, but the smaller the transaction is, the fewer connections you hold open and the more likely everyone’s application is responsive. If you need to write, then read a bunch of data, and then write again, I’d do those in three separate transactions because presumably the big read in the middle shouldn’t hold resources that could be used for writes.
Bottom line, there’s no hard and fast rule and a lot of subtlety. Keep in mind the design of the Context class is for consistency in reads and writes and use it appropriately. Don’t feel like a with
block is this giant heavy thing. It’s just a transaction. Entering and exiting is no more expensive than the transaction itself.
One Last Best Practice: Providing a Healthcheck
Most real cloud implementations explicitly check the health of the API by pinging a specific endpoint. I like to make sure that the database connection is still good, so this is the database portion of that healthcheck endpoint. it creates and enters a context, then selects a scalar 1
from the db to make sure that the connection is up.
@app.api_route("/health", methods=["HEAD", "GET", "OPTIONS"], include_in_schema=False)
async def check_health():
log.debug("Checking reader and writer")
from myapp.data.database import Context
ctx = Context()
async with ctx("", "", "", "") as ctx:
await ctx.reader.scalar(select(1))
await ctx.writer.scalar(select(1))
log.debug("Healthcheck succeeded")
return JSONResponse(status_code=200, content={"healthy": True})
Wrapping up
The point of this article was to show you how to connect to a database in a “real” production like system you’re likely to encounter in practice. Most “real” systems use cloud databases, and most cloud databases provide redundancy and read-replicas. These features are not well-explored in the setup guides people write for FastAPI (and frankly any Python app framework), even though they’re usually needed in real applications.