langchain
core[patch]: fix no current event loop for sql history in async mode #22933
Merged

mackong · 342 days ago (edited 339 days ago)
  • Description: When using RunnableWithMessageHistory/SQLChatMessageHistory in async mode, we get the following error:

Error in RootListenersTracer.on_chain_end callback: RuntimeError("There is no current event loop in thread 'asyncio_3'.")

The error is raised from the on_chain_end history listener, and no message history is added to the database.

This patch adds a new _aexit_history function, called in async mode, which in turn calls aadd_messages.

The afunc attribute of the Runnable is used to check whether the end listener should run in async mode or not.
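The shape of the fix can be sketched with stand-in classes; this is a minimal illustration of the mechanism described above, not the actual patch (the real _aexit_history lives in langchain_core/runnables/history.py and extracts the messages from the Run object):

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeSQLHistory:
    """Stand-in for SQLChatMessageHistory with sync and async write paths."""
    messages: list = field(default_factory=list)

    def add_messages(self, messages):          # sync path (_exit_history)
        self.messages.extend(messages)

    async def aadd_messages(self, messages):   # async path used by the patch
        await asyncio.sleep(0)                 # pretend async DB round-trip
        self.messages.extend(messages)


def listener_is_async(runnable) -> bool:
    # Mirrors the afunc check described above: a runnable built from a
    # coroutine function exposes it as `afunc`; plain objects do not.
    return getattr(runnable, "afunc", None) is not None


async def _aexit_history(hist, input_messages, output_messages):
    # Async counterpart of _exit_history: awaits aadd_messages instead of
    # calling add_messages from a worker thread that has no event loop.
    await hist.aadd_messages(input_messages + output_messages)


hist = FakeSQLHistory()
asyncio.run(_aexit_history(hist, ["What's langchain"], ["An LLM framework."]))
```

Because the write is awaited on the running loop, there is no attempt to fetch an event loop from a thread-pool worker, which is exactly what triggers the RuntimeError above.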

dosubot added size:M, Ɑ: Runnables, 🤖:bug
mackong · 342 days ago

@pprados Please help review this PR.

mackong force-pushed 342 days ago
mackong force-pushed 340 days ago (×3)
hwchase17 · 339 days ago

The issue you linked to is closed. Can you share the code you are running to reproduce this error?

mackong · 339 days ago (edited 339 days ago)

@hwchase17 Here is a sample langserve code to reproduce this error.

Requirements

langchain-openai==0.1.8
langserve[server]==0.2.2
langchain==0.2.3
langchain-community==0.2.4
pydantic==1.10.13
asyncmy==0.2.9

LangServe server side code

from typing import AsyncIterator, Any

from fastapi import APIRouter, FastAPI
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories.sql import SQLChatMessageHistory
from langchain_core.runnables import Runnable
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.runnables.utils import ConfigurableFieldSpec
from langchain_core.tools import Tool
from langchain_openai import AzureChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import (
    ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate,
    SystemMessagePromptTemplate
)
from langchain.schema.runnable import RunnableLambda
from langserve import add_routes
from langserve.pydantic_v1 import BaseModel, Field
from sqlalchemy.ext.asyncio import create_async_engine


# Config
sql_server_url = "mysql+asyncmy://dbuser:dbpass@localhost:3306/history"

azure_endpoint = "fixme"
azure_deployment = "fixme"
openai_api_version = "fixme"
openai_api_key = "fixme"


# Model
class LLMInput(BaseModel):
    question: str = Field(
        ...,
        description="user question",
    )
    system_prompt: str = Field(
        default="You are a helpful AI assistant",
        description="system prompt",
    )


# History Stuff
async_sql_engine = create_async_engine(sql_server_url, pool_recycle=600)

history_factory_config = [
    ConfigurableFieldSpec(
        id="user_id",
        annotation=str,
        name="User ID",
        description="Unique identifier for the user.",
        default="",
        is_shared=True,
    ),
    ConfigurableFieldSpec(
        id="conversation_id",
        annotation=str,
        name="Conversation ID",
        description="Unique identifier for the conversation.",
        default="",
        is_shared=True,
    ),
]

def get_session_history(
    user_id: str,
    conversation_id: str,
) -> BaseChatMessageHistory:
    """Get a chat history from a session ID."""
    session_id = f"{user_id}_{conversation_id}"

    return SQLChatMessageHistory(
        session_id=session_id,
        connection=async_sql_engine,
    )


# Tool Stuff
def search(query: str, **kwargs) -> str:
    return f"{query} found"


class SearchToolInput(BaseModel):
    """Input for the search tool."""
    query: str = Field(description="keyword for search")


search_tool = Tool(
    name="web_search",
    description="web_search is a tool to search online information.",
    func=search,
    args_schema=SearchToolInput,
)


def build_llm_chain(llm_input: LLMInput) -> Runnable:
    llm = AzureChatOpenAI(
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        openai_api_version=openai_api_version,
        openai_api_key=openai_api_key,
        streaming=True,
        temperature=0.01,
        max_tokens=4000,
    )
    tools = [search_tool]
    prompt = ChatPromptTemplate.from_messages([
        SystemMessagePromptTemplate.from_template(llm_input.system_prompt),
        MessagesPlaceholder(variable_name="history"),
        HumanMessagePromptTemplate.from_template("{question}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])
    agent = create_tool_calling_agent(
        llm, tools, prompt,
    )
    return AgentExecutor(
        agent=agent,
        tools=tools,
    )


def build_chain(llm_input: LLMInput) -> Runnable:
    llm_chain = build_llm_chain(llm_input)

    return RunnableWithMessageHistory(
        llm_chain,
        get_session_history,
        input_messages_key="question",
        history_messages_key="history",
        history_factory_config=history_factory_config
    ).with_types(
        input_type=LLMInput
    )

async def llm_main(_input: dict[str, Any]) -> AsyncIterator[Any]:
    _input["question"] = _input["question"].lstrip("\n").rstrip()
    _input["system_prompt"] = _input["system_prompt"] or "You are a helpful AI assistant"

    llm_input: LLMInput = LLMInput.parse_obj(_input)

    chain = build_chain(llm_input)
    config = {
        "configurable": {
            "user_id": "user1",
            "conversation_id": "conv1",
        }
    }

    async for event in chain.astream_events(_input, config, version="v1"):
        yield event


router = APIRouter()

add_routes(
    router,
    RunnableLambda(llm_main).with_types(input_type=LLMInput),
    enabled_endpoints=["invoke", "batch", "stream"]
)

app = FastAPI(
    title="Test Server",
    version="0.01",
    description="Test Server Powered by LangChain&LLM",
)
app.include_router(router)

Save it as async_agent.py, change the SQL server and Azure OpenAI configuration at the top of the file to your own, then run:

uvicorn async_agent:app --host 0.0.0.0

LangServe client side code

from langserve import RemoteRunnable

client = RemoteRunnable('http://localhost:8000/')
for chunk in client.stream({'question': "What's langchain"}):
    print(chunk)

Save it as agent_client.py, and run:

python agent_client.py

Result

On the client side you will see a stream of event output, and the final answer to the question arrives.
But on the server side you will see logs like:

INFO:     Will watch for changes in these directories: ['/home/mackong/Codes/playground/llm']
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [828125] using WatchFiles
INFO:     Started server process [828140]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:49590 - "POST /stream HTTP/1.1" 200 OK
Parent run 8e230381-f1bb-42ab-835b-635851ef9bba not found for run 5959e910-f062-4229-9839-1d3de2f22007. Treating as a root run.
Error in RootListenersTracer.on_chain_end callback: RuntimeError("There is no current event loop in thread 'ThreadPoolExecutor-0_0'.")

where the error is thrown by the on_chain_end history listener, and no message history is added to the database.

mackong force-pushed 339 days ago (×4)
ccurme added, then removed, the community label
mackong force-pushed 338 days ago (×2)
ccurme added Ɑ: core
eyurtsev commented on 2024-06-19 (337 days ago)

Looks overall reasonable to me. It'll generate a trace that's a bit uglier than before, but I think it's more important to fix the async path.

libs/core/langchain_core/runnables/history.py

        output_messages = self._get_output_messages(output_val)
        hist.add_messages(input_messages + output_messages)

+   async def _aexit_history(self, run: Run, config: RunnableConfig) -> None:
eyurtsev · 337 days ago

@mackong would you mind adding unit tests to cover the async path?

mackong · 337 days ago (edited 337 days ago)

@eyurtsev Unit tests added, but one test fails due to an unrelated issue; see https://github.com/langchain-ai/langchain/actions/runs/9594957636/job/26458682186?pr=22933#step:6:164

Currently AsyncRootListenersTracer's schema format is "original", so on_chat_model_start falls back to on_llm_start; the Run's input is then a str rather than a BaseMessage, and it is rejected by ChatMessageHistory's add_message:

if not isinstance(message, BaseMessage):
    raise ValueError
self.messages.append(message)
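That rejection can be exercised directly with minimal stand-in classes (hypothetical names; the real guard lives in ChatMessageHistory, and BaseMessage comes from langchain_core):

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    """Stand-in for langchain_core's BaseMessage."""
    content: str


@dataclass
class InMemoryHistory:
    """Stand-in history with the isinstance guard quoted above."""
    messages: list = field(default_factory=list)

    def add_message(self, message):
        if not isinstance(message, Message):
            raise ValueError("message must be a BaseMessage")
        self.messages.append(message)


hist = InMemoryHistory()
hist.add_message(Message("hi"))  # a real message is stored

try:
    # With the "original" schema format, the on_llm_start fallback hands the
    # tracer a plain string, which the guard rejects.
    hist.add_message("plain string from on_llm_start fallback")
    rejected = False
except ValueError:
    rejected = True
```

This is why the failing test only passes once the tracer uses the chat-model schema and receives proper message objects.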

I have created PR #23214 which fixes that issue; please review #23214 first.

mackong · 336 days ago

@eyurtsev the added unit tests now pass.

eyurtsev self-assigned this 337 days ago
mackong force-pushed 337 days ago
dosubot removed size:M, added size:L
mackong force-pushed 337 days ago
commit 9daee92d: core[patch]: fix no current event loop for sql history in async mode
mackong force-pushed to 9daee92d 336 days ago
eyurtsev approved these changes on 2024-06-21
dosubot added lgtm
eyurtsev merged 360a70c8 into master 336 days ago
mackong deleted the fix-async-history branch 334 days ago
