Skip to content

Instantly share code, notes, and snippets.

@darenr
Created June 12, 2024 22:24
Show Gist options
  • Select an option

  • Save darenr/df71dff41dc600913ad445c6661269a2 to your computer and use it in GitHub Desktop.

Select an option

Save darenr/df71dff41dc600913ad445c6661269a2 to your computer and use it in GitHub Desktop.
Async example with LangChain
import asyncio
import time

import pandas as pd
from langchain.callbacks import get_openai_callback
from langchain.chains import LLMChain, SequentialChain
from langchain.chat_models import ChatOpenAI
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
from langchain.prompts import ChatPromptTemplate
class SummaryChain:
    """Produce a summary and a short sentiment for every review in a DataFrame.

    One chain run is started per row of ``df`` and all runs are awaited
    together. The parsed answers are written back into ``df`` in place as the
    columns ``summary``, ``sentiment`` and ``sentiment_cost``.

    Args:
        df: Table with one review per row. The columns ``description``
            (passed to the prompt as the review) and ``unique_id`` are read.
        llm: Model object handed to ``LLMChain``.
    """

    def __init__(self, df, llm):
        self.df = df
        self.llm = llm

    def build_chain(self):
        """Build the chain together with its output parser.

        Returns:
            A ``(chain, output_parser, response_format)`` tuple where
            ``response_format`` is the format-instruction text produced by the
            parser; it has to be supplied as the ``response_format`` prompt
            variable on every run.
        """
        response_schemas = [
            ResponseSchema(
                name="sentiment",
                description="The main sentiment of the review, limited to 3 words.",
            ),
            ResponseSchema(
                name="summary",
                description="Brief Summary of the review, limited to one paragraph.",
            ),
        ]
        output_parser = StructuredOutputParser.from_response_schemas(
            response_schemas
        )
        response_format = output_parser.get_format_instructions()

        # The prompt text is kept flush-left so the string sent to the model
        # does not pick up indentation from the source layout.
        sentiment_prompt = ChatPromptTemplate.from_template(
            """Act like an expert somellier. Your goal is to extract the main sentiment from wine reviews, delimited by triple dashes. Limit the sentiment to 3 words. \
---
Review: {review}
---
{response_format}
"""
        )
        sentiment_chain = LLMChain(
            llm=self.llm, prompt=sentiment_prompt, output_key="sentiment"
        )
        chain = SequentialChain(
            chains=[sentiment_chain],
            input_variables=["review", "response_format"],
            output_variables=["sentiment"],
            verbose=False,
        )
        return chain, output_parser, response_format

    async def generate_concurrently(self):
        """Run the chain for every row concurrently and store the results.

        Adds (or overwrites) the columns ``summary``, ``sentiment`` and
        ``sentiment_cost`` on ``self.df``. Any exception raised by a chain run
        or by the output parser propagates to the caller.
        """
        df = self.df
        chain, output_parser, response_format = self.build_chain()

        tasks = [
            self.async_generate(
                chain,
                {"review": review, "response_format": response_format},
                unique_id,
            )
            for review, unique_id in zip(df["description"], df["unique_id"])
        ]

        # gather() returns results in the order the awaitables were passed,
        # i.e. in row order, so they can be written back positionally. This
        # avoids an O(n) mask lookup per result and keeps rows that share a
        # unique_id from overwriting each other.
        results = await asyncio.gather(*tasks)

        # Parse each response exactly once.
        parsed = [output_parser.parse(response) for _, response, _ in results]

        df["summary"] = [fields["summary"] for fields in parsed]
        df["sentiment"] = [fields["sentiment"] for fields in parsed]
        df["sentiment_cost"] = [cost for _, _, cost in results]

    async def async_generate(self, chain, inputs, unique_id):
        """Run ``chain`` once and report what the OpenAI callback recorded.

        Args:
            chain: Chain to execute via ``arun``.
            inputs: Prompt variables for this run.
            unique_id: Identifier passed through unchanged to the result.

        Returns:
            ``(unique_id, response, total_cost)``.
        """
        with get_openai_callback() as cb:
            resp = await chain.arun(inputs)
        return unique_id, resp, cb.total_cost
