typescript and javascript logo
author avatar

Grzegorz Dubiel

31-12-2025

Użycie RabbitMQ w nowoczesnym Node.js: praktyczny przykład fanout exchange

Celem aplikacji, które tworzymy, jest automatyzacja procesów i ułatwianie życia ludziom. Podczas automatyzacji często konieczne jest wykonywanie wielu zadań, z których wiele można — a nawet należy — wykonywać równolegle. W tym celu tworzymy niezależne usługi odpowiedzialne za obsługę konkretnych zadań. Usługi te zazwyczaj oczekują na wiadomość, aby rozpocząć przetwarzanie danych.

Mogłeś zauważyć, że mówię o architekturze mikroserwisów — i częściowo masz rację. Warto jednak zaznaczyć, że message brokery nie są wykorzystywane wyłącznie w świecie mikroserwisów. Skupmy się na tym jak działa message broker w ekosystemie Node.js. Narzędziem którym wybrałem jest RabbitMQ.

Czym jest RabbitMQ

RabbitMQ jest brokerem wiadomości — usługą odpowiedzialną za dystrybucję wiadomości. Proces komunikacji jest obsługiwany przez producer'a, który publikuje wiadomość do exchange. Następnie exchange kieruje wiadomość do odpowiednich kolejek, z których consumerzy mogą ją odbierać i wykonywać działania na podstawie otrzymanych danych.

Warstwa exchange jest potężną abstrakcją odpowiedzialną za routowanie wiadomości do kolejek. Pozwala to na przykład wysłać kopie tej samej wiadomości do wielu kolejek, umożliwiając wielu consumer'om niezależne przetwarzanie tej samej wiadomości. Dzięki użyciu osobnych kolejek każdy consumer może np. stosować własny mechanizm back-pressure, co pozwala zaimplementować zachowanie typu publish/subscribe przy użyciu kolejek.

RabbitMQ implementuje protokół AMQP (Advanced Message Queuing Protocol), który umożliwia niezawodną komunikację opartą na wiadomościach z wykorzystaniem kolejek. Protokół AMQP definiuje sposób wysyłania wiadomości przez producer'a, sposób ich routowania przez brokera oraz to, jak współdziałają kolejki, exchanges i bindings. Określa on również, w jaki sposób komunikacja między klientami a brokerem odbywa się za pośrednictwem protokołu TCP.

Exchange

Jest to bardzo potężna i istotna funkcja w RabbitMQ. Jak wspomniałem wcześniej, exchange odbiera wiadomość od producer'a i kieruje ją do odpowiednich kolejek zgodnie z zadeklarowanym typem wymiany.

Istnieje wiele typów exchanges. W tym artykule omówimy następujące:

  • fanout odpowiada za odbieranie wiadomości i wysyłanie jej kopii do wszystkich kolejek powiązanych z exchange, co pozwala wielu consumer'om przetwarzać tę samą wiadomość za pośrednictwem własnych kolejek.
  • direct kieruje wiadomość do kolejki, której binding key odpowiada routing key podanemu podczas publikowania wiadomości. Zapewnia to komunikację w postaci producer → exchange → konkretna kolejka (routing_key) → consumer.

Jeśli chcesz dowiedzieć się więcej o pozostałych typach exchange, możesz znaleźć je w oficjalnej dokumentacji RabbitMQ.

Zarys Projektu

Jeśli czytałeś którykolwiek z moich wcześniejszych artykułów, wiesz, że lubię omawiać konkretne funkcje, wzorce poprzez budowanie praktycznych, działających projektów. Porozmawiajmy więc o projekcie, który zbudujemy.

