LCEL接口
- 输入格式
- 输出格式
- 8种不同的接口方式
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# Chat model reused by the chains built further down in this walkthrough.
model = ChatOpenAI(model="gpt-3.5-turbo")

# Prompt with a single template variable, {topic}, supplied at call time.
prompt = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话")

# LCEL pipe: the formatted prompt is passed on to the chat model.
chain = prompt | model
# Inspect the input schema of the whole chain and of each of its two parts.
# NOTE(review): on pydantic-v2 based langchain-core releases `.schema()` may be
# deprecated in favour of `.model_json_schema()` — confirm against the installed version.
chain.input_schema.schema()
prompt.input_schema.schema()
model.input_schema.schema()
# The output schema of the chain is the output schema of its last part, in this case a ChatModel, which outputs a ChatMessage
chain.output_schema.schema()
# Invoke: run the chain once on a single input mapping.
chain.invoke({"topic": "熊"})

# Batch: run the chain over a list of input mappings.
chain.batch(
    [
        {"topic": "熊"},
        {"topic": "猫"},
    ]
)

# max_concurrency controls the number of concurrent calls in the batch.
chain.batch(
    [{"topic": "熊"}, {"topic": "猫"}, {"topic": "狗"}],
    config={"max_concurrency": 5},
)
# Async Stream 异步
async for s in chain.astream({"topic": "女人"}):
print(s.content, end="", flush=True)
# Async Invoke
await chain.ainvoke({"topic": "男人"})
# Async Batch
await chain.abatch([{"topic": "熊"},{"topic": "女人"}])
异步获取中间步骤
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings
# Prompt asking the model to answer the question based on the supplied context.
template = """基于下面的上下文来回答问题:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

# FAISS vector store built from a single text, embedded with OpenAIEmbeddings.
vectorstore = FAISS.from_texts(
    ["柯基犬是一种中型家庭宠物犬"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

# The leading dict fans the input out to both prompt variables: "context"
# comes from the retriever and "question" is the input passed through as-is.
retrieval_chain = (
    {
        # Give the retriever step the run name "Docs" so it can be selected
        # by name (include_names=["Docs"]) when streaming the run log.
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)
async for chunk in retrieval_chain.astream_log(
"柯基是什么?", include_names=["Docs"]
):
print("-" * 40)
print(chunk)
# 只看状态值
async for chunk in retrieval_chain.astream_log(
"柯基是什么?", include_names=["Docs"], diff=False
):
print("-" * 70)
print(chunk)
并行支持
from langchain_core.runnables import RunnableParallel
# Two independent prompt-plus-model chains that take the same {topic} input:
# chain1 asks for a joke, chain2 asks for a two-line poem.
chain1 = ChatPromptTemplate.from_template("给我讲一个关于{topic}的笑话") | model
chain2 = ChatPromptTemplate.from_template("写两行关于{topic}的诗歌") | model

# Bundle both chains into one runnable, registered under "joke" and "poem".
combined = RunnableParallel(joke=chain1, poem=chain2)
%%time
# `%%time` is an IPython cell magic: it has to be the first line of its
# notebook cell, so explanatory comments are placed below it.
# Baseline 1: time the joke chain on its own.
chain1.invoke({"topic": "熊"})
%%time
# Baseline 2: time the poem chain on its own (a single chain, not parallel).
chain2.invoke({"topic": "熊"})
%%time
# Parallel execution: `combined` is the RunnableParallel built from chain1 and
# chain2, so this timing is the one to compare against the two baselines.
combined.invoke({"topic": "熊"})
%%time
# Parallel batch processing, suited to generating for many inputs.
# (Comments sit below `%%time` because a cell magic must be the first line
# of its notebook cell.)
# Baseline 1: batch the joke chain on its own.
chain1.batch([{"topic": "熊"}, {"topic": "猫"}])
%%time
# Baseline 2: batch the poem chain on its own.
chain2.batch([{"topic": "熊"}, {"topic": "猫"}])
%%time
# Parallel execution: batch the RunnableParallel that wraps both chains.
combined.batch([{"topic": "熊"}, {"topic": "猫"}])