Persist RAG Memory Across Turns with Lakebase PostgresSaver
Swap LangChain's InMemorySaver for PostgresSaver backed by Databricks Lakebase to persist conversation history in RAG agents, so that responses stay context-aware across Model Serving requests (for example, resolving "it" to an earlier mention).
Parse and Index Multimodal PDFs for Reliable Retrieval
Use Databricks' ai_parse_document(version="2.0") to parse complex PDFs containing text, tables (rendered as HTML), images, and diagrams in a single call; for enterprise documents it is positioned as more robust than PyPDF2 or Unstructured. Load the PDFs from Unity Catalog Volumes as binary files via Spark, parse them into a VARIANT column of structured elements (each with a type such as text, table, figure, or section_header, its content, and an optional AI-generated description), then save the result to a Delta table.
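The load, parse, and save steps can be sketched roughly as follows. This is a minimal sketch, not the author's code: it assumes the version is passed to ai_parse_document as an option map in SQL, and volume_path and target_table are hypothetical placeholders. The Spark import is deferred so the helper that builds the expression can be checked without a cluster.

```python
def parse_expression(version: str = "2.0") -> str:
    """Build the SQL expression that parses the binary `content` column.

    Assumption: the version is supplied through an option map.
    """
    return f"ai_parse_document(content, map('version', '{version}'))"


def parse_pdfs_to_delta(spark, volume_path: str, target_table: str) -> None:
    """Read PDFs as binary files, parse them, and save the result as a Delta table.

    `volume_path` and `target_table` are hypothetical names supplied by the caller.
    """
    # Imported lazily because it needs a Spark runtime.
    from pyspark.sql.functions import expr

    # The binaryFile reader yields one row per file, including `path` and `content`.
    raw = spark.read.format("binaryFile").load(volume_path)

    # Keep the source path next to the parsed VARIANT column for later retrieval.
    parsed = raw.select("path", expr(parse_expression()).alias("parsed"))
    parsed.write.mode("overwrite").saveAsTable(target_table)
```

Keeping the path column alongside the parsed output is what later allows retrieved chunks to be traced back to their source file.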
Extract plain text by concatenating the parsed elements with == page == separators in a custom UDF. Chunk the text with LangChain's RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200, separators=["== page ==", "\n\n", ...]) inside a Pandas UDF so the work scales on Spark, and assign unique IDs with monotonically_increasing_id().
Enable Change Data Feed on the chunk table (ALTER TABLE ... SET TBLPROPERTIES (delta.enableChangeDataFeed = true)), then create a Delta Sync Vector Search index that uses databricks-gte-large-en (1024-dimensional embeddings, 8192-token context). Databricks computes the embeddings itself, both when indexing the chunk column and when embedding queries, so no direct calls to the embedding model are needed. Retrieve the top 5 results for a query such as "How does the system prevent overheating?" to pull the relevant chunks along with their source paths.
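The text-extraction step might look like the plain-Python helper below, which could then be wrapped in a UDF. It is only a sketch: the element layout (dicts with content, description, and page_id keys) is an assumption made for illustration and may not match the actual VARIANT schema.

```python
PAGE_SEPARATOR = "== page =="


def elements_to_text(elements):
    """Concatenate element contents page by page, joined by PAGE_SEPARATOR.

    Assumption: each element is a dict with `content`, an optional
    `description`, and a `page_id`; the real parsed schema may differ.
    """
    pages = {}
    for element in elements:
        # Fall back to the AI-generated description when an element has no content.
        text = (element.get("content") or element.get("description") or "").strip()
        if not text:
            continue
        pages.setdefault(element.get("page_id", 0), []).append(text)

    # Elements on the same page are separated by blank lines; pages by the marker.
    page_texts = ["\n\n".join(pages[page_id]) for page_id in sorted(pages)]
    return f"\n{PAGE_SEPARATOR}\n".join(page_texts)
```

Because the page marker is also the first separator given to the text splitter, chunks tend to break at page boundaries before falling back to paragraph breaks.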
The index uses TRIGGERED sync: each triggered sync reads the Change Data Feed and picks up changes to the source table, so updates are indexed without reprocessing the whole corpus.
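Creating and querying the index could be sketched as follows with the databricks-vectorsearch client. The endpoint, index, and table names are hypothetical, and the primary-key column name id is an assumption; only the chunk column, the embedding model, TRIGGERED mode, and the top-5 retrieval come from the text above. Client imports are deferred so the argument builder can be checked on its own.

```python
def delta_sync_index_args(endpoint_name, index_name, source_table_name,
                          primary_key="id", text_column="chunk"):
    """Collect the arguments for a TRIGGERED Delta Sync index with managed embeddings."""
    return {
        "endpoint_name": endpoint_name,
        "index_name": index_name,
        "source_table_name": source_table_name,
        "pipeline_type": "TRIGGERED",
        "primary_key": primary_key,  # assumed column name
        "embedding_source_column": text_column,
        "embedding_model_endpoint_name": "databricks-gte-large-en",
    }


def create_index(args):
    """Create the index; the initial sync must finish before it can be queried."""
    from databricks.vector_search.client import VectorSearchClient

    return VectorSearchClient().create_delta_sync_index(**args)


def search(endpoint_name, index_name, query, num_results=5):
    """Run a similarity search; the query text is embedded by the service."""
    from databricks.vector_search.client import VectorSearchClient

    index = VectorSearchClient().get_index(
        endpoint_name=endpoint_name, index_name=index_name
    )
    return index.similarity_search(
        query_text=query, columns=["chunk", "path"], num_results=num_results
    )
```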
Implement Persistent Conversation Memory via Lakebase
Standard RAG demos fail at multi-turn conversations because InMemorySaver keeps state only in process memory, which does not survive across Model Serving requests. Fix this by provisioning Lakebase Autoscaling (managed Postgres 17, about 1 minute to set up in the UI: Apps > Lakebase > Autoscaling > New project). Use the w.postgres API rather than w.database, which targets legacy instances: fetch the host (ep.status.hosts.host) and the endpoint, then generate a token with w.postgres.generate_database_credential(endpoint).
URL-encode the username (urllib.parse.quote(username)) so that the @ in an email address does not break the connection URI. Connect with psycopg.connect(db_uri, autocommit=True, row_factory=dict_row), build PostgresSaver(conn=conn), and call its setup() method, which creates the checkpoints, checkpoint_writes, checkpoint_blobs, and checkpoint_migrations tables. Avoid PostgresSaver.from_conn_string(): it returns a context manager rather than a long-lived instance.
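A minimal sketch of the connection setup is shown below. The database name, port, and sslmode=require are assumptions rather than details from the text, and quoting the token is a precaution added here. The psycopg and LangGraph imports are deferred so the URI helper can be exercised without a database.

```python
from urllib.parse import quote


def build_db_uri(username, token, host,
                 database="databricks_postgres", port=5432):
    """Build a Postgres URI with the username and token percent-encoded.

    Assumptions: the database name, port, and sslmode are placeholders.
    """
    user = quote(username, safe="")  # "user@example.com" -> "user%40example.com"
    password = quote(token, safe="")  # precaution; not stated in the text
    return f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode=require"


def make_checkpointer(db_uri):
    """Open a long-lived connection and return a ready-to-use PostgresSaver."""
    import psycopg
    from psycopg.rows import dict_row
    from langgraph.checkpoint.postgres import PostgresSaver

    conn = psycopg.connect(db_uri, autocommit=True, row_factory=dict_row)
    checkpointer = PostgresSaver(conn)
    # setup() returns None, so keep the instance in its own variable.
    checkpointer.setup()
    return checkpointer
```

Calling setup() on a separate line matters: chaining it onto the constructor in an assignment would leave the variable holding None rather than the saver.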
In the agent, pass this checkpointer in place of InMemorySaver. Use a stable thread_id (e.g., "demo-session-001") in config={"configurable": {"thread_id": ...}} when calling agent.invoke(). The agent loads the thread's full history from Lakebase on every turn, which lets it resolve anaphora such as "it" against prior context (e.g., Turn 1: "What is Orion?"; Turn 2: "How does it handle overheating?").
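The two-turn pattern can be sketched as below. The helper names are hypothetical, and agent stands for any LangGraph-style object whose invoke method accepts a messages dict and a config; the point is only that both turns share the same thread_id.

```python
def thread_config(thread_id):
    """Build the invoke config that ties a call to one conversation thread."""
    if not thread_id:
        raise ValueError("thread_id must be a non-empty string")
    return {"configurable": {"thread_id": thread_id}}


def run_two_turns(agent, thread_id="demo-session-001"):
    """Send both example questions on the same thread.

    `agent` is assumed to be compiled with the Postgres checkpointer.
    """
    config = thread_config(thread_id)
    first = agent.invoke(
        {"messages": [{"role": "user", "content": "What is Orion?"}]},
        config=config,
    )
    # Same config, so the checkpointer restores turn 1 before answering turn 2.
    second = agent.invoke(
        {"messages": [{"role": "user", "content": "How does it handle overheating?"}]},
        config=config,
    )
    return first, second
```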
Deploy Production Agent with MLflow and Validate
Package the agent in agent.py as an mlflow.pyfunc.ResponsesAgent: load configuration from agent-config.yaml (LLM endpoint, index, Lakebase details, num_results=3), build ChatDatabricks, VectorSearchRetrieverTool, and the checkpointer, and read thread_id from custom_inputs (defaulting to session-{uuid4()}). Log the model with mlflow.pyfunc.log_model, declaring resources such as DatabricksVectorSearchIndex and DatabricksServingEndpoint; the pip requirements include langgraph-checkpoint-postgres, psycopg[binary], and databricks-sdk>=0.89.0. Register the model to Unity Catalog and deploy it with agents.deploy(scale_to_zero_enabled=True).
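The thread_id handling inside the agent might reduce to a helper like the one below. The function name is hypothetical, and custom_inputs is assumed to arrive as a dict (or None) on the request.

```python
import uuid
from typing import Any, Mapping, Optional


def resolve_thread_id(custom_inputs: Optional[Mapping[str, Any]]) -> str:
    """Return the caller's thread_id, or a fresh session id when none is given."""
    thread_id = (custom_inputs or {}).get("thread_id")
    if thread_id:
        return str(thread_id)
    # No thread supplied: start a new conversation with a unique id.
    return f"session-{uuid.uuid4()}"
```

With this default, a request that omits thread_id starts a fresh conversation, so callers that want memory must send the same thread_id on every turn.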
Query the endpoint with the OpenAI client, passing thread_id in custom_inputs so the conversation persists. To validate, reuse the same thread_id across calls and check that follow-up responses reference earlier details (e.g., "motion", "vision subsystems"). Gotchas: Lakebase Autoscaling uses the separate w.postgres API (a 30-minute debugging detour); connect with psycopg.connect directly; use the correct host path (ep.status.hosts.host); and do not supply an input_example for custom inputs, using output_path in mlflow.models.predict tests instead.
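Querying the deployed endpoint could look roughly like this. It is a sketch under assumptions: the client is pointed at the workspace's serving-endpoints base URL, the Responses API is used, and custom_inputs is sent through the OpenAI client's extra_body argument. The host, token, and endpoint name are placeholders supplied by the caller.

```python
def build_request(endpoint_name, question, thread_id):
    """Assemble the keyword arguments for one turn on a given thread."""
    return {
        "model": endpoint_name,
        "input": [{"role": "user", "content": question}],
        # Fields outside the OpenAI schema are passed via extra_body.
        "extra_body": {"custom_inputs": {"thread_id": thread_id}},
    }


def make_client(host, token):
    """Create an OpenAI client aimed at Databricks Model Serving (assumed URL layout)."""
    from openai import OpenAI

    return OpenAI(api_key=token, base_url=f"{host}/serving-endpoints")


def ask(client, endpoint_name, question, thread_id):
    """Send one question; reuse thread_id across calls to keep the conversation."""
    return client.responses.create(**build_request(endpoint_name, question, thread_id))
```

Calling ask twice with the same thread_id, first with the introductory question and then with the follow-up, is the validation described above.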
Outcome: a production RAG agent whose memory can be verified across stateless requests, with Lakebase accumulating per-thread histories for scalable, context-aware Q&A.