-
-
Notifications
You must be signed in to change notification settings - Fork 337
-
I have an async generator function which yields a db async session within it. Now, I am looking for a dependency injection approach to resolve it exactly like Depends()
acts in FastAPI!
Here's the get_db()
async generator:
async def get_db() -> AsyncGenerator[AsyncSession, None]: async with async_session() as session: yield session
In the FastAPI router, I can simply use Depends()
to resolve an async generator and reach to the async session (db session):
@router.get("/interactions") async def get_all_interactions(db: Annotated[AsyncSession, Depends(get_db)]): interactions = await crud.get_interactions(db=db) ...
Now, outside of the request, how can I inject the get_db
within a new method and get rid of async for
inside of the method?
@cli.command(name="create_superuser") async def create_superuser(): # Note: how to pass db session here as param? username = click.prompt("Username", type=str) email = click.prompt("Email (optional)", type=str, default="") password = getpass("Password: ") confirm_password = getpass("Confirm Password: ") if password != confirm_password: click.echo("Passwords do not match") return async for db in database.get_db(): # Note: remove it from here user = schemas.UserAdminCreate( username=username, email=None if not email else email, password=password, role="admin", ) await crud.create_user(db=db, user=user)
Can I use this package to do something like this?
from dependency_injector import containers, providers from dependency_injector.wiring import Provide, inject @cli.command(name="create_superuser") @inject async def create_superuser(db: AsyncSession = Provide[database.get_db]): # Note # same code here user = schemas.UserAdminCreate(...) await crud.create_user(db=db, user=user)
Beta Was this translation helpful? Give feedback.
All reactions
Replies: 1 comment
-
@agn-7 I gave up trying to do it only using dependency-injector. Somehow fastapi fails to recognize async generator functions that are supplied from Provide[...]
. So this is my current solution, very hacky, but working.
#728 (comment)
P.S. Realised you want this to work outside of fastapi, here's a possible solution using Closing[Provide[...]]
:
import asyncio import os import sqlalchemy as sa from dependency_injector import providers from dependency_injector.containers import DeclarativeContainer from dependency_injector.wiring import Closing, Provide, inject from sqlalchemy.ext.asyncio import ( AsyncSession, async_scoped_session, async_sessionmaker, create_async_engine, ) async def init_db_engine(): dsn = os.environ["POSTGRES_DSN"] engine = create_async_engine(dsn, echo=True) print("engine start") yield engine print("engine stop") await engine.dispose() session_factory = Provide["db_scoped_session"] async def init_session(): session = (await session_factory)() async with session: print("session before") yield session print("session after") class Container(DeclarativeContainer): db_engine = providers.Resource(init_db_engine) db_session_factory = providers.Resource(async_sessionmaker, db_engine) db_scoped_session = providers.ThreadSafeSingleton( async_scoped_session, session_factory=db_session_factory, scopefunc=asyncio.current_task, ) db_session = providers.Resource(init_session) @inject async def call_db(db: AsyncSession = Closing[Provide[Container.db_session]]): return await db.execute(sa.text("select version()")) async def main(): await call_db() await call_db() await call_db() if __name__ == "__main__": container = Container() container.wire(modules=[__name__]) loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.run_until_complete(container.shutdown_resources())
Beta Was this translation helpful? Give feedback.
All reactions
-
👍 1