typescript and javascript logo
author avatar

Grzegorz Dubiel

26-11-2025

Wzorzec Thread Pool na Ratunek w Reactowych Aplikacjach Wykonujących Skomplikowane Obliczenia

Gdy mówię „kolejki” — myślisz backend lub mikroserwisy. Mówię „wątki” — znowu myślisz backend. To normalne. Niektóre wzorce naturalnie kojarzą się z określonymi obszarami, i to jest w porządku — mam dokładnie tak samo. Kluczem jest jednak umiejętność wykorzystania swojej wiedzy i znajomości tych wzorców w innych częściach procesu tworzenia oprogramowania.

W tym wpisie pokażę, jak wzorzec Thread Pool można zastosować w aplikacji React, aby znacząco zoptymalizować zadania wymagające dużej mocy obliczeniowej. Faktem jest, że większość aplikacji React'owych działa całości na głównym wątku, ponieważ tak domyślnie działa JavaScript.

Wspomniałem już o przenoszeniu ciężkich operacji CPU do Web Workera w moim poprzednim artykule , gdzie zaprezentowałem przykład aplikacji React uruchamiającej inferencję rozpoznawania oraz detekcji twarzy całkowicie w przeglądarce. Teraz chciałbym zoptymalizować ten przykład, stosując wzorzec Thread Pool.

Zanim przejdziemy do kodu, spójrzmy na wysokopoziomowe wyjaśnienie tego wzorca.

Czym jest Wzorzec Thread Pool

Wzorzec Thread Pool to wzorzec projektowy, w którym przygotowywana jest stała liczba wątków roboczych do wykonywania zadań. Zadania są wysyłane do tych wątków, a gdy wszystkie wątki są zajęte, nowe zadania trafiają do kolejki. Gdy którykolwiek z wątków się zwolni, następne zadanie jest pobierane z kolejki i do niego przypisywane. Zadania są zazwyczaj pobierane z kolejki zgodnie z zasadą FIFO (first in, first out).

Przegląd Istniejącego Kodu

Aplikacja, którą będziemy optymalizować, to aplikacja React pobierająca przykładowe zdjęcie z wzorcowymi twarzami oraz zestaw innych obrazów. Każda twarz, która pasuje do twarzy z przykładowego zdjęcia, zostaje rozmyta.

Logika aplikacji składa się z dwóch głównych modułów:

  1. useFace — hook, który zawiera logikę wywoływania inferencji wykonywanej wewnątrz Web Workera (w osobnym wątku). Pełniąca rolę „spoiwa” między workerem a stanem Reacta

typescript

import { useEffect, useRef, useState } from "react";
import * as faceapi from "face-api.js";
import * as Comlink from "comlink";
import type { FaceDetectionWorker } from "../types";

async function getImageWithDetections(
  allExampleFaces: Float32Array[],
  targetImageData: { id: number; src: string },
  detector: (transferObj: {
    allExampleFaces: Float32Array[];
    allTargetImages: ArrayBuffer;
  }) => Promise<
    faceapi.WithFaceDescriptor<
      faceapi.WithFaceLandmarks<
        { detection: faceapi.FaceDetection },
        faceapi.FaceLandmarks68
      >
    >[]
  >,
) {
  const { id, src } = targetImageData;
  const arrayBuffer = await (await fetch(src)).arrayBuffer();

  const arrayBufferForDetector = arrayBuffer.slice(0);

  const payload = {
    allExampleFaces,
    allTargetImages: arrayBufferForDetector,
  };

  const matchedDescriptors = await detector(payload);

  return {
    id,
    src,
    imgElement: arrayBuffer,
    detections: matchedDescriptors,
  };
}