Aplikacja, którą zbudujemy, będzie pełnić rolę orkiestratora metadanych postów. Jej zadaniem jest obserwowanie folderu i oczekiwanie na pojawienie się posta w postaci Markdown. Gdy takie zdarzenie wystąpi, producent wysyła wiadomość zawierającą metadane pliku — takie jak nazwa pliku i ścieżka do niego — przez wymianę typu fanout do dwóch kolejek. Będziemy mieć dwóch consumer'ów:

  • Serwis SEO odpowiedzialny za generowanie metadanych SEO dla posta przy użyciu modelu LLM, a następnie zapisanie wyników w MongoDB.

  • Serwis Github Upload odpowiedzialny za tworzenie pull requesta z plikiem MD posta w skonfigurowanym repozytorium.

Zaimplementujemy również dead-letter exchange, aby móc ponownie przetwarzać martwe wiadomości, korzystając z typu exchange direct. Każda usługa będzie miała osobną kolejkę przeznaczoną do obsługi martwych wiadomości.

Kod projektu będzie przechowywany w monorepo, które będzie zarządzane przy użyciu PNPM workspaces.

Struktura monorepo będzie wyglądać następująco:

Markdown

rabitmq-node (monorepo)

├── 📦 packages/
│ ├── logger/
│ │ └── TypeScript logger utility package
│ └── rabbitmq/
│ └── RabbitMQ consumer/producer package

├── 🔧 services/
│ ├── seo-service/
│ │ └── SEO content analysis service (OpenAI + MongoDB)
│ └── github-upload-service/
│ └── GitHub PR-based file upload service

├── 📝 producer/
│ └── File observer + message publishing service

├── 🛠️ scripts/
│ └── Development and utility scripts

Tworzenie Wrapper'a RabbitMQ

Stworzymy dwie klasy: jedną dla producer'a i jedną dla consumer'a. Te dwie klasy wyraźnie demonstrują, jak działa RabbitMQ.

Producer

Na początku tworzymy dwa interfejsy dla konfiguracji kolejek. Określają one strukturę konfiguracji zgodną z wymaganiami RabbitMQ.

typescript

export interface QueueConfig {
  name: string;
  routingKey?: string;
  maxRetries?: number;
  consumerType: string;
}

export interface ProducerConfig {
  rabbitmqUrl: string;
  exchangeName: string;
  exchangeType?: "fanout" | "topic" | "direct";
  queues?: QueueConfig[];
}

rabbitmqUrl pozwala podać URL do połączenia z usługą message brokera. exchangeName jest wymagany do skonfigurowania exchange, który przechowuje kolejki. Każda kolejka musi mieć przynajmniej określony name, opcjonalnie może zawierać routingKey i maxRetries, które będą używane do konfiguracji kolejek obsługi błędów (dead-letter).

Teraz zdefiniujmy klasę producera oraz jego metodę connect:

typescript

// Rest of the code
export class RabbitMQProducer {
  // Rest of the code...
  private async setupExchange() {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const exchangeType = this.config.exchangeType || "fanout";
    await this.channel.assertExchange(this.config.exchangeName, exchangeType, {
      durable: true,
    });
    this.logger.info(
      `Exchange declared: ${this.config.exchangeName} (${exchangeType})`,
    );
  }

  private getCentralDLXName() {
    return "app.dlx";
  }

  private async setupCentralDLX() {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const dlxName = this.getCentralDLXName();
    await this.channel.assertExchange(dlxName, "direct", {
      durable: true,
    });
    this.logger.info(`Central Dead Letter Exchange declared: ${dlxName}`);
  }
}

Aby skonfigurować exchange, musimy go utworzyć(assert), co oznacza, że zostanie utworzony, jeśli nie istnieje. Exchange jest tworzony w uprzednio utworzonym kanale. Następnie musimy podać trzy argumenty do metody assertExchange. Pierwszy to nazwa exchange, drugi to typ exchange (w naszym przypadku domyślnie fanout), a trzeci to obiekt konfiguracji, w którym określamy, czy exchange jest trwały (durable), czy nie. Ten argument jest ważny, ponieważ trwały exchange przetrwa restarty RabbitMQ, co w większości przypadków jest dobrym wyborem, czyniąc aplikację bardiej niezawodną.

