Implementing Function Calling with Knowledge Base Integration
How to integrate functions with a knowledge base
This notebook demonstrates how to create an agent that can access a knowledge base and call two functions based on user requests. The agent uses arXiv data to answer academic questions. Two functions are defined:
- get_articles: retrieves arXiv articles related to a topic and lists their abstracts with links
- read_article_and_summarize: reads a full article from a previous search and summarizes its key arguments, evidence, and conclusions
This setup enables multi-service workflows in which data from the first function is persisted for use by the second.
Walkthrough
This notebook walks through the following workflow:
- Practical Search: Create two functions that fetch answers from arXiv
- Agent Configuration: Build agent behavior that evaluates function needs and executes them accordingly
- ArXiv Interaction: Combine all components into a real-time session
# Package Installation
!pip install scipy
!pip install tenacity
!pip install tiktoken==0.3.3
!pip install termcolor
!pip install openai
!pip install arxiv
!pip install pandas
!pip install PyPDF2
!pip install tqdm
import os
import arxiv
import ast
import concurrent.futures
import json
import pandas as pd
import tiktoken
from csv import writer
from IPython.display import display, Markdown, Latex
from openai import OpenAI
from PyPDF2 import PdfReader
from scipy import spatial
from tenacity import retry, wait_random_exponential, stop_after_attempt
from tqdm import tqdm
from termcolor import colored
GPT_MODEL = "gpt-3.5-turbo-0613"
EMBEDDING_MODEL = "text-embedding-ada-002"
client = OpenAI()
1. Utility Functions
First, we set up utilities to support our two functions.
Downloaded papers will be stored in a directory (here ./data/papers). A file arxiv_library.csv stores embeddings and metadata for downloaded papers for retrieval during summarization.
directory = './data/papers'
# Check if directory exists
if not os.path.exists(directory):
    # Create directory and intermediate directories if needed
    os.makedirs(directory)
    print(f"Directory '{directory}' created successfully.")
else:
    # Print message if already exists
    print(f"Directory '{directory}' already exists.")
# Set directory for downloaded papers
data_dir = os.path.join(os.curdir, "data", "papers")
paper_dir_filepath = "./data/arxiv_library.csv"
# Initialize dataframe for storing downloaded files
df = pd.DataFrame(list())
df.to_csv(paper_dir_filepath)
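To make the layout of the library file concrete, the sketch below writes one row in the same three-column shape that get_articles appends (title, file path, embedding) and reads it back. The in-memory buffer, the sample title and path, and the three-element vector are illustrative assumptions; real embeddings are far longer.

```python
import ast
import csv
import io

# Hypothetical row: title, local PDF path, and a toy 3-dimensional embedding.
row = ["An Example Paper", "./data/papers/example.pdf", [0.1, -0.2, 0.3]]

# Write the row to an in-memory buffer standing in for arxiv_library.csv.
buffer = io.StringIO()
csv.writer(buffer).writerow(row)

# Read it back: the embedding is stored as text and must be parsed into a list.
buffer.seek(0)
title, filepath, embedding_text = next(csv.reader(buffer))
embedding = ast.literal_eval(embedding_text)

print(title, filepath, embedding)
```

This is why summarize_text applies ast.literal_eval to the embedding column after loading the CSV: the vectors are serialized as strings.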
@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
def embedding_request(text):
    # Generate embedding
    response = client.embeddings.create(input=text, model=EMBEDDING_MODEL)
    return response
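The retry decorator above wraps the embedding call so that transient failures are retried with randomized, exponentially growing waits, up to three attempts. The following is a rough standard-library sketch of that idea, not tenacity's implementation; the helper name retry_with_backoff, its parameters, and the flaky function are hypothetical, and tenacity differs in details such as how the final failure is reported.

```python
import random
import time


def retry_with_backoff(fn, max_attempts=3, min_wait=1.0, max_wait=40.0, sleep=time.sleep):
    """Call fn(); on failure wait a random, exponentially bounded time and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # The upper bound doubles with each attempt and is capped at max_wait.
            upper = min(max_wait, min_wait * 2 ** attempt)
            sleep(random.uniform(min_wait, upper))


# Hypothetical flaky call that fails twice, then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


# Pass a no-op sleep so the demonstration finishes instantly.
result = retry_with_backoff(flaky, sleep=lambda seconds: None)
print(result, calls["count"])
```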
@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
def get_articles(query, library=paper_dir_filepath, top_k=5):
    """
    Return top-k articles based on user query, storing relevant info.
    Downloads files and stores details in arxiv_library.csv for later retrieval by read_article_and_summarize
    """
    # Named arxiv_client so it does not shadow the module-level OpenAI client
    arxiv_client = arxiv.Client()
    # Search with the caller's query and top_k rather than hard-coded values
    search = arxiv.Search(
        query=query,
        max_results=top_k,
        sort_by=arxiv.SortCriterion.SubmittedDate,
    )
    result_list = []
    for result in arxiv_client.results(search):
        result_dict = {}
        result_dict.update({"title": result.title})
        result_dict.update({"summary": result.summary})
        # Take the first two links as the article URL and the PDF URL
        result_dict.update({"article_url": [x.href for x in result.links][0]})
        result_dict.update({"pdf_url": [x.href for x in result.links][1]})
        result_list.append(result_dict)
        # Store reference information in library file
        response = embedding_request(text=result.title)
        file_reference = [
            result.title,
            result.download_pdf(data_dir),
            response.data[0].embedding,
        ]
        # Write to file (the with block closes the file automatically)
        with open(library, "a") as f_object:
            writer_object = writer(f_object)
            writer_object.writerow(file_reference)
    return result_list
# Test
result_output = get_articles("ppo reinforcement learning")
result_output[0]
{'title': 'Entanglement entropy and deconfined criticality: emergent SO(5) symmetry and proper lattice bipartition',
'summary': "We study the R\\'enyi entanglement entropy (EE) of the two-dimensional $J$-$Q$\nmodel, the emblematic quantum spin model of deconfined criticality at the phase\ntransition between antiferromagnetic and valence-bond-solid ground states.\nQuantum Monte Carlo simulations with an improved EE scheme reveal critical\ncorner contributions that scale logarithmically with the system size, with a\ncoefficient in remarkable agreement with the form expected from a large-$N$\nconformal field theory with SO($N=5$) symmetry. However, details of the\nbipartition of the lattice are crucial in order to observe this behavior. If\nthe subsystem for the reduced density matrix does not properly accommodate\nvalence-bond fluctuations, logarithmic contributions appear even for\ncorner-less bipartitions. We here use a $45^\\circ$ tilted cut on the square\nlattice. Beyond supporting an SO($5$) deconfined quantum critical point, our\nresults for both the regular and tilted cuts demonstrate important microscopic\naspects of the EE that are not captured by conformal field theory.",
'article_url': 'http://arxiv.org/abs/2401.14396v1',
'pdf_url': 'http://arxiv.org/pdf/2401.14396v1'}
"""
Rank strings by their relatedness to a query string (query) based on embeddings in a DataFrame (df)
relatedness_fn: Function to compute relatedness between two embedding vectors. Default is cosine similarity, computed as 1 minus the cosine distance.
    Relatedness near 1 indicates high similarity;
    Relatedness near -1 indicates low similarity
top_n: Integer specifying number of most related strings to return
"""
def strings_ranked_by_relatedness(
    query: str,
    df: pd.DataFrame,
    relatedness_fn=lambda x, y: 1 - spatial.distance.cosine(x, y),
    top_n: int = 100,
) -> list[str]:
    """
    Return the file paths of the top_n entries, sorted from most to least related to the query
    """
    # Get embedding
    query_embedding_response = embedding_request(query)
    query_embedding = query_embedding_response.data[0].embedding
    strings_and_relatednesses = [
        (row["filepath"], relatedness_fn(query_embedding, row["embedding"]))
        for i, row in df.iterrows()
    ]
    strings_and_relatednesses.sort(key=lambda x: x[1], reverse=True)
    strings, relatednesses = zip(*strings_and_relatednesses)
    return list(strings[:top_n])
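The ranking step can be illustrated without any API calls. This sketch computes cosine similarity by hand and sorts a small library by relatedness to a query vector. The file names and the two-dimensional embeddings are made up for illustration, and the pure-Python similarity function stands in for the scipy-based default used above.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors (1 minus cosine distance)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Hypothetical library: file path -> toy 2-dimensional embedding.
library = {
    "paper_a.pdf": [1.0, 0.0],
    "paper_b.pdf": [0.7, 0.7],
    "paper_c.pdf": [-1.0, 0.0],
}
query_embedding = [1.0, 0.1]

# Sort file paths from most to least related to the query.
ranked = sorted(
    library,
    key=lambda path: cosine_similarity(query_embedding, library[path]),
    reverse=True,
)
print(ranked)
```

The vector pointing almost the same way as the query ranks first, and the one pointing the opposite way ranks last.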
def read_pdf(filepath):
    """
    Takes a PDF file path and returns its content as a string
    """
    # Create PdfReader object
    reader = PdfReader(filepath)
    pdf_text = ""
    page_number = 0
    for page in reader.pages:
        page_number += 1
        pdf_text += page.extract_text() + f"\nPage Number: {page_number}"
    return pdf_text
# Split text into chunks of size n, ending at sentence boundaries
def create_chunks(text, n, tokenizer):
    """Return consecutive chunks of roughly n tokens from given text."""
    tokens = tokenizer.encode(text)
    i = 0
    while i < len(tokens):
        # Find nearest sentence end between 0.5*n and 1.5*n tokens
        j = min(i + int(1.5 * n), len(tokens))
        while j > i + int(0.5 * n):
            # Decode tokens and check for sentence end or new line
            chunk = tokenizer.decode(tokens[i:j])
            if chunk.endswith(".") or chunk.endswith("\n"):
                break
            j -= 1
        # If no sentence boundary found, use n tokens as chunk
        if j == i + int(0.5 * n):
            j = min(i + n, len(tokens))
        yield tokens[i:j]
        i = j
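To see how the chunker prefers sentence boundaries, the sketch below runs the same logic with a toy tokenizer that treats every character as one token. CharTokenizer and the sample text are assumptions made for illustration; the notebook itself uses a tiktoken encoding.

```python
class CharTokenizer:
    """Toy tokenizer (assumption): one token per character."""

    def encode(self, text):
        return [ord(ch) for ch in text]

    def decode(self, tokens):
        return "".join(chr(t) for t in tokens)


def create_chunks(text, n, tokenizer):
    """Yield consecutive token chunks of roughly n tokens, preferring sentence ends."""
    tokens = tokenizer.encode(text)
    i = 0
    while i < len(tokens):
        # Start from the largest allowed chunk and shrink toward 0.5*n tokens.
        j = min(i + int(1.5 * n), len(tokens))
        while j > i + int(0.5 * n):
            chunk = tokenizer.decode(tokens[i:j])
            if chunk.endswith(".") or chunk.endswith("\n"):
                break
            j -= 1
        # No boundary found in the window: fall back to exactly n tokens.
        if j == i + int(0.5 * n):
            j = min(i + n, len(tokens))
        yield tokens[i:j]
        i = j


tokenizer = CharTokenizer()
text = "First sentence here. Second one follows. Third is last."
chunks = [tokenizer.decode(c) for c in create_chunks(text, 20, tokenizer)]
for c in chunks:
    print(repr(c))
```

Because the chunks are consecutive slices of the token list, joining them always reproduces the original text.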
def extract_chunk(content, template_prompt):
    """
    Use prompt as input to return a summarized text chunk
    """
    prompt = template_prompt + content
    response = client.chat.completions.create(
        model=GPT_MODEL, messages=[{"role": "user", "content": prompt}], temperature=0
    )
    return response.choices[0].message.content
def summarize_text(query):
    """
    Perform the following tasks:
    - Read embeddings from arxiv_library.csv
    - Find nearest files to user query
    - Extract text from files and chunk it
    - Summarize each chunk in parallel
    - Compile final summary and return
    """
    # Prompt for recursive summarization of input files
    summary_prompt = """Summarize an academic paper. Extract key points through reasoning.\n\nContent:"""
    # If library is empty (no searches yet), perform a search and download results
    library_df = pd.read_csv(paper_dir_filepath).reset_index()
    if len(library_df) == 0:
        print("No papers searched yet, downloading first.")
        get_articles(query)
        print("Paper download complete.")
        library_df = pd.read_csv(paper_dir_filepath).reset_index()
    library_df.columns = ["title", "filepath", "embedding"]
    library_df["embedding"] = library_df["embedding"].apply(ast.literal_eval)
    strings = strings_ranked_by_relatedness(query, library_df, top_n=1)
    print("Extracting text chunks from paper")
    pdf_text = read_pdf(strings[0])
    # Initialize tokenizer
    tokenizer = tiktoken.get_encoding("cl100k_base")
    results = ""
    # Split document into 1500 token chunks
    chunks = create_chunks(pdf_text, 1500, tokenizer)
    text_chunks = [tokenizer.decode(chunk) for chunk in chunks]
    print("Summarizing each text chunk")
    # Process summaries in parallel
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=len(text_chunks)
    ) as executor:
        futures = [
            executor.submit(extract_chunk, chunk, summary_prompt)
            for chunk in text_chunks
        ]
        with tqdm(total=len(text_chunks)) as pbar:
            for _ in concurrent.futures.as_completed(futures):
                pbar.update(1)
        for future in futures:
            data = future.result()
            results += data
    # Final summary
    print("Compiling all summaries")
    response = client.chat.completions.create(
        model=GPT_MODEL,
        messages=[
            {
                "role": "user",
                "content": f"""Organize a summary based on key points extracted from an academic paper.
                        Highlight core arguments, conclusions, and evidence, addressing the user's query.
                        User query: {query}
                        Summary should be organized under headings for core arguments, evidence, and conclusion.
                        Key Points:\n{results}\nSummary:\n""",
            }
        ],
        temperature=0,
    )
    return response
# Test
chat_test_response = summarize_text("PPO reinforcement learning sequence generation")
100%|██████████| 15/15 [00:08<00:00, 1.76it/s]
print(chat_test_response.choices[0].message.content)
The academic paper discusses the unique decomposition of generators of completely positive dynamical semigroups in infinite dimensions. The main result of the paper is that for any separable complex Hilbert space, any trace-class operator B that does not have a purely imaginary trace, and any generator L of a norm-continuous one-parameter semigroup of completely positive maps, there exists a unique bounded operator K and a unique completely positive map Φ such that L=K(·) + (·)K∗+ Φ. The paper also introduces a modified version of the Choi formalism, which relates completely positive maps to positive semi-definite operators, and characterizes when this correspondence is injective and surjective. The paper concludes by discussing the challenges and questions that arise when generalizing the results to non-separable Hilbert spaces.
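The summarization flow above follows a map-then-combine pattern: each chunk is summarized independently in a thread pool, and the partial summaries are then merged. A minimal offline sketch of that pattern follows. The summarize_chunk function is a stand-in (assumption) that simply keeps the first sentence instead of calling a model, and the combine step here is a plain join, whereas the notebook sends the collected key points to the model for a final summary.

```python
from concurrent.futures import ThreadPoolExecutor


def summarize_chunk(chunk):
    """Stand-in for the model call (assumption): keep only the first sentence."""
    return chunk.split(".")[0].strip() + "."


def map_reduce_summary(chunks, max_workers=4):
    """Summarize chunks in parallel, then join the partial results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map returns results in the same order as the inputs.
        partial_summaries = list(executor.map(summarize_chunk, chunks))
    return " ".join(partial_summaries)


# Hypothetical chunks of text.
chunks = [
    "PPO clips the policy update. Details follow.",
    "Experiments use sequence tasks. More text.",
]
summary = map_reduce_summary(chunks)
print(summary)
```

A fixed max_workers is used here as one possible design choice; it bounds the number of concurrent requests regardless of how many chunks a paper produces.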
2. Agent Setup
In this step, we create the agent: a Conversation class that supports multiple turns with the API, plus Python functions that connect the Chat Completions API to the knowledge base functions.
@retry(wait=wait_random_exponential(min=1, max=40), stop=stop_after_attempt(3))
def chat_completion_request(messages, functions=None, model=GPT_MODEL):
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            functions=functions,
        )
        return response
    except Exception as e:
        print("Unable to generate ChatCompletion response")
        print(f"Exception: {e}")
        # Re-raise so @retry can retry and callers never receive an exception object
        raise
class Conversation:
    def __init__(self):
        self.conversation_history = []
    def add_message(self, role, content):
        message = {"role": role, "content": content}
        self.conversation_history.append(message)
    def display_conversation(self, detailed=False):
        role_to_color = {
            "system": "red",
            "user": "green",
            "assistant": "blue",
            "function": "magenta",
        }
        for message in self.conversation_history:
            print(
                colored(
                    f"{message['role']}: {message['content']}\n\n",
                    role_to_color[message["role"]],
                )
            )
# Initialize get_articles and read_article_and_summarize functions
arxiv_functions = [
{
"name": "get_articles",
"description": """Use this function to retrieve academic papers from arXiv to answer user questions.""",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": f"""
User query in JSON format. Provide an overview and return URL references of articles
""",
}
},
"required": ["query"],
},
},
{
"name": "read_article_and_summarize",
"description": """
Use this function to read a whole paper and provide a summary to users. Never call this function unless get_articles has already been called in the conversation
""",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": f"""
Describe user query in plain text
""",
}
},
"required": ["query"],
},
}
]
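The model returns function arguments as a JSON string, so it can help to parse them and check them against the "required" list in the schema before calling anything. The helper below is a hypothetical sketch of such a check, not part of the OpenAI library; the schema is a trimmed copy of the get_articles definition above, and the arguments string is an invented example of what the model might return.

```python
import json


def parse_function_arguments(arguments_json, schema):
    """Parse a JSON arguments string and check the schema's required keys."""
    arguments = json.loads(arguments_json)
    required = schema["parameters"]["required"]
    missing = [key for key in required if key not in arguments]
    if missing:
        raise ValueError(f"Missing required arguments: {missing}")
    return arguments


# Trimmed copy of the get_articles schema shape.
schema = {
    "name": "get_articles",
    "parameters": {
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    },
}

# Hypothetical arguments string, shaped like what the model returns.
args = parse_function_arguments('{"query": "ppo reinforcement learning"}', schema)
print(args["query"])
```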
def chat_completion_with_function_execution(messages, functions=None):
    """Invoke ChatCompletion API with optional function addition"""
    response = chat_completion_request(messages, functions)
    full_message = response.choices[0]
    if full_message.finish_reason == "function_call":
        print(f"Function request generated, calling function")
        return call_arxiv_function(messages, full_message)
    else:
        print(f"No function called, returning response to user")
        return response
def call_arxiv_function(messages, full_message):
    """Execute function call when model determines it necessary. Currently extended by adding clauses to if statement."""
    if full_message.message.function_call.name == "get_articles":
        parsed_output = None
        try:
            parsed_output = json.loads(
                full_message.message.function_call.arguments
            )
            print("Obtained return result")
            results = get_articles(parsed_output["query"])
        except Exception as e:
            print(parsed_output)
            print(f"Function execution failed")
            print(f"Error message: {e}")
            # Report the failure to the model instead of leaving results undefined
            results = f"Function execution failed: {e}"
        messages.append(
            {
                "role": "function",
                "name": full_message.message.function_call.name,
                "content": str(results),
            }
        )
        try:
            print("Got search results, returning summary")
            response = chat_completion_request(messages)
            return response
        except Exception as e:
            print(type(e))
            raise Exception("Function return failed")
    elif (
        full_message.message.function_call.name == "read_article_and_summarize"
    ):
        parsed_output = json.loads(
            full_message.message.function_call.arguments
        )
        print("Found and read paper")
        summary = summarize_text(parsed_output["query"])
        return summary
    else:
        raise Exception("Function call not recognized")
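The if/elif chain above grows by one clause per function. One alternative, sketched here under stated assumptions, is a registry that maps function names to callables. The two stub functions stand in for get_articles and summarize_text so the example runs offline, and FUNCTION_REGISTRY and dispatch are hypothetical names, not part of the notebook.

```python
import json


def get_articles_stub(query):
    """Stand-in for get_articles (assumption): returns a fake result list."""
    return [{"title": f"Result for {query}"}]


def summarize_stub(query):
    """Stand-in for summarize_text (assumption)."""
    return f"Summary for {query}"


# Map function names (as the model emits them) to Python callables.
FUNCTION_REGISTRY = {
    "get_articles": get_articles_stub,
    "read_article_and_summarize": summarize_stub,
}


def dispatch(name, arguments_json):
    """Look up the requested function and call it with the parsed arguments."""
    if name not in FUNCTION_REGISTRY:
        raise ValueError(f"Function call not recognized: {name}")
    arguments = json.loads(arguments_json)
    return FUNCTION_REGISTRY[name](**arguments)


print(dispatch("get_articles", '{"query": "ppo"}'))
print(dispatch("read_article_and_summarize", '{"query": "ppo"}'))
```

With this layout, supporting a new function means adding one registry entry rather than another branch.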
3. ArXiv Session
Testing function outputs during conversation:
# Begin with system message
paper_system_message = """You are arXivGPT, a helpful assistant retrieving academic papers to answer user questions.
Clearly summarize papers so customers can decide which to read to answer their queries.
Always provide article URLs and titles so users understand paper names and can click to access them.
Start now!"""
paper_conversation = Conversation()
paper_conversation.add_message("system", paper_system_message)
# Add user message
paper_conversation.add_message("user", "Hi, how does PPO reinforcement learning work?")
chat_response = chat_completion_with_function_execution(
paper_conversation.conversation_history, functions=arxiv_functions
)
assistant_message = chat_response.choices[0].message.content
paper_conversation.add_message("assistant", assistant_message)
display(Markdown(assistant_message))
# Add another user message to trigger our system using the second tool
paper_conversation.add_message(
"user",
"Can you help me read a PPO sequence generation paper and give me a summary?",
)
updated_response = chat_completion_with_function_execution(
paper_conversation.conversation_history, functions=arxiv_functions
)
display(Markdown(updated_response.choices[0].message.content))
Function generation requested, calling function
Finding and reading paper
Chunking text from paper
Summarizing each chunk of text
100%|██████████| 15/15 [00:09