이번 글에서는 FastAPI, Starlette, Uvicorn을 사용할 때의 ASGI 프로토콜 동작 방식을 살펴보고, 미들웨어를 통한 DB 세션 관리 시 주의할 점과 트랜잭션 처리 방식을 설명한다.
1. FastAPI의 ASGI 통신 이해하기
FastAPI 애플리케이션을 Uvicorn으로 실행할 때 내부적으로 어떤 일이 일어나는지 이해하는 건 중요하다. FastAPI와 Starlette는 ASGI(Asynchronous Server Gateway Interface)를 기반으로 동작하고 비동기 처리를 지원한다. Django나 Flask 같은 동기 프레임워크가 사용하는 WSGI(Web Server Gateway Interface)와는 다르다.
정확한 요청 흐름은 다음과 같다:
- Uvicorn이 HTTP 요청을 ASGI 형식으로 변환하여 Request 객체를 만든다.
- FastAPI에서는 Request 객체에 따라 Routing 하여 요청을 처리하고 Response 객체를 생성한다.
- Uvicorn은 Response 객체를 보고 HTTP 응답을 생성해 클라이언트에 전송한다.
2. 미들웨어로 DB 세션 관리하기
미들웨어는 FastAPI에 Request 객체가 전달되기 전 / Uvicorn에 Response 객체가 전달되기 전에 요청을 가로챈다.
FastAPI에서 DB 세션을 미들웨어로 관리할 수 있다. 미들웨어는 다음과 같은 방식으로 구현한다:
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
request.state.db = Session()
try:
response = await call_next(request)
await request.state.db.commit()
return response
except Exception e:
await request.state.db.rollback()
raise e
finally:
await request.state.db.close()
- 미들웨어에서 DB 세션을 시작하여 Request 객체에 주입한다.
- FastAPI에서 요청을 처리하고 DB 세션에 객체를 등록/수정한다.
- FastAPI에서 반환한 세션을 commit 한다.
- 만약 FastAPI에서 작업 처리 중 오류가 발생한 경우 세션을 rollback 한다.
3. StreamingResponse와 Background Task
SSE(Server Sent Event) 통신 환경에서 스트리밍 응답을 반환하는 경우 FastAPI에서는 StreamingResponse 클래스를 사용한다.
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from typing import Generator
@app.get("/stream")
async def stream(request: Request):
async def async_gen():
for i in range(5):
yield f"chunk {i}\n"
await asyncio.sleep(0.5)
return StreamingResponse(
async_gen(), media_type="text/event-stream"
)
StreamingResponse 클래스를 사용하면 제네레이터의 내용을 스트리밍 형식으로 반환할 수 있다.
만약 스트리밍이 완료된 후 스트리밍 내용을 데이터베이스에 저장해야 한다는 요구사항이 있다면 어떻게 해야 할까?
StreamingResponse에 전달된 제네레이터가 끝나는 시점에 작업을 수행해야 한다.
이를 위해 FastAPI는 BackgroundTask를 제공한다.
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
from typing import Generator
@app.get("/stream")
async def stream(request: Request):
async def async_gen():
for i in range(5):
yield f"chunk {i}\n"
await asyncio.sleep(0.5)
# 스트리밍 완료 후 DB 세션을 사용해 작업을 수행하는 background job
async def background_job(db_session):
print("응답을 DB에 추가...")
await db_session.flush()
return StreamingResponse(
async_gen(),
media_type="text/event-stream",
background=BackgroundTask(background_job, request.state.db)
)
DB에 데이터를 저장하는 작업을 함수로 정의하고 함수를 StreamingResponse에 전달하면 스트리밍 완료 시점에 등록한 함수가 실행된다.
4. 미들웨어에서 StreamingResponse와 Background Task 처리 순서
다시 미들웨어 코드를 살펴보자.
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
request.state.db = Session()
try:
response = await call_next(request)
await request.state.db.commit()
return response
except Exception e:
await request.state.db.rollback()
raise e
finally:
await request.state.db.close()
call_next(FastAPI 라우터 함수)가 response를 반환한 후 finally 블록에서 db 세션이 닫히고 response가 반환된다.
미들웨어 코드만 보면 스트리밍과 Background Task가 어느 시점에 실행되는지 알기 어렵다.
작업이 실행되는 순서를 정리하면 다음과 같다:
- 미들웨어에서 DB 세션 열기
- call_next() 함수로 라우터 호출 → StreamingResponse + Background Task 반환
- 미들웨어: commit() 후 → finally에서 close()
- 미들웨어에서 Response 반환
- ASGI 서버(Uvicorn)가 StreamingResponse를 순회하며 응답 전송
- 스트리밍이 끝나면 Background Task 수행
- HTTP 요청 처리 종료
미들웨어 외부에서 Background Task가 실행되기 때문에 DB 세션은 close 된다.
5. Background Task에서 세션 생성
미들웨어에서 이미 commit 수행 및 세션 close 된 상태이므로, Background Task 내에서 추가적인 DB 작업을 수행할 경우 별도의 commit을 해줘야 한다.
commit을 통해 새로운 트랜잭션의 데이터를 DB에 반영한다.
async def background_job():
db = Session()
new_record = Record(name="background")
db.add(new_record)
db.commit() # commit 필요
db.close()
flush만 호출하면 DB에 실제로 반영되지 않으므로 주의해야 한다.
6. 정리
지금까지 FastAPI와 Uvicorn을 활용한 ASGI 프로토콜의 동작 방식과 미들웨어에서 DB 세션 관리 시 주의할 점을 살펴봤다.
특히 스트리밍 응답을 반환하는 StreamingResponse와 BackgroundTask를 함께 사용할 경우 세션 관리가 까다로워질 수 있다.
미들웨어가 Response를 반환한 뒤에는 세션이 닫혀 있으므로, 스트리밍 완료 후 추가 DB 작업이 필요하면 별도의 세션을 만들고 명시적인 commit을 해야 한다.
동작 흐름을 정확히 이해하고 관리해야 데이터 유실이나 트랜잭션 문제를 예방할 수 있다.
FastAPI를 제대로 활용하려면 결국 ASGI 처리 흐름과 미들웨어의 동작 순서를 파악하고 사용하는 습관이 필요하다.
'공부' 카테고리의 다른 글
Triton Inference Server와 비동기 gRPC로 통신하기 (0) | 2025.02.11 |
---|---|
FastAPI의 페이지네이션 성능 개선기 (0) | 2025.01.30 |
DB 암호화 방식 (0) | 2025.01.18 |
브라우저의 요청이 서버까지 가는 과정 (DNS 요청 과정) (0) | 2024.12.13 |
AWS MediaConvert의 userMetadata 개수 제한 (0) | 2024.12.06 |
댓글