Resolve async generator as a dependency injection #776

Unanswered
agn-7 asked this question in Q&A

I have an async generator function that yields an async DB session. I am looking for a dependency injection approach that resolves it the same way Depends() does in FastAPI.

Here's the get_db() async generator:

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session

In a FastAPI router, I can simply use Depends() to resolve the async generator and get the async DB session:

@router.get("/interactions")
async def get_all_interactions(db: Annotated[AsyncSession, Depends(get_db)]):
    interactions = await crud.get_interactions(db=db)
    ...

Now, outside of a request, how can I inject get_db into a function and get rid of the async for loop inside it?

@cli.command(name="create_superuser")
async def create_superuser():  # Note: how to pass db session here as param?
    username = click.prompt("Username", type=str)
    email = click.prompt("Email (optional)", type=str, default="")
    password = getpass("Password: ")
    confirm_password = getpass("Confirm Password: ")
    if password != confirm_password:
        click.echo("Passwords do not match")
        return
    async for db in database.get_db():  # Note: remove it from here
        user = schemas.UserAdminCreate(
            username=username,
            email=None if not email else email,
            password=password,
            role="admin",
        )
        await crud.create_user(db=db, user=user)

Can I use this package to do something like this?

from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject

@cli.command(name="create_superuser")
@inject
async def create_superuser(db: AsyncSession = Provide[database.get_db]):  # Note
    # same code here
    user = schemas.UserAdminCreate(...)
    await crud.create_user(db=db, user=user)
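
Independent of any DI package, one way to drop the async for loop is to wrap the generator function with contextlib.asynccontextmanager from the standard library and use async with instead. The following is only a minimal sketch: FakeSession is a hypothetical stand-in for AsyncSession so the example runs without a database, and db_context and the events list are illustrative names that do not appear in the question.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List

events: List[str] = []  # records lifecycle steps so their order is visible


class FakeSession:
    """Hypothetical stand-in for AsyncSession (no real database needed)."""

    async def __aenter__(self) -> "FakeSession":
        events.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        events.append("close")


async def get_db() -> AsyncGenerator[FakeSession, None]:
    # Same shape as the get_db() in the question.
    async with FakeSession() as session:
        yield session


# Wrapping the generator function, without changing its definition,
# gives an async context manager that can be used outside FastAPI.
db_context = asynccontextmanager(get_db)


async def create_superuser(username: str) -> str:
    async with db_context() as db:  # replaces `async for db in get_db():`
        events.append(f"create {username}")
        return type(db).__name__


result = asyncio.run(create_superuser("admin"))
```

Because get_db itself is left undecorated, it should remain usable with Depends(get_db) inside FastAPI while db_context serves CLI commands and scripts.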

Replies: 1 comment


@agn-7 I gave up trying to do this using only dependency-injector. For some reason, FastAPI fails to recognize async generator functions that are supplied through Provide[...]. My current solution is very hacky, but it works:
#728 (comment)

P.S. I realised you want this to work outside of FastAPI. Here's a possible solution using Closing[Provide[...]]:

import asyncio
import os

import sqlalchemy as sa
from dependency_injector import providers
from dependency_injector.containers import DeclarativeContainer
from dependency_injector.wiring import Closing, Provide, inject
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)


async def init_db_engine():
    dsn = os.environ["POSTGRES_DSN"]
    engine = create_async_engine(dsn, echo=True)
    print("engine start")
    yield engine
    print("engine stop")
    await engine.dispose()


session_factory = Provide["db_scoped_session"]


async def init_session():
    session = (await session_factory)()
    async with session:
        print("session before")
        yield session
        print("session after")


class Container(DeclarativeContainer):
    db_engine = providers.Resource(init_db_engine)
    db_session_factory = providers.Resource(async_sessionmaker, db_engine)
    db_scoped_session = providers.ThreadSafeSingleton(
        async_scoped_session,
        session_factory=db_session_factory,
        scopefunc=asyncio.current_task,
    )
    db_session = providers.Resource(init_session)


@inject
async def call_db(db: AsyncSession = Closing[Provide[Container.db_session]]):
    return await db.execute(sa.text("select version()"))


async def main():
    await call_db()
    await call_db()
    await call_db()


if __name__ == "__main__":
    container = Container()
    container.wire(modules=[__name__])
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_until_complete(container.shutdown_resources())
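
The example above relies on Closing to shut the session resource down once the injected function returns. For readers who only need that per-call lifecycle (set up before the call, tear down after it), the same pattern can be imitated with the standard library alone. This is only a rough sketch and does not reflect how dependency-injector implements wiring: with_resource is a hypothetical helper, and get_db here yields a plain string as a stand-in for a real AsyncSession.

```python
import asyncio
import functools
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Awaitable, Callable, List

log: List[str] = []  # records the order of setup, use, and teardown


async def get_db() -> AsyncGenerator[str, None]:
    # Hypothetical stand-in: yields a string instead of a real AsyncSession.
    log.append("session before")
    try:
        yield "fake-session"
    finally:
        log.append("session after")


def with_resource(name: str, factory: Callable[[], AsyncGenerator]):
    """Hypothetical helper: open factory(), pass its value as keyword
    `name`, and close it once the wrapped coroutine has finished."""
    manager = asynccontextmanager(factory)

    def decorator(func: Callable[..., Awaitable]):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if name in kwargs:  # caller supplied its own value, e.g. in tests
                return await func(*args, **kwargs)
            async with manager() as value:
                return await func(*args, **{**kwargs, name: value})

        return wrapper

    return decorator


@with_resource("db", get_db)
async def call_db(db: str = "") -> str:
    log.append(f"query on {db}")
    return db


async def main() -> List[str]:
    # Each call opens and closes its own resource.
    return [await call_db(), await call_db()]


results = asyncio.run(main())
```

Passing db explicitly, as in await call_db(db="override"), skips the generator entirely, which can be handy when substituting a test double.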