class CharacteristicsChain:
    """Extract the main characteristics of every wine in a DataFrame.

    One chain run is started per row of ``df`` and all runs are awaited
    together. The parsed answers are written back into ``df`` in place as the
    columns ``characteristic_1`` .. ``characteristic_5`` and
    ``cost_characteristics``.

    Args:
        df: Table with one wine per row. The columns ``summary``, ``country``,
            ``title`` and ``unique_id`` are read.
        llm: Model object handed to ``LLMChain``.
    """

    # Number of characteristics requested per wine. The prompt text below
    # spells this out as "top five", so change both together.
    N_CHARACTERISTICS = 5

    def __init__(self, df, llm):
        self.df = df
        self.llm = llm

    def _characteristic_names(self):
        """Return the output field / column names, in order."""
        return [
            f"characteristic_{i}" for i in range(1, self.N_CHARACTERISTICS + 1)
        ]

    def build_chain(self):
        """Build the chain together with its output parser.

        Returns:
            A ``(chain, output_parser, response_format)`` tuple where
            ``response_format`` is the format-instruction text produced by the
            parser; it has to be supplied as the ``response_format`` prompt
            variable on every run.
        """
        characteristics_schema = [
            ResponseSchema(
                name=f"characteristic_{i}",
                description=f"The number {i} characteristic. One or two words long.",
            )
            for i in range(1, self.N_CHARACTERISTICS + 1)
        ]
        output_parser = StructuredOutputParser.from_response_schemas(
            characteristics_schema
        )
        response_format = output_parser.get_format_instructions()

        # The prompt text is kept flush-left so the string sent to the model
        # does not pick up indentation from the source layout.
        characteristics_prompt = ChatPromptTemplate.from_template(
            """
Act like an expert somellier. You will receive the name, the summary of the review and the county of origin of a given wine, delimited by triple dashes.
Your goal is to extract the top five main characteristics of the wine.
---
Wine Name: {wine_name}
Country: {country}
Summary Review: {summary}
---
{response_format}
"""
        )
        characteristics_chain = LLMChain(
            llm=self.llm, prompt=characteristics_prompt, output_key="characteristics"
        )
        chain = SequentialChain(
            chains=[characteristics_chain],
            input_variables=["wine_name", "summary", "country", "response_format"],
            output_variables=["characteristics"],
            verbose=False,
        )
        return chain, output_parser, response_format

    async def generate_concurrently(self):
        """Run the chain for every row concurrently and store the results.

        Adds (or overwrites) one column per characteristic plus
        ``cost_characteristics`` on ``self.df``. Any exception raised by a
        chain run or by the output parser propagates to the caller.
        """
        df = self.df
        chain, output_parser, response_format = self.build_chain()

        tasks = [
            self.async_generate(
                chain,
                {
                    "summary": summary,
                    "wine_name": title,
                    "country": country,
                    "response_format": response_format,
                },
                unique_id,
            )
            for summary, country, unique_id, title in zip(
                df["summary"], df["country"], df["unique_id"], df["title"]
            )
        ]

        # gather() returns results in the order the awaitables were passed,
        # i.e. in row order, so they can be written back positionally. This
        # avoids an O(n) mask lookup per result and keeps rows that share a
        # unique_id from overwriting each other.
        results = await asyncio.gather(*tasks)

        # Parse each response exactly once instead of once per field.
        parsed = [output_parser.parse(response) for _, response, _ in results]

        for name in self._characteristic_names():
            df[name] = [fields[name] for fields in parsed]
        df["cost_characteristics"] = [cost for _, _, cost in results]

    async def async_generate(self, chain, inputs, unique_id):
        """Run ``chain`` once and report what the OpenAI callback recorded.

        Args:
            chain: Chain to execute via ``arun``.
            inputs: Prompt variables for this run.
            unique_id: Identifier passed through unchanged to the result.

        Returns:
            ``(unique_id, response, total_cost)``.
        """
        with get_openai_callback() as cb:
            resp = await chain.arun(inputs)
        return unique_id, resp, cb.total_cost
def _run_timed(label, chain):
    """Run ``chain.generate_concurrently()`` to completion and print how long it took.

    Args:
        label: Name shown at the start of the timing message.
        chain: Object exposing an async ``generate_concurrently()`` method.
    """
    start = time.perf_counter()
    asyncio.run(chain.generate_concurrently())
    elapsed = time.perf_counter() - start
    # \033[1m ... \033[0m are the ANSI escape codes for bold on / reset.
    print(
        "\033[1m"
        + f"{label} Chain (Async) executed in {elapsed:0.2f} seconds."
        + "\033[0m"
    )


def main():
    """Run both chains over ``wine_subset.csv`` and save the result.

    The summary chain runs first because the characteristics chain reads the
    ``summary`` column it adds. The resulting frame is written to
    ``checkpoint.csv``.
    """
    llm = ChatOpenAI(
        temperature=0.0,
        request_timeout=15,
        model_name="gpt-4o",
    )
    df = pd.read_csv("wine_subset.csv")

    _run_timed("Summary", SummaryChain(llm=llm, df=df))
    _run_timed("Characteristics", CharacteristicsChain(llm=llm, df=df))

    df.to_csv("checkpoint.csv")


# Guarded so importing this module does not trigger API calls or file writes.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment