@pprados Please help to review this PR.
The issue you linked to is closed. Can you share the code you are running to reproduce this error?
@hwchase17 Here is a sample langserve code to reproduce this error.
```
langchain-openai==0.1.8
langserve[server]==0.2.2
langchain==0.2.3
langchain-community==0.2.4
pydantic==1.10.13
asyncmy==0.2.9
```
```python
from typing import AsyncIterator, Any

from fastapi import APIRouter, FastAPI
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories.sql import SQLChatMessageHistory
from langchain_core.runnables import Runnable
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.runnables.utils import ConfigurableFieldSpec
from langchain_core.tools import Tool
from langchain_openai import AzureChatOpenAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import (
    ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate,
    SystemMessagePromptTemplate
)
from langchain.schema.runnable import RunnableLambda
from langserve import add_routes
from langserve.pydantic_v1 import BaseModel, Field
from sqlalchemy.ext.asyncio import create_async_engine

# Config
sql_server_url = "mysql+asyncmy://dbuser:dbpass@localhost:3306/history"
azure_endpoint = "fixme"
azure_deployment = "fixme"
openai_api_version = "fixme"
openai_api_key = "fixme"


# Model
class LLMInput(BaseModel):
    question: str = Field(
        ...,
        description="user question",
    )
    system_prompt: str = Field(
        default="You are a helpful AI assistant",
        description="system prompt",
    )


# History Stuff
async_sql_engine = create_async_engine(sql_server_url, pool_recycle=600)

history_factory_config = [
    ConfigurableFieldSpec(
        id="user_id",
        annotation=str,
        name="User ID",
        description="Unique identifier for the user.",
        default="",
        is_shared=True,
    ),
    ConfigurableFieldSpec(
        id="conversation_id",
        annotation=str,
        name="Conversation ID",
        description="Unique identifier for the conversation.",
        default="",
        is_shared=True,
    ),
]


def get_session_history(
    user_id: str,
    conversation_id: str,
) -> BaseChatMessageHistory:
    """Get a chat history from a session ID."""
    session_id = f"{user_id}_{conversation_id}"
    return SQLChatMessageHistory(
        session_id=session_id,
        connection=async_sql_engine,
    )


# Tool Stuff
def search(query: str, **kwargs) -> str:
    return f"{query} found"


class SearchToolInput(BaseModel):
    """Input for search tool."""
    query: str = Field(description="keyword for search")


search_tool = Tool(
    name="web_search",
    description="web_search is a tool to search online information.",
    func=search,
    args_schema=SearchToolInput,
)


def build_llm_chain(llm_input: LLMInput) -> Runnable:
    llm = AzureChatOpenAI(
        azure_endpoint=azure_endpoint,
        azure_deployment=azure_deployment,
        openai_api_version=openai_api_version,
        openai_api_key=openai_api_key,
        streaming=True,
        temperature=0.01,
        max_tokens=4000,
    )
    tools = [search_tool]
    prompt = ChatPromptTemplate.from_messages([
        SystemMessagePromptTemplate.from_template(llm_input.system_prompt),
        MessagesPlaceholder(variable_name="history"),
        HumanMessagePromptTemplate.from_template("{question}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])
    agent = create_tool_calling_agent(llm, tools, prompt)
    return AgentExecutor(
        agent=agent,
        tools=tools,
    )


def build_chain(llm_input: LLMInput) -> Runnable:
    llm_chain = build_llm_chain(llm_input)
    return RunnableWithMessageHistory(
        llm_chain,
        get_session_history,
        input_messages_key="question",
        history_messages_key="history",
        history_factory_config=history_factory_config,
    ).with_types(input_type=LLMInput)


async def llm_main(_input: dict[str, Any]) -> AsyncIterator[Any]:
    _input["question"] = _input["question"].lstrip("\n").rstrip()
    _input["system_prompt"] = _input["system_prompt"] or "You are a helpful AI assistant"
    llm_input: LLMInput = LLMInput.parse_obj(_input)
    chain = build_chain(llm_input)
    config = {
        "configurable": {
            "user_id": "user1",
            "conversation_id": "conv1",
        }
    }
    async for event in chain.astream_events(_input, config, version="v1"):
        yield event


router = APIRouter()
add_routes(
    router,
    RunnableLambda(llm_main).with_types(input_type=LLMInput),
    enabled_endpoints=["invoke", "batch", "stream"],
)

app = FastAPI(
    title="Test Server",
    version="0.01",
    description="Test Server Powered by LangChain&LLM",
)
app.include_router(router)
```
Save it as `async_agent.py`, change the SQL server and Azure OpenAI configuration at the beginning of the file to your own values, then run:

```
uvicorn async_agent:app --host 0.0.0.0
```
```python
from langserve import RemoteRunnable

client = RemoteRunnable('http://localhost:8000/')
for chunk in client.stream({'question': "What's langchain"}):
    print(chunk)
```
Save it as `agent_client.py`, and run:

```
python agent_client.py
```
On the client side, you will see plenty of event output, and the final answer to the question arrives. But on the server side, you will see logs like:
```
INFO:     Will watch for changes in these directories: ['/home/mackong/Codes/playground/llm']
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [828125] using WatchFiles
INFO:     Started server process [828140]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:49590 - "POST /stream HTTP/1.1" 200 OK
Parent run 8e230381-f1bb-42ab-835b-635851ef9bba not found for run 5959e910-f062-4229-9839-1d3de2f22007. Treating as a root run.
Error in RootListenersTracer.on_chain_end callback: RuntimeError("There is no current event loop in thread 'ThreadPoolExecutor-0_0'.")
```
where the error is thrown from the RootListenersTracer.on_chain_end callback, and no message history is added to the database.
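For context, here is a standalone sketch of why this RuntimeError appears (plain asyncio, not LangChain code; all names below are mine). Anything that assumes a current event loop fails when it runs on a plain worker thread, which is where the sync end listener ends up on the async path:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def sync_listener() -> None:
    # On a ThreadPoolExecutor worker thread there is no current event
    # loop, so this raises RuntimeError, just like the server log above.
    asyncio.get_event_loop()

with ThreadPoolExecutor() as pool:
    try:
        pool.submit(sync_listener).result()
    except RuntimeError as e:
        print(e)  # There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
```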
Looks overall reasonable to me. It'll generate a trace that's a bit uglier than before, but I think it's more important to fix the async path:

```diff
         output_messages = self._get_output_messages(output_val)
         hist.add_messages(input_messages + output_messages)

+    async def _aexit_history(self, run: Run, config: RunnableConfig) -> None:
```
@mackong would you mind adding unit tests to cover the async path?
@eyurtsev unit tests added, but one test fails due to an unrelated issue. See https://github.com/langchain-ai/langchain/actions/runs/9594957636/job/26458682186?pr=22933#step:6:164
Currently AsyncRootListenersTracer's schema format is "original", so on_chat_model_start falls back to on_llm_start; the Run's input is then a str rather than a BaseMessage, and it is ignored by ChatMessageHistory's add_message (see langchain/libs/core/tests/unit_tests/fake/memory.py, lines 20 to 22 at ad7f2ec).
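For illustration, a hypothetical in-memory history with that kind of isinstance guard (a sketch only, not the actual memory.py contents) shows why the str input is silently dropped:

```python
from langchain_core.messages import BaseMessage, HumanMessage

class FakeChatMessageHistory:
    """Hypothetical in-memory history used only for this illustration."""

    def __init__(self) -> None:
        self.messages: list[BaseMessage] = []

    def add_message(self, message: BaseMessage) -> None:
        # With schema format "original", the async tracer falls back to
        # on_llm_start and hands over a str, which fails this check.
        if isinstance(message, BaseMessage):
            self.messages.append(message)

history = FakeChatMessageHistory()
history.add_message(HumanMessage(content="hello"))  # stored
history.add_message("hello")                        # silently ignored
print(len(history.messages))  # 1
```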
I have created PR #23214, which fixes the issue; please review #23214 first.
@eyurtsev the added unit tests now pass.
The error is thrown by langchain/libs/community/langchain_community/chat_message_histories/sql.py, line 259 at ddfbca3.
In this patch, a new _aexit_history function, which is called in async mode, is added; it in turn calls aadd_messages.
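A runnable toy of that idea (ToyHistory is hypothetical; the real code goes through BaseChatMessageHistory and SQLChatMessageHistory):

```python
import asyncio
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage

class ToyHistory:
    """Hypothetical history with the sync/async pair the patch relies on."""

    def __init__(self) -> None:
        self.messages: list[BaseMessage] = []

    def add_messages(self, messages: list[BaseMessage]) -> None:
        self.messages.extend(messages)

    async def aadd_messages(self, messages: list[BaseMessage]) -> None:
        # A real backend would do async I/O here (e.g. via asyncmy).
        self.messages.extend(messages)

async def aexit_history_like(hist: ToyHistory) -> None:
    # Mirrors what _aexit_history does conceptually: persist the turn
    # through the async API instead of calling the sync add_messages.
    input_messages = [HumanMessage(content="What's langchain")]
    output_messages = [AIMessage(content="A framework for building LLM apps.")]
    await hist.aadd_messages(input_messages + output_messages)

hist = ToyHistory()
asyncio.run(aexit_history_like(hist))
print(len(hist.messages))  # 2
```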
In this patch, we use the afunc attribute of a Runnable to check whether the end listener should be run in async mode or not.
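A runnable sketch of that dispatch pattern (all names below are mine, not LangChain's internals):

```python
import asyncio
from typing import Awaitable, Callable, Optional

class EndListener:
    """Toy listener wrapper: `afunc` marks an async implementation."""

    def __init__(
        self,
        func: Callable[[str], None],
        afunc: Optional[Callable[[str], Awaitable[None]]] = None,
    ) -> None:
        self.func = func
        self.afunc = afunc

async def on_end(listener: EndListener, output: str) -> None:
    if listener.afunc is not None:
        # Async mode: stay on the event loop and await the coroutine.
        await listener.afunc(output)
    else:
        # Sync mode: run the blocking callback on a worker thread.
        await asyncio.get_running_loop().run_in_executor(None, listener.func, output)

def save_sync(output: str) -> None:
    print(f"sync save: {output}")

async def save_async(output: str) -> None:
    print(f"async save: {output}")

async def main() -> None:
    await on_end(EndListener(save_sync), "turn 1")
    await on_end(EndListener(save_sync, afunc=save_async), "turn 2")

asyncio.run(main())
```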