LangChain Integration
DeltaMemory integrates with LangChain to provide cognitive memory capabilities for your agents and chains.
Installation
pip install deltamemory langchain-core
Quick Start
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from deltamemory.langchain import get_deltamemory_tools
# Create DeltaMemory tools
tools = get_deltamemory_tools(
deltamemory_url="http://localhost:6969",
user_id="user-123"
)
# Create agent with memory tools
llm = ChatOpenAI(model="gpt-4")
agent = create_tool_calling_agent(llm, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# Agent decides when to use memory
response = agent_executor.invoke({
"input": "What are my preferences?"
})Integration Patterns
1. Agent with Memory Tools
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate
from deltamemory.langchain import get_deltamemory_tools
# Create tools
tools = get_deltamemory_tools(
deltamemory_url="http://localhost:6969",
user_id="user-123"
)
# Create prompt
prompt = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant with memory capabilities.
Use the recall_memory tool when:
- User asks about their preferences or history
- User references past conversations
- Context would improve your response
Use the store_memory tool when:
- User shares important preferences
- User asks you to remember something
- Key facts emerge from the conversation"""),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
# Create agent
llm = ChatOpenAI(model="gpt-4")
agent = create_tool_calling_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True
)
# Use agent
response = agent_executor.invoke({
"input": "I prefer TypeScript over JavaScript"
})
print(response["output"])2. Chain with Manual Memory
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from deltamemory import DeltaMemory
client = DeltaMemory(base_url="http://localhost:6969")
async def add_memory_context(inputs: dict) -> dict:
"""Add memory context to the chain."""
recall = await client.recall(
inputs["question"],
user_id=inputs["user_id"]
)
inputs["memory_context"] = recall.context or "No relevant memories."
return inputs
# Create chain with memory
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant.\n\nMemory Context:\n{memory_context}"),
("human", "{question}")
])
chain = (
RunnablePassthrough()
| add_memory_context
| prompt
| ChatOpenAI(model="gpt-4")
)
# Use chain
response = await chain.ainvoke({
"question": "What are my preferences?",
"user_id": "user-123"
})
print(response.content)3. Custom Memory Tool
from langchain.tools import StructuredTool
from langchain_core.pydantic_v1 import BaseModel, Field
from deltamemory import DeltaMemory
from typing import Optional
client = DeltaMemory()
class RecallInput(BaseModel):
query: str = Field(description="What to search for in memory")
limit: Optional[int] = Field(default=5, description="Maximum results")
async def recall_memory(query: str, limit: int = 5) -> dict:
"""Search past conversations and user context."""
result = await client.recall(query, user_id="user-123", limit=limit)
return {
"profiles": [
f"{p.topic}::{p.sub_topic}: {p.content}"
for p in (result.profiles or [])
],
"events": [
f"{e.event_type}: {e.gist}"
for e in (result.events or [])
],
"context": result.context
}
recall_tool = StructuredTool(
name="recall_memory",
description="Search user's past conversations and preferences",
func=recall_memory,
args_schema=RecallInput,
coroutine=recall_memory
)Available Tools
recall_memory
Search past conversations and user context.
from deltamemory.langchain import get_deltamemory_tools

# Build the recall/store tool pair for this user
tools = get_deltamemory_tools(
    deltamemory_url="http://localhost:6969",
    user_id="user-123",
)
# Tool schema
{
"name": "recall_memory",
"description": "Search past conversations and user context",
"parameters": {
"query": "What to search for",
"limit": "Maximum number of results (default: 5)"
}
}
store_memory
Store important information for future reference.
{
"name": "store_memory",
"description": "Store important user information",
"parameters": {
"content": "Information to remember",
"importance": "Importance level: low, medium, or high"
}
}
Multi-User Support
Handle multiple users with collection-based isolation:
from deltamemory.langchain import get_deltamemory_tools
def create_user_agent(user_id: str):
"""Create an agent with user-specific memory."""
tools = get_deltamemory_tools(
deltamemory_url="http://localhost:6969",
user_id=user_id,
collection=f"user-{user_id}"
)
llm = ChatOpenAI(model="gpt-4")
agent = create_tool_calling_agent(llm, tools, prompt)
return AgentExecutor(agent=agent, tools=tools)
# Create agents for different users
agent_alice = create_user_agent("alice")
agent_bob = create_user_agent("bob")
# Each agent has isolated memory
response_alice = agent_alice.invoke({"input": "I prefer dark mode"})
response_bob = agent_bob.invoke({"input": "What are my preferences?"})
# Bob's agent won't see Alice's preferencesConversation Memory Integration
Combine DeltaMemory with LangChain's conversation memory:
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain
from langchain_openai import ChatOpenAI
from deltamemory import DeltaMemory
client = DeltaMemory()
# Short-term conversation memory
conversation_memory = ConversationBufferMemory()
# Long-term cognitive memory
async def get_long_term_context(user_input: str, user_id: str) -> str:
recall = await client.recall(user_input, user_id=user_id)
return recall.context or ""
# Create chain with both memories
llm = ChatOpenAI(model="gpt-4")
chain = ConversationChain(
llm=llm,
memory=conversation_memory,
verbose=True
)
# Use both memories
async def chat(user_id: str, message: str):
# Get long-term memory
long_term_context = await get_long_term_context(message, user_id)
# Add to prompt
enhanced_message = f"""Long-term Memory:
{long_term_context}
User Message: {message}"""
# Generate response with both memories
response = chain.predict(input=enhanced_message)
# Store in long-term memory
await client.ingest(
f"User: {message}\nAssistant: {response}",
metadata={"user_id": user_id}
)
return responseRAG with Memory
Combine retrieval-augmented generation with cognitive memory:
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI
from deltamemory import DeltaMemory
# Document store (RAG)
vectorstore = Chroma(embedding_function=OpenAIEmbeddings())
# Cognitive memory
client = DeltaMemory()
async def rag_with_memory(question: str, user_id: str):
# Get relevant documents (RAG)
docs = vectorstore.similarity_search(question, k=3)
doc_context = "\n\n".join([doc.page_content for doc in docs])
# Get user memory (DeltaMemory)
recall = await client.recall(question, user_id=user_id)
memory_context = recall.context or ""
# Combine both contexts
llm = ChatOpenAI(model="gpt-4")
response = llm.predict(f"""Answer the question using both the documents and user memory.
Documents:
{doc_context}
User Memory:
{memory_context}
Question: {question}""")
return responseStreaming Support
from langchain_openai import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from deltamemory import DeltaMemory
client = DeltaMemory()
async def chat_stream(user_id: str, message: str):
# Get memory context
recall = await client.recall(message, user_id=user_id)
# Stream response
llm = ChatOpenAI(
model="gpt-4",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)
response = llm.predict(f"""Memory Context:
{recall.context}
User: {message}""")
# Store after streaming
await client.ingest(
f"User: {message}\nAssistant: {response}",
metadata={"user_id": user_id}
)
return responseError Handling
from deltamemory import DeltaMemory, ConnectionError
from langchain_openai import ChatOpenAI
client = DeltaMemory()
async def chat_with_fallback(user_id: str, message: str):
try:
# Try to get memory
recall = await client.recall(message, user_id=user_id)
context = recall.context
except ConnectionError:
# Fallback: continue without memory
print("DeltaMemory unavailable, continuing without memory")
context = ""
llm = ChatOpenAI(model="gpt-4")
response = llm.predict(f"""Context: {context}
User: {message}""")
# Try to store (non-blocking)
try:
await client.ingest(
f"User: {message}\nAssistant: {response}",
metadata={"user_id": user_id}
)
except Exception as e:
print(f"Failed to store memory: {e}")
return responseComplete Example: Customer Support Agent
import asyncio
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate
from langchain.tools import StructuredTool
from deltamemory.langchain import get_deltamemory_tools

async def main():
    """Run a customer-support agent combining memory and knowledge-base tools."""
    # Memory tools scoped to this customer
    memory_tools = get_deltamemory_tools(
        deltamemory_url="http://localhost:6969",
        user_id="customer-123"
    )

    # Custom support tool (stubbed knowledge-base lookup)
    def search_knowledge_base(query: str) -> str:
        """Search company knowledge base."""
        return f"Knowledge base results for: {query}"

    kb_tool = StructuredTool.from_function(
        func=search_knowledge_base,
        name="search_knowledge_base",
        description="Search company knowledge base for product information"
    )
    all_tools = memory_tools + [kb_tool]

    # Prompt describes when to use each of the three tools
    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a customer support agent.
Use recall_memory to:
- Check customer's past issues and preferences
- Personalize your responses
Use store_memory to:
- Save important customer information
- Record issue resolutions
Use search_knowledge_base to:
- Find product information
- Look up troubleshooting steps"""),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ])

    llm = ChatOpenAI(model="gpt-4")
    agent = create_tool_calling_agent(llm, all_tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=all_tools,
        verbose=True
    )

    # Handle customer query — recall_memory lets the agent resolve
    # the reference to "last time"
    response = agent_executor.invoke({
        "input": "I'm having the same issue as last time"
    })
    print(response["output"])

asyncio.run(main())