Rola struktury grafu w aplikacjach opartych na LLM
Tworzenie rozwiązań opartych na LLM-ach często wymaga realizacji złożonych zadań, ponieważ odpowiedzi LLM'ów nie są przewidywalne. Powoduje to liczne edge cases, które muszą zostać obsłużone. Czasami musimy przetworzyć tekst lub pliki, zweryfikować odpowiedź, zaangażować użytkownika oraz wiele innych. Bez odpowiedniego planowania i solidnych, skalowalnych abstrakcji logika naszej aplikacji może łatwo zamienić się w chaos. Dlatego często najlepszym rozwiązaniem jest sięgnięcie po sprawdzone wzorce jako fundament kodu integracji AI. Podobny temat poruszyłem w moim wcześniejszym artykule o podsumowywaniu dużych dokumentów z wykorzystaniem wzorca Map-Reduce . W tym artykule wykorzystam flow Map-Reduce z tamtego wpisu i zorganizuję go przy pomocy struktury grafu, aby uczynić go czytelniejszym. Zarówno tamten artykuł, jak i ten, inspirowane są tutorialem LangChain dotyczącym map-reduce w podsumowywaniu treści.
Dlaczego warto używać grafu w integracjach AI?
Jak już wspomniałem, nawet prosty flow może czasami stać się trudny do opanowania. Na przykład możemy potrzebować odpytać model rekurencyjnie, tak długo aż osiągniemy oczekiwany rezeultat, albo zgrać dwa lub więcej LLM-ów współpracujących nad jednym zadaniem. Aby ustandaryzować te procesy i uczynić je bardziej czytelnymi oraz łatwiejszymi w utrzymaniu, możemy zdefiniować funkcje jako nodes w grafie, które połączone są za pomocą edges, które to jednoznacznie określają, jaka akcja powinna się wydarzyć następna. Takie podejście pozwala również na współdzielenie stanu między wywołaniami funkcjami, co jest bardzo istotne, ponieważ flow z udziałem LLM-ów zazwyczaj wymagaja przechowywania kontekstu zadania nad którym pracuje LLM.
Ale zacznijmy tę historię od początku.
Czym jest Graph w Kilku Słowach?
Graf to nieliniowa struktura danych złożona z node'ów połączonych edge'ami, które wskazują relacje między węzłami. Graf może być kierunkowy lub niekierunkowy, a także cykliczny lub acykliczny. Ogólnie rzecz biorąc, graf jest bardzo elastyczną strukturą danych, powszechnie stosowaną do odwzorowywania relacji między różnymi encjami. Ta elastyczność sprawia, że idealnie pasuje do naszego przypadku.
Wyobraźmy sobie funkcję reprezentowaną przez node'a. Funkcja wywołuje LLM'a, LLM zwraca wynik, który trafia do node'a ewaluacyjnego (połączonego edge'em z pierwszym node'em). Node ewaluacyjny wywołuje wyspecjalizowany SLM (Small Language Model). Jeśli ewaluacja zakończy się powodzeniem, wynik zostaje wysłany do node'a końcowego; w przeciwnym razie inny edge prowadzi z powrotem do pierwszego node'a w celu wykonania ponownej próby, tym razem z adnotacjami ulepszającymi początkowy prompt.
Bierzmy się Do Zabawy z Kodem
W poprzednim artykule zaimplementowałem wzorzec Map-Reduce, aby wygenerować podsumowanie bloga poprzez streszczenie jego zescapowanych artykułów. Każdy artykuł został wysłany do LLM równolegle w celu wygenerowania podsumowania. Jeśli zwrócona lista była zbyt długa, dzielona była na podlisty i ponownie streszczana przy użyciu promptu reduce, rekurencyjnie, aby uzyskać oczekiwaną kompresję. Na końcu zredukowana lista została jeszcze raz podsumowana z użyciem promptu reduce, aby otrzymać ostateczne, skondensowane podsumowanie.
To zadanie jest dość złożone do ogarnięcia. Mamy wywołania równoległe, warunkowe, rekurencję oraz podziały na sublisty. Oczywiście rozwiązanie wygląda czysto, ale wyobraź sobie próbę jego skalowania lub tłumaczenia go nowym członkom zespołu. Właśnie dlatego czasami potrzebujemy dobrze znanych wzorców i frameworków, by nasz kod był bardziej zrozumiały dla innych programistów.
Będziemy korzystać z frameworka stworzonego z myślą o grafach — langgraph. langgraph dostarcza wszystko co potrzebne do przekształcenia naszego flow w graf. Moim zdaniem langgraph to jedno z najlepszych narzędzi do tworzenia integracji AI na dużą skalę. Opieranie się na strukturze grafu to świetny pomysł, który sprawia, że flow jest łatwiejszy do zrozumienia.
Przygotowanie Util'sów
Musimy zdefiniować kilka funkcji do przetwarzania danych i uruchamiania modelu:
typescript
import { ChatOpenAI } from "@langchain/openai";
import { TokenTextSplitter } from "@langchain/textsplitters";
const MAX_TOKEN = 1000;
const textSplitter = new TokenTextSplitter({
chunkSize: MAX_TOKEN,
chunkOverlap: 0,
});
const llm = new ChatOpenAI({
model: "gpt-5-mini",
apiKey: process.env.OPENAI_API_KEY,
});Definiowanie stanu
Pierwszym dużym usprawnieniem w naszym kodzie jest posiadanie narzędzia do definiowania współdzielonego, dobrze otypowanego stanu.
Definicja stanu wygląda tak:
typescript
/* rest of the code */
import { Annotation } from "@langchain/langgraph";
/* rest of the code */
const OverallState = Annotation.Root({
contents: Annotation<string[]>,
summaries: Annotation<string[]>({
reducer: (state, update) => state.concat(update),
}),
collapsedSummaries: Annotation<Document[]>,
finalSummary: Annotation<string>,
});Jak widać, używamy tylko jednej funkcji do zdefiniowania schematu naszego stanu, a także typów TypeScript z uzyciem generyków. Metoda Annotation.Root jest wrapperem służącym do definiowania struktury top level stanu. Właściwości obiektu przekazanego do Root to kanały przechowujące dane zwracane z nodes (funkcjii w naszym grafie). Właściwość reducer to funkcja używana do łączenia najnowszego wyniku zwróconego z node'a z aktualnym stanem. Później przekażemy OverallState do graph builder'a.
Definiowanie Funkcji dla Node'ów
Funkcje, które zdefiniujemy, odzwierciedlają cztery kroki omówione w poprzednim artykule, mianowicie:
Dane zwrócone przez scraper zostaną przekazane do naszej instancji grafu.
typescript
import { runSitemapBasedScraper } from "./scraper/main";
import graph from "./summarizer/with-langgraph/graph";
async function main() {
const scrappingResults = await runSitemapBasedScraper([
"https://www.aboutjs.dev",
]);
const filteredScrappedResults = scrappingResults.filter((result) => {
if (result.error) {
console.error(`❌ ${result.url}: ${result.error}`);
}
return result.success;
});
const summarized = await graph.invoke({
contents: filteredScrappedResults.flatMap((result) =>
result.posts.map((post) => post.content),
),
});
console.log("summarized", summarized.finalSummary);
}
void main();Spokojnie, niczego nie pomijamy. Przejdziemy krok po kroku przez proces budowania grafu. W tym fragmencie chciałem jedynie pokazać, jak przekazywane są dane wejściowe. Nasz node do wstępnego przetwarzania, który będzie częścią grafu, wygląda następująco:
typescript
import { Document } from "@langchain/core/documents";
import { Annotation } from "@langchain/langgraph";
/* rest of the code */
const OverallState = Annotation.Root({
contents: Annotation<string[]>,
summaries: Annotation<string[]>({
reducer: (state, update) => state.concat(update),
}),
collapsedSummaries: Annotation<Document[]>,
finalSummary: Annotation<string>,
});
/* rest of the code */
const preProcess = async (state: typeof OverallState.State) => {
const docs = state.contents.map(
(content) => new Document({ pageContent: content }),
);
const splitDocs = await textSplitter.splitDocuments(docs);
return { contents: splitDocs.map((doc) => ({ content: doc.pageContent })) };
};
/* rest of the code */W fazie mapowania musimy wygenerować podsumowanie dla każdego artykułu lub jego fragmentu:
Poprzednia implementacja
typescript
async function runMappers(formattedDocs: Document[]): Promise<string[]> {
console.log("Summarization started...");
const splitDocs = await textSplitter.splitDocuments(formattedDocs);
const results = await model.batch(
splitDocs.map((doc) => [
{
role: "user",
content: mapTemplate(doc.pageContent),
},
]),
);
return results.map((result) => result.content as string);
}Implementacja funkcji z życiem langgraph
typescript
import { mapPrompt, reducePrompt } from "./prompts";
import { Send } from "@langchain/langgraph";
type SummaryState = {
content: string;
};
/* REST OF THE CODE */
const mapContents = (state: typeof OverallState.State) => {
return state.contents.map(
(content) => new Send("generateSummary", { content }),
);
};
const generateSummary = async (
state: SummaryState,
): Promise<{ summaries: string[] }> => {
const prompt = await mapPrompt.invoke({
context: state.content,
});
const response = await llm.invoke(prompt);
return { summaries: [String(response.content)] };
};
/* REST OF THE CODE */Funkcja mapContents z refaktora mapuje content do callbacka, który zwraca obiekt Send zawierający content oraz nazwę następnego node'a (funkcji). Każda instancja Send tworzy jeden node i przenosi dane z poprzedniego node'a do nowo utworzonego. W efekcie otrzymujemy jeden node na fragment treści. Tak więc mapContents działa jak generator edge → node.
Funkcja generateSummary wysyła zapytanie do LLM wraz z promptem mapującym. Dla każdego Send wykonywane jest jedno wywołanie. Odpowiedź z każdej funkcji jest umieszczana w kanale stanu: summaries. Zwrócona wartość jest dodawana do tablicy, a wyniki z każdego wywołania generateSummary są scalane przez callback reducer zdefiniowany wewnątrz Annotation.Root.
W tej fazie lista podsumowań jest redukowana do ostatecznego podsumowania. Musimy również obsłużyć proces scalania listy podsumowań. Cała lista nie może przekroczyć określonego limitu, który np. może być determinowany przez okno kontekstowe LLM'a lub inne czynniki. To również tutaj napotykamy ograniczenia wzorca Map-Reduce. Ponieważ musimy podzielić listę na podlisty, niektóre ważne fragmenty informacji — na przykład wyrażone przez długie akapity — mogą zostać podzielone. Dlatego bardzo istotne jest dokładne przetestowanie w celu określenia jak duża powinna być każda część tekstu. W naszym przypadku dzielimy treść na bardzo małe fragmenty w celach wyjaśniających.
Spójrzmy na poprzednią wersję funkcji dla fazy redukcji:
typescript
async function lengthFunction(summaries: string[]) {
const tokenCounts = await Promise.all(
summaries.map(async (summary) => {
return model.getNumTokens(summary);
}),
);
return tokenCounts.reduce((sum, count) => sum + count, 0);
}
export async function splitSummariesByTokenLimit(
summaries: string[],
tokenLimit: number,
): Promise<string[][]> {
const listOfSummariesSublists: string[][] = [];
let sublist: string[] = [];
for (const summary of summaries) {
const chunks = await recursiveTextSplitter.splitText(summary);
for (const chunk of chunks) {
const candidateList = [...sublist, chunk];
const candidateTokens = await lengthFunction(candidateList);
if (candidateTokens > tokenLimit) {
if (sublist.length > 0) {
listOfSummariesSublists.push(sublist);
sublist = [];
}
}
sublist.push(chunk);
}
}
if (sublist.length > 0) {
listOfSummariesSublists.push(sublist);
}
return listOfSummariesSublists;
}
async function reduceSummariesBatch(listOfSummaries: string[][]) {
const result = await model.batch(
listOfSummaries.map((summaries) => [
{
role: "user",
content: reduceTemplate(summaries.join("
")),
},
]),
);
return result.map((res) => res.content as string);
}
async function checkShouldCollapse(summaries: string[]) {
const tokenCount = await lengthFunction(summaries);
return tokenCount > 1000;
}
async function collapseSummaries(
summaries: string[],
recursionLimit = 5,
iteration = 0,
) {
console.log("Collapsing summaries...");
if (summaries.length === 0) {
return [];
}
const splitDocLists = await splitSummariesByTokenLimit(summaries, CHUNK_SIZE);
const results = await reduceSummariesBatch(splitDocLists);
let shouldCollapse = await checkShouldCollapse(results);
if (shouldCollapse && iteration < recursionLimit) {
console.log("Token count exceeds limit, collapsing summaries further...");
return collapseSummaries(results, recursionLimit, iteration + 1);
}
return results;
}… a następnie na obecną wersję funkcji dla fazy mapowania, zaimplementowanej z użyciem funkcji langgraph:
typescript
import {
collapseDocs,
splitListOfDocs,
} from "langchain/chains/combine_documents/reduce";
import { reducePrompt } from "./prompts";
/* rest of the code */
const collectSummaries = async (state: typeof OverallState.State) => {
return {
collapsedSummaries: state.summaries.map(
(summary) => new Document({ pageContent: summary }),
),
};
};
async function shouldCollapse(state: typeof OverallState.State) {
let numTokens = await lengthFunction(state.collapsedSummaries);
if (numTokens > MAX_TOKEN) {
return "collapseSummaries";
} else {
return "generateFinalSummary";
}
}
async function reduceSummaries(input: Document[]) {
const prompt = await reducePrompt.invoke({ docs: input });
const response = await llm.invoke(prompt);
return String(response.content);
}
const collapseSummaries = async (state: typeof OverallState.State) => {
const docLists = splitListOfDocs(
state.collapsedSummaries,
lengthFunction,
MAX_TOKEN,
);
const results = [];
for (const docList of docLists) {
results.push(await collapseDocs(docList, reduceSummaries));
}
return { collapsedSummaries: results };
};
const generateFinalSummary = async (state: typeof OverallState.State) => {
const response = await reduceSummaries(state.collapsedSummaries);
return { finalSummary: response };
};
/* rest of the code */Funkcja collectSummaries pobiera podsumowania z fazy mapowania (kanał summaries) i kopiuje je do kanału collapsedSummaries, gdzie mogą być modyfikowane przez kolejne funkcje. W poprzedniej wersji nie było to konieczne.
Następnie mamy funkcję sprawdzającą, czy lista powinna zostać podzielona na podlisty (checkShouldCollapse), która zwraca ciąg znaków wskazujący, czy powinniśmy dokonać podziału, czy przejść bezpośrednio do ostatecznego podsumowania. W poprzedniej wersji funkcja zwracała wartość logiczną (boolean), natomiast obecna wersja zwraca string'a, który wskazuje, która funkcja powinna zostać wywołana następnie, w zależności od tego, czy długość listy przekracza limit.
Następnie funkcja reduceSummaries służy do wysyłania zapytania do LLM w celu wygenerowania podsumowania całej listy, gdy jest używana w generateFinalSummary, lub podlisty, gdy jest używana wewnątrz funkcji collapseSummaries. W poprzedniej wersji mieliśmy dwie funkcje do wysyłania zbiorczych zapytań do LLM w przypadku podziału, a tylko jedno zapytanie przy generowaniu ostatecznego podsumowania.
W obecnej wersji proces dzielenia list na podlisty i ich redukcji jest uproszczony, ponieważ korzystamy z narzędzi z biblioteki langchain.
Składanie Wszystkiego w Graf
Dobrze, to będzie najważniejsza część tego refaktoru, ponieważ będziemy mogli zobaczyć korzyści płynące z organizowania naszego flow w strukturę grafu.
Najpierw spójrzmy na poprzednią wersję:
typescript
/* rest of the code */
export async function summarizeDocuments(
documents: LocalDocument[],
maxIterations = 5,
) {
const formattedDocs = documents.map(
(doc) =>
new Document({
pageContent: doc.content,
metadata: {
title: doc.title,
link: doc.link,
date: doc.date,
source: doc.source,
selector: doc.selector,
index: doc.index,
},
}),
);
let summaries = await runMappers(formattedDocs);
const shouldCollapse = await checkShouldCollapse(summaries);
if (shouldCollapse) {
summaries = await collapseSummaries(summaries, maxIterations);
}
const finalSummary = await reduceSummaries(summaries);
console.log("finalSummary", finalSummary);
}
/* rest of the code */Na pierwszy rzut oka wszystko wydaje się jasne i przejrzyste — i rzeczywiście takie jest. Początkowo możemy nie zauważyć, że funkcja collapseSummaries zawiera rekurencję, ale prawdopodobnie dostrzeżemy to przy drugim przejrzeniu. Teraz wyobraźmy sobie, że zdecydujemy się dodać więcej takich funkcji: funkcja główna byłaby pełna ukrytych akcji i mogłaby szybko stać się spaghetti code.
Jeśli nie planujemy rozszerzać tej logiki, to ok. Jeśli jednak mamy na myśli dodanie kolejnych działań do tego flow, graf naprawdę będzie tutaj błyszczał:
typescript
import { StateGraph } from "@langchain/langgraph";
/* rest of the code */
const graphBuilder = new StateGraph(OverallState)
// registering nodes
.addNode("generateSummary", generateSummary)
.addNode("collectSummaries", collectSummaries)
.addNode("collapseSummaries", collapseSummaries)
.addNode("generateFinalSummary", generateFinalSummary)
.addNode("preProcess", preProcess)
// preprocessing phase
.addEdge("__start__", "preProcess")
// mapping phase
.addConditionalEdges("preProcess", mapContents, ["generateSummary"])
.addEdge("generateSummary", "collectSummaries")
// reducing phase
.addConditionalEdges("collectSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addConditionalEdges("collapseSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addEdge("generateFinalSummary", "__end__");
const graph = graphBuilder.compile();
export default graph;Oto sprytna i elastyczna abstrakcja dla naszego flow.
Najpierw inicjalizujemy instancję StateGraph, przekazując stan, który będzie dostępny we wszystkich nodes i edges. Następnie używamy metody addNode, aby zarejestrować nasze główne funkcje odpowiedzialne za mapowanie i redukowanie podsumowań, przekazując nazwę node'a jako string. Nazwa node'a pełni rolę aliasu dla funkcji, która jest przekazywana jako drugi argument
Następnie rejestrujemy edges, korzystając z metod addEdge i addConditionalEdges. Standardowe edges działają jak połączenia między dwoma funkcjami. Na przykład w tym przypadku: .addEdge("generateSummary", "collectSummaries") wiemy, że po wywołaniu funkcji generateSummary zostanie wywołana funkcja collectSummaries.
Możemy także użyć metody addConditionalEdge, aby określić, który node lub które nodes powinny zostać wywołane. Pierwszym argumentem, podobnie jak w standardowej metodzie addEdge, jest node wejściowy. Następnie przekazujemy funkcję, która decyduje, który node ma być następny, trzecim argumentem jest tablica możliwych wyborów, które może zwrócić funkcja warunkowa. Ta metoda pozwala nam również uruchamiać wiele nodes równolegle — i właśnie to tutaj robimy. We fragmencie: .addConditionalEdges("preProcess", mapContents, ["generateSummary"]) przechodzimy od funkcji preProcess do funkcji mapContents, która uruchamia wiele nodes generateSummary przy użyciu funkcji Send.
typescript
const mapContents = (state: typeof OverallState.State) => {
return state.contents.map(
(content) => new Send("generateSummary", { content }),
);
};Nodes __start__ i __end__ są predefiniowane, wskazują początek oraz koniec grafu.
I teraz ostatnia rzecz: czy zauważyłeś przypadek rekurencji w naszym grafie? Oczywiście występuje on w tym fragmencie:
typescript
/* rest of the code */
.addConditionalEdges("collectSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addConditionalEdges("collapseSummaries", shouldCollapse, [
"collapseSummaries",
"generateFinalSummary",
])
.addEdge("generateFinalSummary", "__end__");
/* rest of the code */W collapseSummaries pobieramy wszystkie podsumowania z fazy mapowania, a następnie sprawdzamy, czy powinny zostać skompresowane. Jeśli nie, przechodzimy do generateFinalSummary, które jest połączone z node'em __end__oznaczającym koniec wykonania. W przeciwnym razie wracamy do collapseSummaries. Mając taką strukturę możemy szybko zidentyfikować pętlę, analizując samą strukturę drzewa.
Dzięki tej reprezentacji możemy nawet narysować diagram, aby zobrazować flow w strukturze grafu:

Podsumowanie
Dzięki langgraph możemy bezproblemowo zaimplementować graf. Langgraph dostarcza wygodne narzędzia do wykorzystania struktury grafu w naszym flow, bez konieczności pisania niskopoziomowego kodu dla abstrakcji grafu. Programiści doceniają frameworki i wzorce projektowe. Budując frameworki wokół dobrze znanych struktur danych, możemy mieć pewność, że każdy nowy członek zespołu szybciej stanie się produktywny. W przypadku tworzenia aplikacji opartych na LLM'ach jest to kluczowe, ponieważ często trzeba zaimplementować skomplikowane flow, obsługujące wiele zadań z niedeterministycznym wynikiem. Oczywiście, w sytuacjach, gdy proces jest prosty i wiemy, że nie będzie rozwijany w przyszłości, tworzenie dodatkowej abstrakcji jest niedojrzałe, dlatego konieczne jest staranne planowanie.
Dziękuję za lekturę i mam nadzieję, że nauczyłeś się czegoś z moich treści. Do następnego artyukułu! Stay tuned. 👋
PS: Sprawdź repozytorium GitHub związane z tym artykułem.
