I'm trying to run a basic stateful session using the google.adk library in Python, interacting with a question_answering_agent and InMemorySessionService. I encountered a series of errors related to imports, asynchronous operations, and API key loading.
My goal is to successfully run the basic_stateful_session.py script which initializes a session, sends a message to an agent, and then logs the final session state.
However, I get the following error:
AttributeError: 'coroutine' object has no attribute 'state' and ValueError: Session not found (with RuntimeWarning: coroutine '...' was never awaited)
The script: basic_stateful_session.py
import uuid
from dotenv import load_dotenv
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from question_answering_agent import question_answering_agent
load_dotenv()
# Create a new session service to store state
session_service_stateful = InMemorySessionService()
initial_state = {
"user_name": "John Doe",
"user_preferences": """
I like to play Pickleball, Disc Golf, and Tennis.
My favorite food is Mexican.
My favorite TV show is Game of Thrones.
Loves it when people like and subscribe to his YouTube channel.
""",
}
# Create a NEW session
APP_NAME = "John Doe Bot"
USER_ID = "john_doe"
SESSION_ID = str(uuid.uuid4())
stateful_session = session_service_stateful.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID,
state=initial_state,
)
print("CREATED NEW SESSION:")
print(f"\tSession ID: {SESSION_ID}")
runner = Runner(
agent=question_answering_agent,
app_name=APP_NAME,
session_service=session_service_stateful,
)
new_message = types.Content(
role="user", parts=[types.Part(text="What is Johns favorite TV show?")]
)
for event in runner.run(
user_id=USER_ID,
session_id=SESSION_ID,
new_message=new_message,
):
if event.is_final_response():
if event.content and event.content.parts:
print(f"Final Response: {event.content.parts[0].text}")
print("==== Session Event Exploration ====")
session = session_service_stateful.get_session(
app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
# Log final Session state
print("=== Final Session State ===")
for key, value in session.state.items():
print(f"{key}: {value}")
The agent:
from google.adk.agents import Agent
# Create the root agent
question_answering_agent = Agent(
name="question_answering_agent",
model="gemini-2.0-flash",
description="Question answering agent",
instruction="""
You are a helpful assistant that answers questions about the user's preferences.
Here is some information about the user:
Name:
{user_name}
Preferences:
{user_preferences}
""",
)
1 Answer 1
The solution was pretty straightforward and wrapping the code into asyncio function:
import uuid
import asyncio # Import asyncio for running async functions
import os # Import os to check environment variables
from dotenv import load_dotenv
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.genai.client import Client
from question_answering_agent import question_answering_agent
# Loads environment variables from a .env file
load_dotenv()
google_api_key = os.getenv("GOOGLE_API_KEY")
# Define an async main function to encapsulate the asynchronous operations
async def main():
# Create a new session service to store state
session_service_stateful = InMemorySessionService()
# Define the initial state for the session. This information will be used by the agent.
# It can include user preferences, context, or any other relevant information.
initial_state = {
"user_name": "John Doe",
"user_preferences": """
I like to play Pickleball, Disc Golf, and Tennis.
My favorite food is Mexican.
My favorite TV show is Game of Thrones.
Loves it when people like and subscribe to his YouTube channel.
""",
}
# Create a NEW session
APP_NAME = "John Doe Bot"
USER_ID = "john_doe"
SESSION_ID = str(uuid.uuid4()) # Generates a unique session ID each time
# AWAIT the create_session call
# app name, user_id and session_id are used to identify the session
stateful_session = await session_service_stateful.create_session(
app_name=APP_NAME,
user_id=USER_ID,
session_id=SESSION_ID,
state=initial_state, # initial_state is passed to the session
)
print("CREATED NEW SESSION:")
print(f"\tSession ID: {SESSION_ID}")
# Add a small delay. This can sometimes help with subtle timing issues or
# thread synchronization if the Runner operates in a separate thread.
await asyncio.sleep(0.1)
# A runner to run the agent with the session service
runner = Runner(
agent=question_answering_agent,
app_name=APP_NAME,
session_service=session_service_stateful,
)
new_message = types.Content(
role="user", parts=[types.Part(text="What is Johns favorite TV show?")]
)
# AWAIT the runner.run_async asynchronous generator
async for event in runner.run_async(
user_id=USER_ID,
session_id=SESSION_ID, # Ensure this SESSION_ID matches the one created
new_message=new_message,
):
if event.is_final_response():
if event.content and event.content.parts:
print(f"Final Response: {event.content.parts[0].text}")
print("==== Session Event Exploration ====")
# AWAIT the get_session call to retrieve the session for logging its final state
session = await session_service_stateful.get_session(
app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID
)
# Log final Session state
print("=== Final Session State ===")
if session and hasattr(session, 'state'): # Added a check to ensure session and state exist
for key, value in session.state.items():
print(f"{key}: {value}")
else:
print("Session or session state could not be retrieved.")
# This block ensures that main() is called when the script is executed
if __name__ == "__main__":
asyncio.run(main())
Comments
Explore related questions
See similar questions with these tags.