Kolejne metody konfigurują exchange do obsługi błędów. Tutaj widać wyraźną różnicę między typami exchange. Exchange obsługujący błędy ma typ direct, natomiast exchange służący do dostarczania wiadomości do wielu usług jednocześnie ma typ fanout. Jak wspomniałem wcześniej, exchange typu fanout wysyła kopię tej samej wiadomości do wszystkich powiązanych kolejek, podczas gdy exchange typu direct dostarcza wiadomość bezpośrednio do kolejki powiązanej z nim i określonej przez klucz routingu.

Nadszedł czas na zdefiniowanie metod dla kolejek. Musimy stworzyć metody zarówno dla standardowych kolejek, jak i dla kolejek dead-letter:

typescript

// Rest of the code
export class RabbitMQProducer {
  // Rest of the code...
  private getDeadLetterQueueName(consumerType: string) {
    return `dlq.${consumerType}`;
  }
  private async setupMainQueue(queue: QueueConfig) {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const dlxName = this.getCentralDLXName();

    await this.channel.assertQueue(queue.name, {
      durable: true,
      arguments: {
        "x-dead-letter-exchange": dlxName,
        "x-dead-letter-routing-key": queue.consumerType,
      },
    });
    this.logger.info(`Queue declared: ${queue.name}`);

    await this.channel.bindQueue(
      queue.name,
      this.config.exchangeName,
      queue.routingKey || "",
    );
    this.logger.info(
      `Queue bound: ${queue.name} -> ${this.config.exchangeName}`,
    );
  }
  private async setupDeadLetterQueue(queue: QueueConfig) {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const maxRetries = queue.maxRetries ?? 4;
    const dlxName = this.getCentralDLXName();
    const dlQueueName = this.getDeadLetterQueueName(queue.consumerType);
    const retryDelayMs = 5000 * maxRetries;

    await this.channel.assertQueue(dlQueueName, {
      durable: true,
      arguments: {
        "x-dead-letter-exchange": this.config.exchangeName,
        "x-dead-letter-routing-key": queue.consumerType,
        "x-message-ttl": retryDelayMs,
      },
    });
    this.logger.info(`Dead Letter Queue declared: ${dlQueueName}`);

    await this.channel.bindQueue(dlQueueName, dlxName, queue.consumerType);
    this.logger.info(
      `Dead Letter Queue bound: ${dlQueueName} -> ${dlxName} (routing key: ${queue.consumerType})`,
    );
  }
}

Aby skonfigurować kolejkę, musimy użyć metody assertQueue. Metoda ta wymaga dwóch argumentów: nazwy kolejki (pobranej z konfiguracji) oraz obiektu konfiguracji. W obiekcie konfiguracji określamy dwie rzeczy: czy kolejka ma być trwała (durable), oraz argumenty służące do ustawienia referencji do kolejki DLX (dead-letter exchange). x-dead-letter-exchange wskazuje DLX exchange utworzony wcześniej, a x-dead-letter-routing-key jest używany przez DLX exchange jako wskazanie, do której kolejki wiadomość powinna zostać wysłana.

Metody do konfiguracji kolejki dead-letter wyglądają podobnie; argumenty przekazywane do assertQueue są prawie takie same. Dodatkowo musimy podać x-message-ttl, który określa, kiedy wiadomość dead-letter powinna zostać ponownie wysłana.

Następnie tworzymy metodę do skonfigurowania wszystkiego razem przy użyciu uprzednio zdefiniowanych metod:

typescript

// Rest of the code
export class RabbitMQProducer {
  // Rest of the code...
  async setup() {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }
    if (!this.connection) {
      throw new Error("Connection not initialized");
    }

