The Role of Graph Structure in LLM-Powered Applications
Creating features that rely on LLMs often requires handling complex tasks, since the output of LLMs is not deterministic. This creates numerous edge cases that need to be covered. Sometimes we need to preprocess text or files, validate the output, involve the user, and perform many other actions. Without proper planning and robust, scalable abstractions, our application's logic can quickly turn into chaos. That’s why it’s often best to look for battle-tested patterns as a foundation for AI integration code. I raised a similar topic in my previous article about summarizing large documents with LLMs using the Map-Reduce pattern . In this article, we’ll take the Map-Reduce flow from that post and organize it using a graph structure to make our code more explicit. Both that post and this one are inspired by LangChain’s tutorial on map-reduce for summarization.
Why use a graph in AI integrations?
As I mentioned, even a simple flow can sometimes become difficult to manage. For example, we might need to call an LLM recursively until we achieve the desired result, or we may have two or more LLMs collaborating on a single task. To standardize these processes and make them more readable and maintainable, we can define functions as nodes in a graph, connected by edges that explicitly specify what should happen after each function call. This approach also allows us to share state between function calls, which is very important, since flows involving LLMs usually require storing the task context.
But let’s start this story from the beginning.
What is a Graph Structure in a Few Words?
A graph is a non-linear data structure made up of nodes connected by edges, which indicate relationships between the nodes. A graph can be undirected or directed, and it can also be circular or acyclic. In general, a graph is a very flexible data structure, commonly used to represent relationships between various entities. This flexibility makes it a perfect fit for our use case.
Imagine we have a function represented by a node. The function calls an LLM, and the LLM returns output that goes to an evaluation node (connected by an edge to the first node). The evaluation node calls a specialized SLM (Small Language Model). If the evaluation passes, the output is sent to the output node; if not, another edge leads back to the first node for a retry, this time with annotations to enhance the initial prompt.
Let’s Get Our Hands Dirty With Code
In the previous article, we implemented the Map-Reduce pattern to summarize a blog by summarizing its scraped articles. Each article was sent to the LLM in parallel for summarization. If the resulting list was too long, it was broken down into sublists and summarized again with a reduce prompt, recursively, to achieve the desired compression. Finally, the collapsed list was summarized once more with a reduce prompt to produce the final condensed summary.
The task is quite complex to organize. We have parallel calls, conditional calls, recursion, and splitting involved. Of course, the solution looks neat, but imagine trying to scale it or explain it to new teammates. That’s why we sometimes need well-known patterns and frameworks to make our code more familiar to other developers.
We will be using a framework created with graphs in mind—langgraph
. langgraph
provides all the primitives needed to turn our flow into a graph. In my opinion, langgraph
is one of the best tools for developing AI integrations at scale. Building on a graph structure is a great idea and makes the flow easier to understand.
Preparing Utils
We need to define some utilities for processing data and running a model:
typescript
import { ChatOpenAI } from "@langchain/openai";
import { TokenTextSplitter } from "@langchain/textsplitters";
const MAX_TOKEN = 1000;
const textSplitter = new TokenTextSplitter({
chunkSize: MAX_TOKEN,
chunkOverlap: 0,
});
const llm = new ChatOpenAI({
model: "gpt-5-mini",
apiKey: process.env.OPENAI_API_KEY,
});
Defining a State
The first major improvement to our code is having a tool for defining a shared state, and even making it type-safe.
The state definition looks like this:
typescript
/* rest of the code */
import { Annotation } from "@langchain/langgraph";
/* rest of the code */
const OverallState = Annotation.Root({
contents: Annotation<string[]>,
summaries: Annotation<string[]>({
reducer: (state, update) => state.concat(update),
}),
collapsedSummaries: Annotation<Document[]>,
finalSummary: Annotation<string>,
});
As you can see, we are using a single primitive to define the schema of our state as well as the TypeScript types via a generic. The Annotation.Root
method is a wrapper used to define the top-level state structure. The properties of the object supplied to Root
are channels that store the data returned from the nodes (functions in our graph). The reducer
property is a function used to merge the most recent result returned from a node with the current state. Later on, we will pass this OverallState
to the graph builder.
Defining Node Functions
The functions we are going to define reflect the four steps we explored in the previous article, which are:
Data returned from the scraper is passed to our graph invocation.
typescript
import { runSitemapBasedScraper } from "./scraper/main";
import graph from "./summarizer/with-langgraph/graph";
async function main() {
const scrappingResults = await runSitemapBasedScraper([
"https://www.aboutjs.dev",
]);
const filteredScrapedResults = scrappingResults.filter((result) => {
if (result.error) {
console.error(`❌ ${result.url}: ${result.error}`);
}
return result.success;
});
const summarized = await graph.invoke({
contents: filteredScrapedResults.flatMap((result) =>
result.posts.map((post) => post.content),
),
});
console.log("summarized", summarized.finalSummary);
}
void main();
Don’t worry, we’re not skipping anything. We’ll go through the process of building the graph step by step. In this snippet, I just wanted to show how the input data is passed. Our preprocess node, which will be part of the graph, looks like this:
typescript
import { Document } from "@langchain/core/documents";
import { Annotation } from "@langchain/langgraph";
/* rest of the code */
const OverallState = Annotation.Root({
contents: Annotation<string[]>,
summaries: Annotation<string[]>({
reducer: (state, update) => state.concat(update),
}),
collapsedSummaries: Annotation<Document[]>,
finalSummary: Annotation<string>,
});
/* rest of the code */
const preProcess = async (state: typeof OverallState.State) => {
const docs = state.contents.map(
(content) => new Document({ pageContent: content }),
);
const splitDocs = await textSplitter.splitDocuments(docs);
return { contents: splitDocs.map((doc) => ({ content: doc.pageContent })) };
};
/* rest of the code */
As you can se our node function has a state parameter which is nothing more than our OverallState
defined earlier. State is typed. Preprocess function just gets content(posts) passed during invocation and splits into smaller sub-documents to make them to not exceed a given token limit.
- Mapping Phase In the mapping phase, we need to generate a summary for each article or its chunk:
Previous functions' implementation
typescript
async function runMappers(formattedDocs: Document[]): Promise<string[]> {
console.log("Summarization started...");
const splitDocs = await textSplitter.splitDocuments(formattedDocs);
const results = await model.batch(
splitDocs.map((doc) => [
{
role: "user",
content: mapTemplate(doc.pageContent),
},
]),
);
return results.map((result) => result.content as string);
}
Function's Implementation with langgraph
typescript
import { mapPrompt, reducePrompt } from "./prompts";
import { Send } from "@langchain/langgraph";
type SummaryState = {
content: string;
};
/* REST OF THE CODE */
const mapContents = (state: typeof OverallState.State) => {
return state.contents.map(
(content) => new Send("generateSummary", { content }),
);
};
const generateSummary = async (
state: SummaryState,
): Promise<{ summaries: string[] }> => {
const prompt = await mapPrompt.invoke({
context: state.content,
});
const response = await llm.invoke(prompt);
return { summaries: [String(response.content)] };
};
/* REST OF THE CODE */
The mapContents
function from the refactor maps content into a callback that returns a Send
object containing the content and the name of the next node (function). Each Send
instance spawns one node and transports data from the previous node to that spawned node. As a result, we receive one node per content item. So, mapContents
acts like an edge → node generator.
The generateSummary
function sends a request to the LLM along with the map prompt. One call is performed per Send
. The response from each function is placed in the state channel: summaries
. The returned value is added to the array, and the results from each generateSummary
call are merged by the reduce callback defined inside Annotation.Root
.
In this phase, the list of summaries is reduced to the final summary. We also need to handle the process of collapsing the list of summaries. The entire list cannot exceed a given limit, which can be determined by the context window of the LLM or other factors. This is also where we encounter the limitations of the Map-Reduce pattern. Because we need to break the list into sublists, some important pieces of information—expressed, for example, by long paragraphs—can also be split. Therefore, it is important to test how large each chunk should be. In our case, we split the content into very small parts for explanatory purposes.
Let’s take a look at the previous version of the function for the reducing phase:
typescript
async function lengthFunction(summaries: string[]) {
const tokenCounts = await Promise.all(
summaries.map(async (summary) => {
return model.getNumTokens(summary);
}),
);
return tokenCounts.reduce((sum, count) => sum + count, 0);
}
export async function splitSummariesByTokenLimit(
summaries: string[],
tokenLimit: number,
): Promise<string[][]> {
const listOfSummariesSublists: string[][] = [];
let sublist: string[] = [];
for (const summary of summaries) {
const chunks = await recursiveTextSplitter.splitText(summary);
for (const chunk of chunks) {
const candidateList = [...sublist, chunk];
const candidateTokens = await lengthFunction(candidateList);
if (candidateTokens > tokenLimit) {
if (sublist.length > 0) {
listOfSummariesSublists.push(sublist);
sublist = [];
}
}
sublist.push(chunk);
}
}
if (sublist.length > 0) {
listOfSummariesSublists.push(sublist);
}
return listOfSummariesSublists;
}
async function reduceSummariesBatch(listOfSummaries: string[][]) {
const result = await model.batch(
listOfSummaries.map((summaries) => [
{
role: "user",
content: reduceTemplate(summaries.join("
")),
},
]),
);
return result.map((res) => res.content as string);
}
async function checkShouldCollapse(summaries: string[]) {
const tokenCount = await lengthFunction(summaries);
return tokenCount > 1000;
}
async function collapseSummaries(
summaries: string[],
recursionLimit = 5,
iteration = 0,
) {
console.log("Collapsing summaries...");
if (summaries.length === 0) {
return [];
}
const splitDocLists = await splitSummariesByTokenLimit(summaries, CHUNK_SIZE);
const results = await reduceSummariesBatch(splitDocLists);
let shouldCollapse = await checkShouldCollapse(results);
if (shouldCollapse && iteration < recursionLimit) {
console.log("Token count exceeds limit, collapsing summaries further...");
return collapseSummaries(results, recursionLimit, iteration + 1);
}
return results;
}
… and then at the current version of the function for the mapping phase, implemented using langgraph’s
primitives:
typescript
import {
collapseDocs,
splitListOfDocs,
} from "langchain/chains/combine_documents/reduce";
import { reducePrompt } from "./prompts";
/* rest of the code */
const collectSummaries = async (state: typeof OverallState.State) => {
return {
collapsedSummaries: state.summaries.map(
(summary) => new Document({ pageContent: summary }),
),
};
};
async function shouldCollapse(state: typeof OverallState.State) {
let numTokens = await lengthFunction(state.collapsedSummaries);
if (numTokens > MAX_TOKEN) {
return "collapseSummaries";
} else {
return "generateFinalSummary";
}
}
async function reduceSummaries(input: Document[]) {
const prompt = await reducePrompt.invoke({ docs: input });
const response = await llm.invoke(prompt);
return String(response.content);
}
const collapseSummaries = async (state: typeof OverallState.State) => {
const docLists = splitListOfDocs(
state.collapsedSummaries,
lengthFunction,
MAX_TOKEN,
);
const results = [];
for (const docList of docLists) {
results.push(await collapseDocs(docList, reduceSummaries));
}
return { collapsedSummaries: results };
};
const generateFinalSummary = async (state: typeof OverallState.State) => {
const response = await reduceSummaries(state.collapsedSummaries);
return { finalSummary: response };
};
/* rest of the code */
The collectSummaries
function takes summaries from the mapping phase (the summaries
channel) and copies them to the collapsedSummaries
channel, where they can be modified by subsequent functions. In the previous version, this was not necessary.
Next, we have a function for checking whether the list should be collapsed into sublists (checkShouldCollapse
), which returns a string indicating whether we should collapse the list or go straight to the final summary. In the previous version, this function returned a boolean, whereas the current version returns a string that indicates which function should be called next, depending on whether the list is too long.
Then, the reduceSummaries
function is used to ask an LLM to generate a summary of a list when used in generateFinalSummary
, or of a sublist when used inside the collapseSummaries
function. In the previous version, we had two functions for sending batch requests to the LLM in the case of collapsing, and just one request when generating the final summary.
In the current version, the process of splitting lists into sublists and reducing them is simplified, as we use utilities from langchain
.
Putting It All Together Into a Graph
Alright, this will be the most important part of this refactor, as we will be able to see the benefits of organizing our flow into a graph structure.
First, let’s take a look at the previous version:
typescript
/* rest of the code */
export async function summarizeDocuments(
documents: LocalDocument[],
maxIterations = 5,
) {
const formattedDocs = documents.map(
(doc) =>
new Document({
pageContent: doc.content,
metadata: {
title: doc.title,
link: doc.link,
date: doc.date,
source: doc.source,
selector: doc.selector,
index: doc.index,
},
}),
);
let summaries = await runMappers(formattedDocs);
const shouldCollapse = await checkShouldCollapse(summaries);
if (shouldCollapse) {
summaries = await collapseSummaries(summaries, maxIterations);
}
const finalSummary = await reduceSummaries(summaries);
console.log("finalSummary", finalSummary);
}
/* rest of the code */
At first glance, everything seems clear and explicit—and it really is. The first time, we might overlook that the collapseSummaries
function contains recursion, but we probably notice it on the second iteration. Now imagine that we decide to add more tweaks like this: the main function would be full of implicit actions and could quickly become spaghetti code.
If we don’t plan to expand this logic, it’s fine. But if we have in mind more actions that need to be added to this flow, the graph will really shine:
typescript
import { StateGraph } from "@langchain/langgraph";
/* rest of the code */
const graphBuilder = new StateGraph(OverallState)
// registering nodes
.addNode("generateSummary", generateSummary)
.addNode("collectSummaries", collectSummaries)
.addNode("collapseSummaries", collapseSummaries)
.addNode("generateFinalSummary", generateFinalSummary)
.addNode("preProcess", preProcess)
// preprocessing phase
.addEdge("__start__", "preProcess")
// mapping phase
.addConditionalEdges("preProcess", mapContents, ["generateSummary"])
.addEdge("generateSummary", "collectSummaries")
// reducing phase
.addConditionalEdges("collectSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addConditionalEdges("collapseSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addEdge("generateFinalSummary", "__end__");
const graph = graphBuilder.compile();
export default graph;
Here is the smart and flexible abstraction for our flow.
Firstly, we initialize a StateGraph
instance, supplying the state that will be available in all nodes and edges. Then, we use the addNode
method to register our core functions responsible for mapping and reducing summaries, passing the node’s name as a string. The node’s name serves as an alias for the function, which is passed as the second argument.
Then we register edges using the addEdge
and addConditionalEdges
methods. Standard edges act as connections between two functions. For example, in this case: .addEdge("generateSummary", "collectSummaries")
, we know that after calling the generateSummary
function, the collectSummaries
function will be called.
We can also use the addConditionalEdge
method to determine which node or nodes should be called. The first argument, as in the standard addEdge
method, is the entry node. Then we pass a function that decides which node should come next, and the third argument is the array of possible choices that the conditional function can return. This method also allows us to spawn multiple nodes that can be executed in parallel, and that’s exactly what we do. In the fragment: .addConditionalEdges("preProcess", mapContents, ["generateSummary"])
, we go from the preProcess
function to the mapContents
function, which spawns multiple generateSummary
nodes using the Send
primitive:
typescript
const mapContents = (state: typeof OverallState.State) => {
return state.contents.map(
(content) => new Send("generateSummary", { content }),
);
};
The nodes __start__
and __end__
are predefined and simply indicate the start and end of the graph.
And now, the last thing: have you spotted the recursion case in our graph? Of course, it appears in this fragment:
typescript
/* rest of the code */
.addConditionalEdges("collectSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addConditionalEdges("collapseSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addEdge("generateFinalSummary", "__end__");
/* rest of the code */
In collapseSummaries
, we grab all the summaries from the mapping phase and then check whether they should be collapsed. If not, we move on to generateFinalSummary
, which is connected to the __end__
node that marks the end of execution. Otherwise, we jump back to collapseSummaries
. Having a structure like that, we can quickly identify a loop just by analyzing the tree.
Thanks to this representation, we can even draw a diagram to visualize the flow in a graph structure:
Now we see that the most benefits we get from organizing a function calling in the graph not refactoring each functions itself. Thanks to this we achieved readability, extendability and the code became explicit.
Summary
With langgraph
at hand, we can implement the graph seamlessly. Langgraph
provides us with convenient primitives to utilize the graph structure in our flow without writing any low-level code for the graph abstraction. By building frameworks around well-known data structures, we can be confident that every new team member can become productive sooner. In the case of building LLM-powered applications, this is crucial, as we often need to implement complex flows to handle multiple tasks with non-deterministic outputs. Of course, in cases where the process is simple and we know it won’t be expanded in the future, creating additional abstraction is premature, so careful planning is required.
Thanks for reading, and I hope you learned something from my content. Stay tuned for the next article. 👋
PS: Checkout the GitHub repo related to this article.