community[patch]: Make sql record manager fully compatible with async
#20735
Merged

pprados commented 1 year ago (edited)

The _amake_session() method does not allow self.session_factory to be anything other than an async_sessionmaker. This prohibits advanced uses of index().

In a RAG architecture, document chunks have to be imported. To keep track of the links between chunks and their source documents, we can use the index() API, which relies on an SQL-based record manager.

In the classic setup, with SQLRecordManager and a vector database, the consistency of the import cannot be guaranteed: if a crash occurs during the import (a network problem, for example), the SQL database and the vector database end up inconsistent with each other.

With the PR we are proposing for langchain-postgres, it becomes possible to guarantee the consistency of importing chunks into a vector database. This only works if the outer session is built from the connection.

from langchain.indexes import SQLRecordManager, index
from langchain_community.document_loaders import CSVLoader
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker


def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()  # stand-in for any Embeddings implementation
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: update the session factories so that both tools share
        # the same connection (and therefore the same outer transaction).
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)

The same thing is possible asynchronously, but a bug in _amake_session() in sql_record_manager.py must first be fixed:

    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # Before the fix, the guard was:
        #     if not isinstance(self.session_factory, async_sessionmaker):
        # which rejects any other factory (for example an async_scoped_session).
        # Checking the engine instead keeps the guard while allowing
        # session_factory to be overridden.
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
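Why does the original check fail? An async_scoped_session bound to an outer connection is a perfectly valid session factory for an async engine, yet it is not an instance of async_sessionmaker, so the old guard raised AssertionError as soon as session_factory was overridden. A hedged illustration (the connection URL is only a placeholder):

from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine("postgresql+psycopg://postgres:password_postgres@localhost:5432/")
factory = async_scoped_session(async_sessionmaker(bind=engine), scopefunc=current_task)

print(callable(factory))                        # True: calling it yields an AsyncSession
print(isinstance(factory, async_sessionmaker))  # False: the old guard rejected it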

Then, it is possible to do the same thing asynchronously:

import asyncio
from asyncio import current_task

from langchain.indexes import SQLRecordManager, aindex
from langchain_community.document_loaders import CSVLoader
from langchain_core.vectorstores import VectorStore
from langchain_postgres import PGVector
from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)


async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()  # stand-in for any Embeddings implementation
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        # NOTE: update the session factories so that both tools share
        # the same connection (and therefore the same outer transaction).
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
pprados Add MistralAI provider
a5afb789
pprados Fix _amake_session()
1909eb31
pprados marked this pull request as ready for review 1 year ago
dosubot added size:XS
dosubot added Ɑ: vector store
dosubot added 🔌: postgres
dosubot added 🤖:improvement
eyurtsev commented 1 year ago

This code mutates attributes on the classes in place.

        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker

While attributes are not marked explicitly as private, it's safest to still treat these as such.


Are you able to pass required information via the initializer? If so, what information do you need?

    def __init__(
        self,
        namespace: str,
        *,
        connection: Optional[Union[??]], # <-- Is there something that can be provided here that will accommodate your use case?
        engine: Optional[Union[Engine, AsyncEngine]] = None,
        db_url: Union[None, str, URL] = None,
        engine_kwargs: Optional[Dict[str, Any]] = None,
        async_mode: bool = False,
    ) -> None:
eyurtsev assigned eyurtsev 1 year ago
pprados commented 1 year ago

The SQLRecordManager cannot be initialized with a connection, and that would not be enough anyway.

The session_maker = scoped_session(sessionmaker(bind=connection)) (or async_scoped_session(async_sessionmaker(bind=connection), scopefunc=current_task) in async mode) must be used by all the SQL tools involved. Maybe it is possible to add a parameter carrying the current session_maker to PGVector and SQLRecordManager.

It is a complex process. The session_maker MUST be initialized with the CURRENT connection (one connection per thread or async job; a connection cannot be shared between threads, it is not reentrant). Only with this construction can all the sub-transactions join the outer transaction, guaranteeing that the import runs to completion or not at all, whatever happens.

The LCEL approach proposes using a singleton to describe the chain. So you must use an Engine, not a Connection. The connection is opened only when needed, one per job.

The attributes record_manager.session_factory and pgvector.session_maker can be updated for that, as sketched below.
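Assuming the same imports as in the synchronous example above, the pattern can be sketched as a small helper (the name import_with_shared_connection is purely illustrative, not part of any library):

def import_with_shared_connection(engine, record_manager, vector_store, docs):
    # One connection per job: both tools bind their sessions to it, so the
    # record manager and the vector store join the same outer transaction
    # and the import is atomic.
    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        record_manager.session_factory = session_maker
        vector_store.session_maker = session_maker
        with connection.begin():
            return index(
                docs_source=docs,
                record_manager=record_manager,
                vector_store=vector_store,
                cleanup="incremental",
                source_id_key="source",
            )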

pprados Merge branch 'master' into pprados/fix_sql_record_manager
f68eb79d
pprados commented 1 year ago

Hello from Paris 😄

To fully grasp the difficulty here, one must understand that scoped_session() is a class that holds a local() attribute for variables tied to the current thread. Several scoped_session() instances can therefore exist within the same thread, each keeping its own session alive in its own thread-local variable. Hence, if PGVector() and SQLRecordManager() each use their own scoped_session(), they will not share the same session, which leads to inconsistency if a crash occurs during the import (see the sketch below).
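A small illustration of that point (an in-memory SQLite engine is used here purely as a stand-in):

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")

# Each scoped_session registry keeps its own thread-local session, so two
# registries living in the same thread never hand out the same Session.
registry_a = scoped_session(sessionmaker(bind=engine))
registry_b = scoped_session(sessionmaker(bind=engine))

assert registry_a() is registry_a()      # same registry -> same session
assert registry_a() is not registry_b()  # different registries -> different sessions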

One could introduce a global variable global_scoped_session holding a scoped_session() instance for langchain, and make sure that every SQLAlchemy API uses it to create sessions. Here again, the caller responsible for creating the first session would decide how it is created.

However, this does not work for our use case: initializing global_scoped_session requires a sessionmaker(bind=connection), and a connection cannot back a global variable. A sessionmaker(bind=engine) would be possible, but it would be strange for langchain to impose a global engine.

One could also imagine an alternative to scoped_session that uses a global variable for its local(): a global_scoped_session. If both classes defaulted to global_scoped_session(), a direct invocation would simply create a session, and there would no longer be any need to modify session_(maker|factory).

If global_scoped_session() were invoked before the call, PGVector and SQLRecordManager would reuse that session; it is therefore the caller who decides how the original session is created.

So, to conclude, the only viable solution, in my opinion, is the one I propose: offering the ability to modify session_factory and session_maker. With that, my PR can be validated 😄.

baskaryan core[patch]: Release 0.1.46 (#20891)
b329fe7b
chosh0615 partner: Upstage quick documentation update (#20869)
842d84f2
baskaryan core[minor], langchain[patch], community[patch]: mv StructuredQuery (…
a5643858
mokeyish openai[patch]: Allow disablling safe_len_embeddings(OpenAIEmbeddings)…
435bc561
JasonSTong community[patch]: add BeautifulSoupTransformer remove_unwanted_classn…
123f19e4
multiple: remove external repo mds (#20896)
033d3102
community[mionr]: add Jina Reranker in retrievers module (#19406)
47f8627f
paul-paliychuk docs: Fix misplaced zep cloud example links (#20867)
8bbd8811
upstage: release 0.1.2 (#20898)
9fdaee08
fzowl docs: Use voyage-law-2 in the examples (#20784)
fd186419
samanhappy docs: Fix broken link in agents.ipynb (#20872)
73551518
coolbeevip community[patch]: add HTTP response headers Content-Type to metadata …
b58bb766
tomasonjo community[patch]: Support passing graph object to Neo4j integrations …
648a5957
merdan-9 docs: hide model import in multiple_tools.ipynb (#20883)
25b39103
ccurme core, community: deprecate tool.__call__ (#20900)
18d53da9
ccurme docs: update chat model feature table (#20899)
97ba189e
ccurme mistral, openai: support custom tokenizers in chat models (#20901)
4417aa1c
AndresAlgaba community[patch]: deprecate persist method in Chroma (#20855)
b0d7bab1
davidefantiniIntel community: fix tqdm import (#20263)
5ec5e90c
klaus-xiong community[minor]: Add relyt vector database (#20316)
feca792a
eyurtsev cli[minor]: Add __version__ (#20903)
b5d5c97f
rahul-trip community[patch]: Add semantic info to metadata, classified by pebblo…
3c702a77
shane-huang community[patch]: add more data types support to ipex-llm llm integra…
471718c8
mjschock experimental[patch]: remove \n from AutoGPT feedback_tool exit check …
88df1a7a
anish749 core[patch]: improve comma separated list output parser to handle non…
6d721cee
mjschock experimental[patch]: return from HuggingGPT task executor task.run() …
7b2d5ed1
am-kinetica community[minor]: Implemented Kinetica Document Loader and added note…
2febebd6
dristysrivastava community[patch]: Add support for pebblo server and client version (#…
3ce27b36
mattgotteiner community[patch]: Add initial tests for AzureSearch vector store (#17…
502a76d9
ccurme langchain: support PineconeVectorStore in self query retriever (#20905)
147a43cf
hinthornw Use lstv2 (#20747)
474508e8
pprados FIX
170888b0
efriis added partner
efriis assigned efriis 1 year ago
dosubot removed size:XS
dosubot added size:XXL
pprados Merge branch 'master' into pprados/fix_sql_record_manager
0d5de1b2
dosubot removed size:XXL
dosubot added size:XS
pprados Merge branch 'master' into pprados/fix_sql_record_manager
cad3681c
pprados Fix lint
8f2d8666
pprados marked this pull request as draft 1 year ago
pprados Fix lint
cca03bdf
pprados marked this pull request as ready for review 1 year ago
dosubot removed size:XS
dosubot added size:S
pprados commented 1 year ago

Hello @eyurtsev

Could you reconsider this small but very important PR? Without it, it is not possible to run a resilient RAG in production.

eyurtsev approved these changes on 2024-04-22
libs/core/langchain_core/output_parsers/pydantic.py

    json_object = super().parse_result(result)
    return self._parse_obj(json_object)

+   def parse_result(

eyurtsev commented 1 year ago

Shouldn't be included

libs/community/langchain_community/indexes/_sql_record_manager.py

    """Create a session and close it after use."""

-   if not isinstance(self.session_factory, async_sessionmaker):
+   if not isinstance(self.engine, AsyncEngine):

eyurtsev commented 1 year ago

Why doesn't the original code work?

dosubot added lgtm
eyurtsev changed the title from "Make sql record manager fully compatible with async" to "community[patch]: Make sql record manager fully compatible with async" 1 year ago
eyurtsev merged 7be68228 into master 1 year ago
pprados deleted the pprados/fix_sql_record_manager branch 339 days ago