    try {
      await this.setupExchange();
      await this.setupCentralDLX();

      const setupPromises = (this.config.queues || []).map(async (queue) => {
        await this.setupDeadLetterQueue(queue);
        await this.setupMainQueue(queue);
      });
      await Promise.all(setupPromises);
      this.logger.info(
        `Infrastructure setup complete for exchange: ${this.config.exchangeName}`,
      );
    } catch (error) {
      this.logger.error(
        "Failed to setup RabbitMQ infrastructure:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

W metodzie setup wywołujemy wszystkie metody służące do konfiguracji exchange’ów i kolejek.

Kolejną metodą, którą musimy zdefiniować, jest metoda do publikowania wiadomości:

typescript

export class RabbitMQProducer {
  // Rest of the code...
  async publish(message: any, routingKey: string = "") {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    try {
      const messageBuffer = Buffer.from(JSON.stringify(message));
      const sent = this.channel.publish(
        this.config.exchangeName,
        routingKey,
        messageBuffer,
        {
          persistent: true,
        },
      );

      if (sent) {
        this.logger.debug(
          `Message published to exchange ${this.config.exchangeName}:`,
          message,
        );
      } else {
        this.logger.warn("Message was not sent, buffer full");
      }

      return sent;
    } catch (error) {
      this.logger.error(
        "Error publishing message:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

W metodzie publish wysyłamy wiadomość za pomocą metody publish z obiektu channel. Metoda ta wymaga czterech argumentów: nazwy exchange, która odnosi się do exchange odpowiedzialnego za kierowanie wiadomości do właściwych kolejek; routingKey, który, jeśli jest podany, określa kolejkę lub kolejki, do których wiadomość powinna zostać wysłana; messageBuffer, który zawiera treść wiadomości; oraz obiektu konfiguracji, w którym ustawiamy wiadomość jako trwałą (persistent).

Ostatnią metodą jest disconnect, która pozwala na bezpieczne rozłączenie producer'a.

typescript

// Rest of the code...
export class RabbitMQProducer {
  // Rest of the code...
  async disconnect() {
    try {
      if (this.channel) {
        await this.channel.close();
      }
      if (this.connection) {
        await this.connection.close();
      }
      this.logger.info("Disconnected from RabbitMQ");
    } catch (error) {
      this.logger.error(
        "Error disconnecting from RabbitMQ:",
        error instanceof Error ? error.message : String(error),
      );
    }
  }
}

Consumer

Klasa consumer'a powinna być mniej skomplikowana, ponieważ musimy jedynie połączyć się z RabbitMQ, a następnie nasłuchiwać przychodzących wiadomości.

Zdefiniujmy klasę wraz z jej pierwszą metodą, connect:

typescript

import amqp from "amqplib";
import { ILogger } from "./logger";
import { ConsumerMessage, ConsumerConfig } from "./types";
import { Connection, Channel, Message } from "./amqplib.types";

export class RabbitMQConsumer {
  private connection: Connection | null = null;
  private channel: Channel | null = null;
  private config: ConsumerConfig;
  private logger: ILogger;

  constructor(config: ConsumerConfig, logger: ILogger) {
    this.config = {
      prefetch: 1,
      ...config,
    };
    this.logger = logger;
  }

  async connect() {
    try {
      this.connection = (await amqp.connect(
        this.config.rabbitmqUrl,
      )) as any as Connection;
      this.logger.info("Connected to RabbitMQ");

      this.channel = await this.connection.createChannel();
      this.logger.info("Channel created");

      this.connection.on("error", (err: Error) => {
        this.logger.error("RabbitMQ connection error:", err.message);
      });

      this.connection.on("close", () => {
        this.logger.warn("RabbitMQ connection closed");
      });
    } catch (error) {
      this.logger.error(
        "Failed to connect to RabbitMQ:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

Ta metoda tworzy połączenie z usługą RabbitMQ, ustanawia kanał oraz ustawia obsługę generycznych zdarzeń.

Kolejną metodą jest metoda to setup'u consumer'a. Metoda ta pozwala przypisać obsługę przychodzących wiadomości:

typescript

// rest of the code...
export class RabbitMQConsumer {
  // rest of the code...
  async setupConsumer(messageHandler: MessageHandler) {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    try {
      await this.channel.prefetch(this.config.prefetch || 1);

      this.channel.consume(
        this.config.queueName,
        async (msg: Message | null) => {
          if (!msg) {
            return;
          }

          let parsedMessage: ConsumerMessage | null = null;

          try {
            const content = msg.content.toString();
            parsedMessage = JSON.parse(content) as ConsumerMessage;

            this.logger.info(`Received message: ${parsedMessage.filename}`);

            await messageHandler(parsedMessage);

            this.channel!.ack(msg);
            this.logger.debug(
              `Message acknowledged: ${parsedMessage.filename}`,
            );
          } catch (error) {
            const filename = parsedMessage?.filename || "unknown";

            this.logger.error(
              `Error processing message: ${filename}`,
              error instanceof Error ? error.message : String(error),
            );

            this.channel!.nack(msg, false, false);
          }
        },
      );

      this.logger.info(
        `Started consuming from queue: ${this.config.queueName}`,
      );
    } catch (error) {
      this.logger.error(
        "Failed to setup consumer:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

W kwestii obsługi sukcesu i błędów wszystko jest zarządzane za pomocą metod ack i nack. Mówiąc prościej: gdy wywoływana jest metoda ack, wiadomość jest uznawana za prawidłowo przetworzoną; gdy wywoływana jest metoda nack, wiadomość jest uznawana za nieprzetworzoną i zostanie wysłana do kolejki dead-letter w celu ponownego wysłania do docelowej kolejki.

Ostatnią metodą jest metoda do rozłączenia się z usługą RabbitMQ.

typescript

// rest of the code...
export class RabbitMQConsumer {
  // rest of the code...
  async disconnect() {
    try {
      await this?.channel?.close();
      await this?.connection?.close();

      this.logger.info("Disconnected from RabbitMQ");
    } catch (error) {
      this.logger.error(
        "Error disconnecting from RabbitMQ:",
        error instanceof Error ? error.message : String(error),
      );
    }
  }
}

Użycie RabbitMQ

Mamy wszystko skonfigurowane i gotowe do użycia, więc możemy zaimplementować główną usługę, która będzie odpowiedzialna za monitorowanie folderu i uruchamianie wysyłki wiadomości do oczekujących usług.

Nazwiemy ten serwis: "producer".

Przygotujmy konfigurację dla RabbitMQ:

typescript

import { RabbitMQProducer, type ProducerConfig } from "@monorepo/rabbitmq";

const EXCHANGE_NAME = "content-fanout";
const EXCHANGE_TYPE = "fanout";

const producerConfig: ProducerConfig = {
  rabbitmqUrl: process.env.RABBITMQ_URL || "amqp://guest:guest@localhost:5672",
  exchangeName: EXCHANGE_NAME,
  exchangeType: EXCHANGE_TYPE as "fanout" | "topic" | "direct",
  queues: [
    {
      name: "seo-queue",
      consumerType: "seo",
      routingKey: "seo",
      maxRetries: 4,
    },
    {
      name: "github-upload-queue",
      consumerType: "github-upload",
      routingKey: "github-upload",
      maxRetries: 4,
    },
  ],
};

Tutaj definiujemy dwie kolejki dla naszych usług - consumerów, które będą odbierać wiadomości wyzwalające przetwarzanie plików. Kolejki dead-letter zostaną skonfigurowane automatycznie, ponieważ zaimplementowaliśmy tą funkcjonalność w module producera(packages).

Teraz możemy zdefiniować funkcje, które będą wywoływać metody do łączenia się z usługą message brokera, publikowania wiadomości oraz zamykania połączenia.

typescript

// Rest of the code...
let producer: RabbitMQProducer | null = null;

export async function connectRabbitMQ(): Promise<void> {
  let lastError: Error | null = null;

  try {
    producer = new RabbitMQProducer(producerConfig, logger);
    await producer.connect();
    await producer.setup();
    logger.info("RabbitMQ connected successfully");
    return;
  } catch (error) {
    lastError = error instanceof Error ? error : new Error(String(error));
  }

  logger.error("Failed to connect to RabbitMQ after all retries");
  throw lastError || new Error("Failed to connect to RabbitMQ");
}

export async function publishMessage(
  message: any,
  routingKey: string = "",
): Promise<boolean> {
  try {
    if (!producer) {
      throw new Error(
        "RabbitMQ producer not initialized. Call connectRabbitMQ first.",
      );
    }
    const sent = await producer.publish(message, routingKey);
    if (sent) {
      logger.debug(`Message published to exchange ${EXCHANGE_NAME}:`, message);
    } else {
      logger.warn("Message was not sent, buffer full");
    }
    return sent;
  } catch (error) {
    logger.error("Error publishing message:", error);
    throw error;
  }
}

export async function closeConnection(): Promise<void> {
  try {
    if (producer) {
      await producer.disconnect();
    }
    logger.info("RabbitMQ connection closed");
  } catch (error) {
    logger.error("Error closing RabbitMQ connection:", error);
  }
}

Te funkcje będą wywoływane bezpośrednio w module fileObserver, który wygląda następująco:

typescript

import { watch } from "chokidar";
import * as fs from "fs";
import * as path from "path";
import { v4 as uuidv4 } from "uuid";
import { CONTENT_FOLDER, FILE_EXTENSION, WATCHER_OPTIONS } from "./consts";
import { logger } from "./logger";
import { publishMessage } from "./rabbitmq";

interface FileMessage {
  sourceId: string;
  filename: string;
  path: string;
  timestamp: string;
}

async function handleFileAdded(filePath: string): Promise<void> {
  try {
    const fileName = path.basename(filePath);
    const ext = path.extname(filePath);

    if (ext !== FILE_EXTENSION) {
      logger.debug(`Skipping non-markdown file: ${fileName}`);
      return;
    }

    logger.info(`File detected: ${fileName}`);

    if (!fs.existsSync(filePath)) {
      logger.warn(`File does not exist: ${fileName}`);
      return;
    }

    const message: FileMessage = {
      sourceId: uuidv4(),
      filename: fileName,
      path: path.resolve(filePath),
      timestamp: new Date().toISOString(),
    };

    await publishMessage(message, "");
    logger.info(`File processed and published: ${fileName}`);
  } catch (error) {
    logger.error(`Error processing file: ${filePath}`, error);
  }
}

export async function startFileObserver(): Promise<void> {
  try {
    if (!fs.existsSync(CONTENT_FOLDER)) {
      fs.mkdirSync(CONTENT_FOLDER, { recursive: true });
      logger.info(`Created content folder: ${CONTENT_FOLDER}`);
    }

    logger.info(`Starting file observer on: ${CONTENT_FOLDER}`);

    const watcher = watch(CONTENT_FOLDER, WATCHER_OPTIONS);

    watcher.on("add", (filePath) => {
      logger.debug(`File added event: ${filePath}`);
      handleFileAdded(filePath);
    });

    watcher.on("change", (filePath) => {
      logger.debug(`File changed event: ${filePath}`);
      handleFileAdded(filePath);
    });

    watcher.on("error", (error) => {
      logger.error("File watcher error:", error);
    });

    logger.info("File observer started successfully");
  } catch (error) {
    logger.error("Failed to start file observer:", error);
    throw error;
  }
}

Serwisy Consumer'ów

Dzięki reużywalnym klasom, które stworzyliśmy w packages, możemy teraz łatwo wywoływać metody, nie martwiąc się o niskopoziomowe szczegóły związane z samym message brokerem.

Kod do obsługi wiadomości wygląda następująco:

typescript

// github-upload-service

// ...rest of the code
consumer = new RabbitMQConsumer(
  {
    rabbitmqUrl: env.RABBITMQ_URL,
    exchangeName: GITHUB_EXCHANGE,
    queueName: GITHUB_UPLOAD_QUEUE,
    prefetch: 1,
  },
  logger,
);

await consumer.connect();
logger.info("Connected to RabbitMQ");

await consumer.setupConsumer(async (message: ConsumerMessage) => {
  if (!message || !message.filename) {
    logger.warn("Received invalid message, skipping");
    return;
  }

  const validatedMessage = GithubUploadMessageSchema.parse(message);
  logger.info("Received message for GitHub upload", {
    filename: validatedMessage.filename,
    sourceId: validatedMessage.sourceId,
  });

  await uploadFileViaPR({
    octokit,
    message: validatedMessage,
    owner: env.GITHUB_OWNER,
    repo: env.GITHUB_REPO,
    uploadPath: env.GITHUB_UPLOAD_PATH,
    baseBranch: env.GITHUB_BRANCH,
    prTitlePrefix: env.GITHUB_PR_TITLE_PREFIX,
  });
});

// ...rest of the code

Musimy utworzyć instancję klasy consumera. Musimy podać wszystkie argumenty odpowiadające usłudze zdefiniowanej wcześniej podczas tworzenia producera. Właściwość prefetch określa, ile wiadomości consumer może pobrać z kolejki przed ich potwierdzeniem(ack). W tym przypadku oznacza to, że usługa będzie przetwarzać tylko jedną wiadomość naraz.

Metoda setupConsumer przyjmuje obsługę wiadomości, która zostanie wykonana po jej nadejściu. W tym przypadku dane wiadomości są weryfikowane i przekazywane do funkcji odpowiedzialnej za utworzenie pull requesta z pliku, którego referencja została przesłana w treści wiadomości.

W seo-service kod consumer'a jest prawie taki sam:

typescript

// rest of the code...
consumer = new RabbitMQConsumer(
  {
    rabbitmqUrl,
    exchangeName: RABBITMQ_EXCHANGE,
    queueName: SEO_WORKER_QUEUE,
    prefetch: 1,
  },
  logger,
);

await consumer.connect();
logger.info("Connected to RabbitMQ");

await consumer.setupConsumer(async (message) => {
  if (!message || !message.filename) {
    logger.warn("Received invalid message, skipping");
    return;
  }
  logger.info("Received message for processing", {
    filename: message.filename,
  });
  await processor.processContent(message);
});
// rest of the code...

Podsumowanie

Postanowiłem pokazać raczej prosty przykład użycia RabbitMQ w NodeJS, ale z real-world scenariuszem użycia. Nie użyliśmy żadnego frameworka, aby mieć neutralną, surową implementację w środowisku NodeJS. W świecie złożonych systemów działających asynchronicznie musimy brać pod uwagę wiele wzorców i technologii. Dodatkowo często generujemy dużo kodu przy pomocy AI, kodu, który musi być przez nas nadzorowany, dlatego solidne opanowanie funkcji ekosystemu jest bezcenne. Takie przykłady świetnie sprawdzają się dla mnie, kiedy chcę nauczyć się nowej technologii albo odświeżyć swoją wiedzę. Zachęcam do nauki przez praktykę — jeśli chcesz poznać wzorzec lub bibliotekę, spróbuj coś stworzyć z jej pomocą, nawet małą aplikację rozwiązującą jakiś problem. Nie ma znaczenia, czy na rynku istnieje już rozwiązanie; staw czoła problemom i zdobądź praktyczne doświadczenie z biblioteką. Przy szerszych tematach, takich jak wzorce architektoniczne czy inne szersze koncepcje,staraj się je rozbić i zrozumieć każdą część, eksperymentując z nimi, jeżeli to możliwe, osobno. Obecnie wiedza i praktyczne doświadczenie są kluczem do sukcesu w karierze programisty.

Mam nadzieję, że zechcesz zgłębić przykład z tego artykułu. Oto repozytorium!

Stay tuned👋🏻

typescript and javascript logogreg@aboutjs.dev

©Grzegorz Dubiel | 2025