This code mutates attributes on the classes in place:

```python
record_manager.session_factory = session_maker
pgvector.session_maker = session_maker
```

While these attributes are not explicitly marked as private, it is safest to treat them as such.

Are you able to pass the required information via the initializer? If so, what information do you need?
```python
def __init__(
    self,
    namespace: str,
    *,
    connection: Optional[Union[??]],  # <-- Is there something that can be provided here that will accommodate your use case?
    engine: Optional[Union[Engine, AsyncEngine]] = None,
    db_url: Union[None, str, URL] = None,
    engine_kwargs: Optional[Dict[str, Any]] = None,
    async_mode: bool = False,
) -> None:
```
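For reference, a minimal sketch of what a caller can already pass through the current initializer: an `Engine` or a `db_url`, but not the connection carrying the caller's outer transaction (the connection URL below is a placeholder):

```python
from sqlalchemy import create_engine
from langchain.indexes import SQLRecordManager

engine = create_engine("postgresql+psycopg://user:pwd@localhost/db")

# The record manager builds its own sessionmaker from this engine, so its
# statements run outside any transaction the caller may have opened.
record_manager = SQLRecordManager("my_namespace", engine=engine)
record_manager.create_schema()
```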
`SQLRecordManager` cannot currently be initialized with a connection, and a connection alone is not enough. The session maker, `scoped_session(sessionmaker(bind=connection))` (or `async_scoped_session(async_sessionmaker(bind=connection), scopefunc=current_task)` in async mode), must be used by all the SQL tools.

Maybe it's possible to add a parameter carrying the current session maker to `PGVector` and `SQLRecordManager`.

It's a complex process. The session maker MUST be initialized with the CURRENT connection (one connection per thread or async job; a connection cannot be shared between threads and is not reentrant). Only with this construction can all sub-transactions be shared with the outer transaction, guaranteeing full execution of the import, whatever happens.

The LCEL approach proposes using a singleton to describe the chain, so you must use an `Engine`, not a `Connection`. The connection is opened only when it is needed, one per job.

The attributes `record_manager.session_factory` and `pgvector.session_maker` may be updated for that.
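A minimal sketch of the pattern this enables, under the assumption that overwriting the two attributes is allowed; `pgvector` (an already-constructed `PGVector` store), `doc_chunks`, and the connection URL are illustrative:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from langchain.indexes import SQLRecordManager, index

engine = create_engine("postgresql+psycopg://user:pwd@localhost/db")
record_manager = SQLRecordManager("my_namespace", engine=engine)
record_manager.create_schema()

with engine.connect() as connection:
    with connection.begin():  # one outer transaction for the whole import
        session_maker = scoped_session(sessionmaker(bind=connection))
        # Route every SQL statement issued by the record manager and the
        # vector store through sessions bound to this single connection.
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        index(doc_chunks, record_manager, pgvector,
              cleanup="incremental", source_id_key="source")
    # Commit or rollback now applies to the record manager and the vector
    # store together, so a crash cannot leave them inconsistent.
```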
Hello from Paris 😄
To fully grasp the difficulty of this, one must understand that `scoped_session()` is a class whose instances hold, as an attribute, a `local()` that keeps variables associated with the current thread. Therefore, within the same thread, multiple instances of `scoped_session()` can exist, each having its own local variable to keep its session active. Hence, if `scoped_session()` is used in both the `PGVector()` and `SQLRecordManager()` classes, they will not share the same session, leading to inconsistency in case of a crash during the import.
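A quick, self-contained illustration of that point (using an in-memory SQLite engine purely for demonstration):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")

# Two independent scoped_session registries, as if created separately by
# PGVector and SQLRecordManager.
registry_a = scoped_session(sessionmaker(bind=engine))
registry_b = scoped_session(sessionmaker(bind=engine))

assert registry_a() is registry_a()       # each registry reuses its own session
assert registry_a() is not registry_b()   # but the two registries never share one
```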
One could have a global variable, `global_scoped_session`, holding an instance of `scoped_session()` for langchain, and ensure that all the SQLAlchemy APIs use this global variable for session creation. Here too, it is the caller, responsible for creating the first session, who can decide how to create it.

However, this doesn't work for our use case: during the initialization of the `global_scoped_session` variable, a `sessionmaker(bind=connection)` needs to be created, and it's not possible to use a connection for a global variable. A `sessionmaker(bind=engine)` would be possible, but it is strange to have an engine imposed globally by langchain.
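To make the obstacle concrete, here is a hypothetical module-level sketch (nothing here is existing langchain code; the URL is a placeholder):

```python
# hypothetical module that langchain would have to import at startup
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# At import time only an engine can reasonably exist, so the global factory
# would have to be bound to it...
engine = create_engine("postgresql+psycopg://user:pwd@localhost/db")
global_scoped_session = scoped_session(sessionmaker(bind=engine))

# ...whereas the connection that owns the outer transaction is only opened
# later, once per thread or per job, and cannot be baked into the global:
#
#   with engine.connect() as connection, connection.begin():
#       ...  # too late: global_scoped_session is already bound to the engine
```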
One could imagine proposing an alternative to `scoped_session` that uses a global variable for its `local()`: a `global_scoped_session`. If both classes defaulted to using `global_scoped_session()`, a direct invocation would simply create a session, and there would no longer be any need to modify `session_(maker|factory)`.

If `global_scoped_session()` is invoked before the call, `PGVector` and `SQLRecordManager` use that session. It is therefore the caller who decides how the original session will be created.
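A rough sketch of that idea (everything below is hypothetical, not existing langchain API): a single shared registry that both classes would consult, and that the caller can prime before running the import.

```python
from contextlib import contextmanager
from typing import Optional

from sqlalchemy.orm import Session, scoped_session, sessionmaker

# Hypothetical shared registry that PGVector and SQLRecordManager would consult.
global_scoped_session: Optional[scoped_session] = None


def _get_session(default_factory: sessionmaker) -> Session:
    """Return the caller-primed session if there is one, else a private one."""
    if global_scoped_session is not None:
        return global_scoped_session()    # reuse the caller's session
    return default_factory()              # current behaviour: isolated session


@contextmanager
def use_session(connection):
    """Let the caller bind every langchain SQL call to one connection."""
    global global_scoped_session
    global_scoped_session = scoped_session(sessionmaker(bind=connection))
    try:
        yield global_scoped_session
    finally:
        global_scoped_session.remove()
        global_scoped_session = None
```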
So, to conclude, the only viable solution, in my opinion, is the one I propose: by offering the ability to modify `session_factory` and `session_maker`, my PR can be validated 😄.
Hello @eyurtsev
Can you reconsider this small but very important PR? Without it, it's not possible to have a resilient RAG in production.
```diff
         json_object = super().parse_result(result)
         return self._parse_obj(json_object)

+    def parse_result(
```
Shouldn't be included
```diff
         """Create a session and close it after use."""

-        if not isinstance(self.session_factory, async_sessionmaker):
+        if not isinstance(self.engine, AsyncEngine):
```
Why doesn't the original code work?
The `_amake_session()` method does not allow modifying `self.session_factory` with anything other than an `async_sessionmaker`. This prohibits advanced uses of `index()`.

In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the `index()` API, which proposes using an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import (a network problem, ...), there is an inconsistency between the SQL database and the vector database.

With the PR we are proposing for `langchain-postgres`, it is now possible to guarantee the consistency of the import of chunks into a vector database. This is possible only if the outer session is built with the connection.

The same thing is possible asynchronously, but a bug in `sql_record_manager.py`, in `_amake_session()`, must first be fixed. Then, it is possible to do the same thing asynchronously:
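For completeness, a minimal sketch of what the async variant could look like once `_amake_session()` accepts an `async_scoped_session` (assuming the async indexing API `aindex`; as above, `pgvector`, `doc_chunks`, and the connection URL are illustrative):

```python
from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)
from langchain.indexes import SQLRecordManager, aindex


async def import_chunks(doc_chunks, pgvector) -> None:
    engine = create_async_engine("postgresql+asyncpg://user:pwd@localhost/db")
    record_manager = SQLRecordManager("my_namespace", engine=engine, async_mode=True)
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        async with connection.begin():  # one outer transaction for the whole import
            session_maker = async_scoped_session(
                async_sessionmaker(bind=connection), scopefunc=current_task
            )
            record_manager.session_factory = session_maker
            pgvector.session_maker = session_maker
            await aindex(doc_chunks, record_manager, pgvector,
                         cleanup="incremental", source_id_key="source")
```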