FastAPI + SQLAlchemyでscoped_sessionを使うとエラーが発生する
- 発生した環境は以下
- Python 3.8.10
- SQLAlchemy 2.0.18
- fastapi 0.99.1
エラー内容
FastAPIを使っていてSQLAlchemyでDBにアクセスを連続で行ったときに以下のようなエラーが発生することがある。
File "./fastapi-session-test/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 926, in _raise_for_prerequisite_state raise sa_exc.InvalidRequestError( sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)
エラー内容からするとSQLAlchemyのSession周りのエラーで同時に複数スレッドからSessionを共有している場合に発生するらしい。 SQLAlchemyはThreadLocal変数を利用してる(Thread-Local Scope)ので複数スレッドから同時にSessionを作っても別々の変数が割当たるので問題がないと思っていたが、どうも違うのかもしれない…。
今回エラーが発生するソースコードではscoped_sessionを使うとエラーが発生するが、socped_sessionを使わない場合はエラーが発生しない。 どうやったらこのエラーの発生する原因を突き止められるかなと考えたが、すぐには思いつかなかった。 今のところはscoped_sessionを使わない方法でエラーを回避する。
※ソースコードから抜粋
DB_FILENAME = "sql_app.db" DATABASE_URL = f"sqlite:///./{DB_FILENAME}" engine = create_engine(DATABASE_URL, echo=False) SessionLocal = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine)) # SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
検証に利用したテスト用のソースコード
uvicorn main:appで起動して/user/[id]
に同時にアクセスすると時々エラーが発生する。
import logging from fastapi import Depends, FastAPI, Request from sqlalchemy import create_engine, insert from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, scoped_session, sessionmaker from sqlalchemy.sql.sqltypes import Integer, String from starlette.exceptions import HTTPException as StarletteHTTPException DB_FILENAME = "sql_app.db" DATABASE_URL = f"sqlite:///./{DB_FILENAME}" engine = create_engine(DATABASE_URL, echo=False) SessionLocal = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine)) # SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) logger = logging.getLogger("uvicorn.error") logger.setLevel(logging.DEBUG) fh = logging.FileHandler("sql_app.log") fh.setLevel(logging.DEBUG) formatter = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" fh.setFormatter(logging.Formatter(formatter)) logger.addHandler(fh) class Base(DeclarativeBase): pass class User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True) name: Mapped[str] = mapped_column(String) email: Mapped[str] = mapped_column(String) password: Mapped[str] = mapped_column(String) Base.metadata.create_all(bind=engine) def insert_test_data(max_data=100000000): print("start test data insert") session = SessionLocal() print("create insert data") add_data = [dict(name="XXXXXXXXXX", email="XXXXX@exmaple.com", password="123456") for _ in range(max_data)] print("created insert data") session.execute(insert(User), add_data) session.commit() session.close() print("data inserted!") if not DB_FILENAME: insert_test_data(1000000) def get_db(): db = SessionLocal() try: yield db finally: db.close() app = FastAPI() @app.middleware("http") async def exception_handling_middleware(request: Request, call_next): try: return await call_next(request) except StarletteHTTPException as e: logger.error(f"url: {request.url}, error: {e}") logger.error("", exc_info=True, stack_info=True) raise e @app.get("/") def api_status(): return {"status": "ok"} @app.get("/users") def get_usere(db: Session = Depends(get_db)): print(f"users: {id(db)}") return db.query(User).all() @app.get("/user/{user_id}") def get_user(request: Request, user_id: int, db: Session = Depends(get_db)): logger.debug(f"url: {request.url}, user: {id(db)}") return db.query(User).filter(User.id == user_id).first()