| """QA Chatbot streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma. | |
| Features | |
| -------- | |
| - Persistent Chat Memory: | |
| Stores chat history in a local file. | |
| - Persistent Vector Store: | |
| Stores document embeddings in a local vector store. | |
| - Standalone Question Generation: | |
| Rephrases follow-up questions to standalone questions in their original language. | |
| - Document Retrieval: | |
| Searches and retrieves relevant documents based on user queries. | |
| - Context-Aware Responses: | |
| Generates responses based on a combined context from relevant documents. | |
| - Streaming Responses: | |
| Streams responses in real time either as plain text or as Server-Sent Events (SSE). | |
| SSE also sends the relevant documents as context. | |
| Next Steps | |
| ---------- | |
| - Add a proper exception handling mechanism during the streaming process. | |
| - Add pruning to the conversation buffer memory to prevent it from growing too large. | |
| - Combine documents using a more sophisticated method than simply concatenating them. | |
| Usage | |
| ----- | |
| 1. Install dependencies: | |
| ```bash | |
| pip install fastapi==0.99.1 uvicorn==0.23.2 python-dotenv==1.0.0 chromadb==0.4.5 tiktoken==0.4.0 langchain==0.0.257 openai==0.27.8 | |
| ``` | |
| or | |
| ```bash | |
| poetry install | |
| ``` | |
| 2. Run the server: | |
| ```bash | |
| uvicorn main:app --reload | |
| ``` | |
| 3. curl the server: | |
| With plain text: | |
| ```bash | |
| curl --no-buffer -X 'POST' \ | |
| 'http://localhost:8000/chat' \ | |
| -H 'accept: text/plain' \ | |
| -H 'Content-Type: application/json' \ | |
| -d '{ | |
| "session_id": "session_1", | |
| "message": "who'\''s playing in the river?" | |
| }' | |
| ``` | |
| With SSE: | |
| ```bash | |
| curl --no-buffer -X 'POST' \ | |
| 'http://localhost:8000/chat/sse/' \ | |
| -H 'accept: text/event-stream' \ | |
| -H 'Content-Type: application/json' \ | |
| -d '{ | |
| "session_id": "session_2", | |
| "message": "who'\''s playing in the garden?" | |
| }' | |
| Cheers! | |
| @jvelezmagic""" | |
| import os | |
| from functools import lru_cache | |
| from typing import AsyncGenerator, Literal | |
| from fastapi import Depends, FastAPI | |
| from fastapi.responses import StreamingResponse | |
| from langchain.chat_models import ChatOpenAI | |
| from langchain.embeddings import OpenAIEmbeddings | |
| from langchain.memory import ConversationBufferMemory, FileChatMessageHistory | |
| from langchain.prompts import PromptTemplate | |
| from langchain.schema import BaseChatMessageHistory, Document, format_document | |
| from langchain.schema.output_parser import StrOutputParser | |
| from langchain.vectorstores import Chroma | |
| from pydantic import BaseModel, BaseSettings | |
| class Settings(BaseSettings): | |
| openai_api_key: str | |
| class Config: # type: ignore | |
| env_file = ".env" | |
| env_file_encoding = "utf-8" | |
| class ChatRequest(BaseModel): | |
| session_id: str | |
| message: str | |
| class ChatSSEResponse(BaseModel): | |
| type: Literal["context", "start", "streaming", "end", "error"] | |
| value: str | list[Document] | |
| @lru_cache() | |
| def get_settings() -> Settings: | |
| return Settings() # type: ignore | |
| @lru_cache() | |
| def get_vectorstore() -> Chroma: | |
| settings = get_settings() | |
| embeddings = OpenAIEmbeddings(openai_api_key=settings.openai_api_key) # type: ignore | |
| vectorstore = Chroma( | |
| collection_name="chroma", | |
| embedding_function=embeddings, | |
| persist_directory="chroma", | |
| ) | |
| return vectorstore | |
| def combine_documents( | |
| docs: list[Document], | |
| document_prompt: PromptTemplate = PromptTemplate.from_template("{page_content}"), | |
| document_separator: str = "\n\n", | |
| ) -> str: | |
| doc_strings = [format_document(doc, document_prompt) for doc in docs] | |
| return document_separator.join(doc_strings) | |
| app = FastAPI( | |
| title="QA Chatbot Streaming using FastAPI, LangChain Expression Language , OpenAI, and Chroma", | |
| version="0.1.0", | |
| ) | |
| @app.on_event("startup") | |
| async def startup_event() -> None: | |
| vectorstore = get_vectorstore() | |
| is_collection_empty: bool = vectorstore._collection.count() == 0 # type: ignore | |
| if is_collection_empty: | |
| vectorstore.add_texts( # type: ignore | |
| texts=[ | |
| "Cats are playing in the garden.", | |
| "Dogs are playing in the river.", | |
| "Dogs and cats are mortal enemies, but they often play together.", | |
| ] | |
| ) | |
| if not os.path.exists("message_store"): | |
| os.mkdir("message_store") | |
| async def generate_standalone_question( | |
| chat_history: str, question: str, settings: Settings | |
| ) -> str: | |
| prompt = PromptTemplate.from_template( | |
| template="""Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language. | |
| Chat History: | |
| {chat_history} | |
| Follow Up Input: {question} | |
| Standalone question:""" | |
| ) | |
| llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key) | |
| chain = prompt | llm | StrOutputParser() # type: ignore | |
| return await chain.ainvoke( # type: ignore | |
| { | |
| "chat_history": chat_history, | |
| "question": question, | |
| } | |
| ) | |
| async def search_relevant_documents(query: str, k: int = 5) -> list[Document]: | |
| vectorstore = get_vectorstore() | |
| retriever = vectorstore.as_retriever() | |
| return await retriever.aget_relevant_documents(query=query, k=k) | |
| async def generate_response( | |
| context: str, chat_memory: BaseChatMessageHistory, message: str, settings: Settings | |
| ) -> AsyncGenerator[str, None]: | |
| prompt = PromptTemplate.from_template( | |
| """Answer the question based only on the following context: | |
| {context} | |
| Question: {question}""" | |
| ) | |
| llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key) | |
| chain = prompt | llm # type: ignore | |
| response = "" | |
| async for token in chain.astream({"context": context, "question": message}): # type: ignore | |
| yield token.content | |
| response += token.content | |
| chat_memory.add_user_message(message=message) | |
| chat_memory.add_ai_message(message=response) | |
| async def generate_sse_response( | |
| context: list[Document], | |
| chat_memory: BaseChatMessageHistory, | |
| message: str, | |
| settings: Settings, | |
| ) -> AsyncGenerator[str, ChatSSEResponse]: | |
| prompt = PromptTemplate.from_template( | |
| """Answer the question based only on the following context: | |
| {context} | |
| Question: {question}""" | |
| ) | |
| llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key) | |
| chain = prompt | llm # type: ignore | |
| response = "" | |
| yield ChatSSEResponse(type="context", value=context).json() | |
| try: | |
| yield ChatSSEResponse(type="start", value="").json() | |
| async for token in chain.astream({"context": context, "question": message}): # type: ignore | |
| yield ChatSSEResponse(type="streaming", value=token.content).json() | |
| response += token.content | |
| yield ChatSSEResponse(type="end", value="").json() | |
| chat_memory.add_user_message(message=message) | |
| chat_memory.add_ai_message(message=response) | |
| except Exception as e: # TODO: Add proper exception handling | |
| yield ChatSSEResponse(type="error", value=str(e)).json() | |
| @app.post("/chat") | |
| async def chat( | |
| request: ChatRequest, settings: Settings = Depends(get_settings) | |
| ) -> StreamingResponse: | |
| memory_key = f"./message_store/{request.session_id}.json" | |
| chat_memory = FileChatMessageHistory(file_path=memory_key) | |
| memory = ConversationBufferMemory(chat_memory=chat_memory, return_messages=False) | |
| standalone_question = await generate_standalone_question( | |
| chat_history=memory.buffer, question=request.message, settings=settings | |
| ) | |
| relevant_documents = await search_relevant_documents(query=standalone_question) | |
| combined_documents = combine_documents(relevant_documents) | |
| return StreamingResponse( | |
| generate_response( | |
| context=combined_documents, | |
| chat_memory=chat_memory, | |
| message=request.message, | |
| settings=settings, | |
| ), | |
| media_type="text/plain", | |
| ) | |
| @app.post("/chat/sse/") | |
| async def chat_sse( | |
| request: ChatRequest, settings: Settings = Depends(get_settings) | |
| ) -> StreamingResponse: | |
| memory_key = f"./message_store/{request.session_id}.json" | |
| chat_memory = FileChatMessageHistory(file_path=memory_key) | |
| memory = ConversationBufferMemory(chat_memory=chat_memory, return_messages=False) | |
| standalone_question = await generate_standalone_question( | |
| chat_history=memory.buffer, question=request.message, settings=settings | |
| ) | |
| relevant_documents = await search_relevant_documents(query=standalone_question, k=2) | |
| return StreamingResponse( | |
| generate_sse_response( | |
| context=relevant_documents, | |
| chat_memory=chat_memory, | |
| message=request.message, | |
| settings=settings, | |
| ), | |
| media_type="text/event-stream", | |
| ) |
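The "Next Steps" above mention pruning the conversation buffer memory; here is a minimal sketch of what that could look like, assuming langchain's `get_buffer_string` helper and a hypothetical `prune_chat_history` function added to `main.py` (not part of the original gist):

```python
from langchain.schema import BaseMessage, get_buffer_string


def prune_chat_history(messages: list[BaseMessage], max_messages: int = 10) -> str:
    """Format only the most recent messages so the prompt stays small."""
    recent = messages[-max_messages:]
    return get_buffer_string(recent)


# Hypothetical usage inside the /chat and /chat/sse/ endpoints, replacing `memory.buffer`:
# standalone_question = await generate_standalone_question(
#     chat_history=prune_chat_history(chat_memory.messages),
#     question=request.message,
#     settings=settings,
# )
```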
pyproject.toml:

```toml
[tool.poetry]
name = "langchain-language-expression-streaming-fastapi"
version = "0.1.0"
description = ""
authors = ["Jesús Vélez Santiago"]
packages = [{include = "app"}]

[tool.poetry.dependencies]
python = "^3.10"
langchain = "^0.0.257"
openai = "^0.27.8"
fastapi = "0.99.1"
uvicorn = "^0.23.2"
python-dotenv = "^1.0.0"
chromadb = "^0.4.5"
tiktoken = "^0.4.0"

[tool.poetry.group.dev.dependencies]
black = "^23.7.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
```
I tried to use this with a ConversationChain, but it would not work. Any help with this? I simply added a ConversationChain in `generate_sse_response`; any suggestions?
Just found out how to do this:
```python
from typing import AsyncGenerator, Literal

from pydantic import BaseModel
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.schema import BaseChatMessageHistory, Document
from langchain.schema.output_parser import StrOutputParser
from langchain.memory import ConversationBufferMemory
from langchain.schema.runnable import RunnableMap
from langchain.chains import OpenAIModerationChain

from config import Settings


class ChatRequest(BaseModel):
    session_id: str
    message: str


class ChatSSEResponse(BaseModel):
    type: Literal["context", "start", "streaming", "end", "error"]
    value: str | list[Document]


async def generate_standalone_question(
    chat_history: str,
    question: str,
    settings: Settings,
) -> str:
    """
    Generate a standalone question from the previous chat history and the user's question.

    :param chat_history: History of chats
    :param question: Question the user wants answered
    :param settings: Settings base model for the OpenAI keys
    :return: The standalone question as a string
    """
    prompt = PromptTemplate.from_template(
        template="""Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.

Chat History:
{chat_history}

Follow Up Input: {question}
Standalone question:"""
    )
    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)
    chain = prompt | llm | StrOutputParser()  # type: ignore
    return await chain.ainvoke(  # type: ignore
        {
            "chat_history": chat_history,
            "question": question,
        }
    )


async def generate_sse_response(
    memory: ConversationBufferMemory,
    chat_memory: BaseChatMessageHistory,
    message: str,
    settings: Settings,
    prompt_template: ChatPromptTemplate,
    moderator_model: OpenAIModerationChain,
) -> AsyncGenerator[str, ChatSSEResponse]:
    """
    Generate the response using Server-Sent Events (SSE).

    TODO: Add proper exception handling.
    TODO: Add the moderator as a safeguard against violations, hate speech, and violent content.

    :param memory: Conversation buffer memory holding the previous conversations
    :param chat_memory: Chat message history backing the memory
    :param settings: Settings base model for the OpenAI keys
    :param prompt_template: The chat prompt template fed to the AI, with system and human messages
    :return: AsyncGenerator yielding serialized ChatSSEResponse objects
    """
    llm = ChatOpenAI(temperature=0, openai_api_key=settings.openai_api_key)

    # First load the memory, then expand it into the prompt inputs.
    # https://python.langchain.com/docs/guides/expression_language/cookbook
    chain = RunnableMap({
        "input": lambda x: x["input"],
        "memory": memory.load_memory_variables,
    }) | {
        "input": lambda x: x["input"],
        "history": lambda x: x["memory"]["history"],
    } | prompt_template | llm

    response = ""
    try:
        yield ChatSSEResponse(type="start", value="").json()
        async for token in chain.astream({"input": message}):  # type: ignore
            yield ChatSSEResponse(type="streaming", value=token.content).json()
            response += token.content
        yield ChatSSEResponse(type="end", value="").json()
        chat_memory.add_user_message(message=message)
        chat_memory.add_ai_message(message=response)
    except Exception as e:  # TODO: Add proper exception handling
        yield ChatSSEResponse(type="error", value=str(e)).json()
```

If you get `TypeError: unsupported operand type(s) for |: 'type' and 'types.GenericAlias'` on the line `value: str | list[Document]`, you should replace it with `value: Union[str, list[Document]]`.
Hi, if you want an MVP (minimal working example), this is what I've extracted from the script:

```python
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from pydantic import BaseModel


class ChatRequest(BaseModel):
    session_id: str
    message: str


app = FastAPI(
    title="QA Chatbot Streaming using FastAPI, LangChain Expression Language, OpenAI, and Chroma",
    version="0.1.0",
)


async def generate_response(
    context: str,
    message: str,
) -> AsyncGenerator[str, None]:
    prompt = PromptTemplate.from_template(
        """Answer the question based only on the following context:
{context}

Question: {question}"""
    )

    llm = ChatOpenAI(
        temperature=0,
        openai_api_key=YOUR_OPENAI_KEY,
    )

    chain = prompt | llm  # type: ignore

    response = ""
    async for token in chain.astream({"context": context, "question": message}):  # type: ignore
        yield token.content
        response += token.content


@app.post("/chat")
async def chat(
    request: ChatRequest,
) -> StreamingResponse:
    return StreamingResponse(
        generate_response(
            context="",
            message=request.message,
        ),
        media_type="text/plain",
    )
```

Great content! I am struggling to make this work with AgentExecutor; the output never comes back as streamed tokens. Any ideas on how to make the LCEL implementation of AgentExecutor stream?
```python
agent_executor = AgentExecutor(agent=agent_chain, memory=memory, verbose=True, tools=tool_list, return_intermediate_steps=False)

async for token in agent_executor.astream({"input": "Hello"}):
    yield token.content
    response += token.content
```
Postman doesn't stream the results back ... I recommend using curl. For example:
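Presumably something along the lines of the plain-text example from the usage section above:

```bash
curl --no-buffer -X 'POST' \
  'http://localhost:8000/chat' \
  -H 'accept: text/plain' \
  -H 'Content-Type: application/json' \
  -d '{
  "session_id": "session_1",
  "message": "who'\''s playing in the river?"
}'
```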