function useFace() {
  const [exampleImage, setExampleImage] = useState<string | null>(null);
  const [targetImages, setTargetImages] = useState<string[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const [outputImages, setOutputImages] = useState<string[]>([]);
  const workerRef = useRef<Worker | null>(null);

  useEffect(() => {
    const runWorker = async () => {
      const worker = new Worker(
        new URL("../face-detection.worker", import.meta.url),
        { type: "module" },
      );

      workerRef.current = worker;
    };

    runWorker().catch((error) => {
      console.error("Error initializing worker:", error);
    });

    return () => {
      workerRef.current?.terminate();
    };
  }, []);

  const handleFace = async () => {
    if (!exampleImage || !workerRef.current) {
      return;
    }
    try {
      console.time("face handler performance");
      const api = Comlink.wrap<Comlink.Remote<FaceDetectionWorker>>(
        workerRef.current as any,
      );

      setIsLoading(true);
      setError(null);

      const exampleArrayBuffer = await (
        await fetch(exampleImage)
      ).arrayBuffer();

      const allExampleFaces = await api.extractAllFaces(exampleArrayBuffer);
      const targetImagesWithId = targetImages.map((targetImage, index) => ({
        id: index,
        src: targetImage,
      }));

      for (const targetImage of targetImagesWithId) {
        const imageWithDescriptors = await getImageWithDetections(
          allExampleFaces.map((face) => face.descriptor),
          targetImage,
          api.detectMatchingFaces,
        );
        const output = await api.drawOutputImage(imageWithDescriptors);

        const url = URL.createObjectURL(output);
        setOutputImages((prevState) => [...prevState, url]);
      }
      console.timeEnd("face handler performance");
    } catch (err) {
      setError("Error processing image");
      console.error(err);
    } finally {
      setIsLoading(false);
    }
  };

  return {
    handleFace,
    isLoading,
    error,
    outputImages,
    loadExampleImage: setExampleImage,
    loadTargetImages: setTargetImages,
    exampleImage,
    targetImages,
  };
}

export default useFace;

FaceDetectionWorker — klasa Web Workera, która wykonuje całą ciężką pracę, taką jak ładowanie modeli, rozpoznawanie i detekcja twarzy oraz blurrowanie.

typescript

import * as faceapi from "face-api.js";
import * as Comlink from "comlink";
import type {
  DataTransfer,
  FaceDetectionWorker,
  ImageWithDescriptors,
} from "./types";
import { serializeFaceApiResult } from "./worker/serializers";

const MODEL_PATH = `/models`;

faceapi.env.setEnv(faceapi.env.createNodejsEnv());

faceapi.env.monkeyPatch({
  //@ts-ignore
  Canvas: OffscreenCanvas,
  //@ts-ignore
  createCanvasElement: () => {
    return new OffscreenCanvas(480, 270);
  },
});

const createCanvas = async (transferObj: DataTransfer) => {
  try {
    const buf = transferObj as ArrayBuffer | undefined;
    if (!buf) {
      return new OffscreenCanvas(20, 20);
    }

    const blob = new Blob([buf]);
    const bitmap = await createImageBitmap(blob);
    const canvas = new OffscreenCanvas(bitmap.width, bitmap.height);
    const ctx = canvas.getContext("2d")!;
    ctx.drawImage(bitmap, 0, 0, bitmap.width, bitmap.height);
    return canvas;
  } catch (error) {
    console.error(
      "Error creating image from buffer, using empty canvas instead",
      error,
    );
    return new OffscreenCanvas(20, 20);
  }
};

class WorkerClass implements FaceDetectionWorker {
  async extractAllFaces(transferObj: DataTransfer) {
    const canvas = await createCanvas(transferObj);
    const detections = await faceapi
      .detectAllFaces(canvas as unknown as faceapi.TNetInput)
      .withFaceLandmarks()
      .withFaceDescriptors();
    return detections.map(serializeFaceApiResult);
  }

  async detectMatchingFaces(transferObj: {
    allExampleFaces: Float32Array[];
    allTargetImages: ArrayBuffer;
  }) {
    const allExampleFaces = transferObj.allExampleFaces;
    const detections = await this.extractAllFaces(transferObj.allTargetImages);
    const threshold = 0.5;

    const matchedDescriptors = detections.filter(({ descriptor }) => {
      return allExampleFaces.some((exampleDescriptor) => {
        const distance = faceapi.euclideanDistance(
          exampleDescriptor,
          descriptor,
        );
        return distance < threshold;
      });
    });
    return matchedDescriptors.map(serializeFaceApiResult);
  }

  async drawOutputImage(imageWithDescriptors: ImageWithDescriptors) {
    const decodedCanvas = await createCanvas(imageWithDescriptors.imgElement);

    const canvas = new OffscreenCanvas(
      decodedCanvas.width,
      decodedCanvas.height,
    );
    const ctxRes = canvas.getContext("2d", { willReadFrequently: true })!;
    const detections = imageWithDescriptors.detections;
    ctxRes.drawImage(
      decodedCanvas as unknown as CanvasImageSource,
      0,
      0,
      canvas.width,
      canvas.height,
    );

    for (const detection of detections) {
      const { x, y, width, height } = detection?.detection?.box;

      const padding = 0.2;
      const expandedX = Math.max(0, x - width * padding);
      const expandedY = Math.max(0, y - height * padding);
      const expandedWidth = Math.min(
        canvas.width - expandedX,
        width * (1 + 2 * padding),
      );
      const expandedHeight = Math.min(
        canvas.height - expandedY,
        height * (1 + 2 * padding),
      );

      for (let i = 0; i < 3; i++) {
        ctxRes.filter = "blur(50px)";
        ctxRes.drawImage(
          decodedCanvas as unknown as CanvasImageSource,
          expandedX,
          expandedY,
          expandedWidth,
          expandedHeight,
          expandedX,
          expandedY,
          expandedWidth,
          expandedHeight,
        );
      }
      ctxRes.filter = "none";
    }

    return canvas.convertToBlob();
  }
}

async function loadModels() {
  console.log("WorkerClass init...");
  await Promise.all([
    faceapi.nets.ssdMobilenetv1.loadFromUri(MODEL_PATH),
    faceapi.nets.faceLandmark68Net.loadFromUri(MODEL_PATH),
    faceapi.nets.faceRecognitionNet.loadFromUri(MODEL_PATH),
  ]);
  console.log("worker initialized and models loaded");
}

(async () => {
  await loadModels();
  const worker = new WorkerClass();
  Comlink.expose(worker);
})();

Komunikacja między głównym wątkiem a workerem jest realizowana za pomocą biblioteki Comlink, która stanowi prostą abstrakcję nad Web Workerami.

Optymalizacja Aplikacji Za pomocą Patternu Thread Pool

Obecna implementacja rozwiązuje problem zawieszania się interfejsu, przenosząc inferencję AI do innego wątku. Jednak sama inferencja mogłaby działać szybciej, gdyby wykorzystać pozostałe dostępne wątki.

Klasa WorkerPool

Przede wszystkim musimy zaimplementować klasę, która będzie odpowiedzialna za utrzymywanie puli workerów, przydzielanie zadań oraz zarządzanie kolejką.

Na początku nasza klasa musi pobrać informację z navigator.hardwareConcurrency, aby ustalić, ile wątków jest dostępnych.

typescript

import * as Comlink from "comlink";
import type { FaceDetectionWorker } from "../types";

export class WorkerPool {
  private workers: Comlink.Remote<FaceDetectionWorker>[] = [];
  private availableWorkers: Set<number> = new Set();
  private taskQueue: Task<any>[] = [];
  private maxWorkers: number;
  private workerInstances: Worker[] = [];

  constructor(maxWorkers: number = navigator.hardwareConcurrency || 4) {
    this.maxWorkers =
      Math.max(1, Math.min(maxWorkers, navigator.hardwareConcurrency || 4)) - 1;
  }
}

Zauważ, że ze względów bezpieczeństwa najlepiej utworzyć o jednego workera mniej niż liczba dostępnych wątków, ponieważ jeden z nich będzie zawsze wykorzystywany przez główny wątek.

Następnie w metodzie init musimy uruchomić tylu workerów, ile pozwala na to skorygowana liczba wątków.

typescript

export class WorkerPool {
  // REST OF THE CODE
  async init(): Promise<void> {
    for (let i = 0; i < this.maxWorkers; i++) {
      const worker = new Worker(
        new URL("../face-detection.worker", import.meta.url),
        { type: "module" },
      );
      this.workerInstances.push(worker);
      const wrappedWorker = Comlink.wrap<Comlink.Remote<FaceDetectionWorker>>(
        worker as any,
      );
      this.workers.push(wrappedWorker);
      this.availableWorkers.add(i);
    }
  }
  // REST OF THE CODE
}

Następnie potrzebujemy metody odpowiedzialnej za wykonanie pojedynczego zadania.

typescript

interface Task<T> {
  fn: (worker: Comlink.Remote<FaceDetectionWorker>) => Promise<T>;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

export class WorkerPool {
  private workers: Comlink.Remote<FaceDetectionWorker>[] = [];
  private availableWorkers: Set<number> = new Set();
  private taskQueue: Task<any>[] = [];

  // REST OF THE CODE
  private async executeTask<T>(
    workerIndex: number,
    task: Task<T>,
  ): Promise<void> {
    try {
      const result = await task.fn(this.workers[workerIndex]);
      task.resolve(result);
    } catch (error) {
      task.reject(error instanceof Error ? error : new Error(String(error)));
    } finally {
      this.availableWorkers.add(workerIndex);

      if (this.taskQueue.length <= 0) {
        return;
      }
      const nextTask = this.taskQueue.shift();
      if (nextTask) {
        this.availableWorkers.delete(workerIndex);
        this.executeTask(workerIndex, nextTask);
      }
    }
  }
  // REST OF THE CODE
}

Metoda executeTask jest funkcją prywatną, która przyjmuje dwa parametry. Pierwszy parametr, task, to obiekt zawierający trzy właściwości: fn, która zwraca promise z wynikiem opisanym typem generycznym, oraz resolve i reject, które obsługują scenariusze sukcesu i błędu. Pochodzą one z obiektu Promise, ponieważ zadanie jest opakowane w promise. Drugi parametr, workerIndex, to po prostu indeks workera, który jest dostępny do wykonania bieżącego zadania. Jeśli w kolejce znajdują się jakieś zadania, zostaną pobrane i przekazane do rekurencyjnego wywołania funkcji nadrzędnej.

Nadszedł czas na metodę execute, która służy do uruchamiania całego procesu zarządzania i wykonywania zadań w puli workerów.

typescript

export class WorkerPool {
  private workers: Comlink.Remote<FaceDetectionWorker>[] = [];
  private availableWorkers: Set<number> = new Set();
  private taskQueue: Task<any>[] = [];

  // REST OF THE CODE

  private async executeTask<T>(
    workerIndex: number,
    task: Task<T>,
  ): Promise<void> {
    try {
      const result = await task.fn(this.workers[workerIndex]);
      task.resolve(result);
    } catch (error) {
      task.reject(error instanceof Error ? error : new Error(String(error)));
    } finally {
      this.availableWorkers.add(workerIndex);

      if (this.taskQueue.length <= 0) {
        return;
      }
      const nextTask = this.taskQueue.shift();
      if (nextTask) {
        this.availableWorkers.delete(workerIndex);
        this.executeTask(workerIndex, nextTask);
      }
    }
  }

  async execute<T>(
    fn: (worker: Comlink.Remote<FaceDetectionWorker>) => Promise<T>,
  ): Promise<T> {
    return new Promise((resolve, reject) => {
      if (this.availableWorkers.size > 0) {
        const workerIndex = Array.from(this.availableWorkers)[0];
        this.availableWorkers.delete(workerIndex);
        this.executeTask(workerIndex, { fn, resolve, reject });
      } else {
        this.taskQueue.push({ fn, resolve, reject });
      }
    });
  }
  // REST OF THE CODE
}

Ta metoda działa w zasadzie jako funkcja wyższego rzędu. Wybrany wątek jest udostępniany poprzez przekazanie go do callbacku fn. Jeśli któryś wątek jest gotowy do przetworzenia zadania, wykonanie następuje natychmiast; w przeciwnym razie zadanie trafia do kolejki.

Ostatnią przydatną metodą do dodania jest metoda terminate.

typescript

export class WorkerPool {
  private workers: Comlink.Remote<FaceDetectionWorker>[] = [];
  private availableWorkers: Set<number> = new Set();
  private taskQueue: Task<any>[] = [];

  // REST OF THE CODE
  async terminate(): Promise<void> {
    while (
      this.taskQueue.length > 0 ||
      this.availableWorkers.size < this.maxWorkers
    ) {
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    this.workerInstances.forEach((worker) => worker.terminate());
    this.workers = [];
    this.availableWorkers.clear();
    this.taskQueue = [];
  }
  // REST OF THE CODE
}

Metoda terminate zamyka wszystkie workery w sposób kontrolowany, czekając na zakończenie wszystkich zadań przed zakończeniem działania.

Cały moduł wygląda w ten sposób:

typescript

import * as comlink from "comlink";
import type { facedetectionworker } from "../types";

interface task<t> {
  fn: (worker: comlink.remote<facedetectionworker>) => promise<t>;
  resolve: (value: t) => void;
  reject: (error: error) => void;
}

export class workerpool {
  private workers: comlink.remote<facedetectionworker>[] = [];
  private availableworkers: set<number> = new set();
  private taskqueue: task<any>[] = [];
  private maxworkers: number;
  private workerinstances: worker[] = [];

  constructor(maxworkers: number = navigator.hardwareconcurrency || 4) {
    this.maxworkers = math.max(
      1,
      math.min(maxworkers, navigator.hardwareconcurrency || 4),
    );
  }

  async init(): promise<void> {
    for (let i = 0; i < this.maxworkers; i++) {
      const worker = new worker(
        new url("../face-detection.worker", import.meta.url),
        { type: "module" },
      );
      this.workerinstances.push(worker);
      const wrappedworker = comlink.wrap<comlink.remote<facedetectionworker>>(
        worker as any,
      );
      this.workers.push(wrappedworker);
      this.availableworkers.add(i);
    }
  }

  async execute<t>(
    fn: (worker: comlink.remote<facedetectionworker>) => promise<t>,
  ): promise<t> {
    return new promise((resolve, reject) => {
      if (this.availableworkers.size > 0) {
        const workerindex = array.from(this.availableworkers)[0];
        this.availableworkers.delete(workerindex);
        this.executetask(workerindex, { fn, resolve, reject });
      } else {
        this.taskqueue.push({ fn, resolve, reject });
      }
    });
  }

  private async executetask<t>(
    workerindex: number,
    task: task<t>,
  ): promise<void> {
    try {
      const result = await task.fn(this.workers[workerindex]);
      task.resolve(result);
    } catch (error) {
      task.reject(error instanceof error ? error : new error(string(error)));
    } finally {
      this.availableworkers.add(workerindex);

      if (this.taskqueue.length <= 0) {
        return;
      }
      const nexttask = this.taskqueue.shift();
      if (nexttask) {
        this.availableworkers.delete(workerindex);
        this.executetask(workerindex, nexttask);
      }
    }
  }

  async terminate(): promise<void> {
    while (
      this.taskqueue.length > 0 ||
      this.availableworkers.size < this.maxworkers
    ) {
      await new promise((resolve) => settimeout(resolve, 100));
    }

    this.workerinstances.foreach((worker) => worker.terminate());
    this.workers = [];
    this.availableworkers.clear();
    this.taskqueue = [];
  }
}

Hook useFace

Teraz musimy dostosować hook useFace, aby wywoływał metody inferencji za pośrednictwem utworzonej puli workerów.

Pierwszym krokiem jest zastąpienie wartości workerRef, zmieniając pojedynczego, konkretnego workera na instancję klasy WorkerPool.

typescript

import { useEffect, useRef, useState } from "react";
import { WorkerPool } from "./worker-pool";
function useFace() {
  const workerPoolRef = useRef<WorkerPool | null>(null);

  useEffect(() => {
    const initializeWorkerPool = async () => {
      const pool = new WorkerPool();
      await pool.init();
      workerPoolRef.current = pool;
    };

    initializeWorkerPool().catch((error) => {
      console.error("Error initializing worker pool:", error);
    });

    return () => {
      if (workerPoolRef.current) {
        workerPoolRef.current.terminate();
      }
    };
  }, []);
}

Musimy utworzyć instancję WorkerPool i przypisać ją do workerRef. Ważne jest również, aby pamiętać o zamknięciu puli workerów w funkcji sprzątającej (cleanup).

Teraz, w funkcji handleFace, każde zadanie powinno być wykonywane w callbacku przekazywanym do metody execute.

typescript

async function getImageWithDetections(
  allExampleFaces: Float32Array[],
  targetImageData: { id: number; src: string },
  workerPool: WorkerPool,
) {
  // REST OF THE CODE
  const matchedDescriptors = await workerPool.execute((worker) =>
    worker.detectMatchingFaces(payload),
  );
  // REST OF THE CODE
}

function useFace() {
  const [exampleImage, setExampleImage] = useState<string | null>(null);
  const [targetImages, setTargetImages] = useState<string[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const [outputImages, setOutputImages] = useState<string[]>([]);
  const workerPoolRef = useRef<WorkerPool | null>(null);

  // REST OF THE CODE

  const handleFace = async () => {
    // REST OF THE CODE
      const allExampleFaces = await workerPool.execute((worker) =>
        worker.extractAllFaces(exampleArrayBuffer),
      );

      const processedImages = await Promise.all(
        targetImagesWithId.map((targetImage) =>
          getImageWithDetections(
            allExampleFaces.map((face) => face.descriptor),
            targetImage,
            workerPool,
          ),
        ),
      );

      const outputPromises = processedImages.map((imageWithDescriptors) =>
        workerPool.execute((worker) =>
          worker.drawOutputImage(imageWithDescriptors),
        ),
      );
    // REST OF THE CODE
}

Porównanie Wydajności

Przeprowadzimy test, aby zobaczyć różnicę między zoptymalizowaną wersją a wersją oryginalną. Testy zostaną wykonane na komputerze z procesorem posiadającym 8 wątków, inferencja będzie przeprowadzona na 20 obrazach.

Oryginalna wersja

Do inferencji używany jest tylko jeden wątek. Interfejs użytkownika nie jest zablokowany, a aplikacja działa płynnie, ale cały proces zajmuje 71 sekund.

Wykorzystanie procesora przedstawiono na diagramie z htop:

Wykorzystanie CPU w wersji oryginalnej

Wersja Zoptymalizowana

Zoptymalizowana wersja wykorzystuje wszystkie dostępne wątki procesora. Poprawa jest znacząca — cały proces zajmuje teraz 36 sekund.

Różnicę w wykorzystaniu procesora można również zobaczyć na poniższym diagramie:

Wykorzystanie CPU w wersji zoptymalizowanej

Podsumowanie

Ostatnio wielu deweloperów front-end korzystających z Reacta mówi o meta-frameworkach, komponentach serwerowych i przenoszeniu renderowania z przeglądarki na serwer. Czasami jednak, ciężkie obliczenia muszą być wykonywane po stronie przeglądarki. Wtedy warto korzystać z funkcji oferowanych przez przeglądarkę i sam język. Zawsze dobrze jest być świadomym możliwości środowiska, w którym działa nasz język programowania.

Uważam również, że warto patrzeć poza swoją dziedzinę programowania. Jeśli zajmujesz się front-endem, poznawaj koncepcje back-endowe i odwrotnie. Większość wzorców nie jest stworzona wyłącznie dla jednej strony; niektóre po prostu są częściej stosowane w określonych kontekstach.

Mam nadzieję, że artykuł Ci się spodobał i że udało Ci się z niego wynieść coś wartościowego. 🫡

typescript and javascript logogreg@aboutjs.dev

©Grzegorz Dubiel | 2025