typescript and javascript logo
author avatar

Grzegorz Dubiel

31-12-2025

Użycie RabbitMQ w nowoczesnym Node.js: praktyczny przykład fanout exchange

Celem aplikacji, które tworzymy, jest automatyzacja procesów i ułatwianie życia ludziom. Podczas automatyzacji często konieczne jest wykonywanie wielu zadań, z których wiele można — a nawet należy — wykonywać równolegle. W tym celu tworzymy niezależne usługi odpowiedzialne za obsługę konkretnych zadań. Usługi te zazwyczaj oczekują na wiadomość, aby rozpocząć przetwarzanie danych.

Mogłeś zauważyć, że mówię o architekturze mikroserwisów — i częściowo masz rację. Warto jednak zaznaczyć, że message brokery nie są wykorzystywane wyłącznie w świecie mikroserwisów. Skupmy się na tym jak działa message broker w ekosystemie Node.js. Narzędziem którym wybrałem jest RabbitMQ.

Czym jest RabbitMQ

RabbitMQ jest brokerem wiadomości — usługą odpowiedzialną za dystrybucję wiadomości. Proces komunikacji jest obsługiwany przez producer'a, który publikuje wiadomość do exchange. Następnie exchange kieruje wiadomość do odpowiednich kolejek, z których consumerzy mogą ją odbierać i wykonywać działania na podstawie otrzymanych danych.

Warstwa exchange jest potężną abstrakcją odpowiedzialną za routowanie wiadomości do kolejek. Pozwala to na przykład wysłać kopie tej samej wiadomości do wielu kolejek, umożliwiając wielu consumer'om niezależne przetwarzanie tej samej wiadomości. Dzięki użyciu osobnych kolejek każdy consumer może np. stosować własny mechanizm back-pressure, co pozwala zaimplementować zachowanie typu publish/subscribe przy użyciu kolejek.

RabbitMQ implementuje protokół AMQP (Advanced Message Queuing Protocol), który umożliwia niezawodną komunikację opartą na wiadomościach z wykorzystaniem kolejek. Protokół AMQP definiuje sposób wysyłania wiadomości przez producer'a, sposób ich routowania przez brokera oraz to, jak współdziałają kolejki, exchanges i bindings. Określa on również, w jaki sposób komunikacja między klientami a brokerem odbywa się za pośrednictwem protokołu TCP.

Exchange

Jest to bardzo potężna i istotna funkcja w RabbitMQ. Jak wspomniałem wcześniej, exchange odbiera wiadomość od producer'a i kieruje ją do odpowiednich kolejek zgodnie z zadeklarowanym typem wymiany.

Istnieje wiele typów exchanges. W tym artykule omówimy następujące:

  • fanout odpowiada za odbieranie wiadomości i wysyłanie jej kopii do wszystkich kolejek powiązanych z exchange, co pozwala wielu consumer'om przetwarzać tę samą wiadomość za pośrednictwem własnych kolejek.
  • direct kieruje wiadomość do kolejki, której binding key odpowiada routing key podanemu podczas publikowania wiadomości. Zapewnia to komunikację w postaci producer → exchange → konkretna kolejka (routing_key) → consumer.

Jeśli chcesz dowiedzieć się więcej o pozostałych typach exchange, możesz znaleźć je w oficjalnej dokumentacji RabbitMQ.

Zarys Projektu

Jeśli czytałeś którykolwiek z moich wcześniejszych artykułów, wiesz, że lubię omawiać konkretne funkcje, wzorce poprzez budowanie praktycznych, działających projektów. Porozmawiajmy więc o projekcie, który zbudujemy.

Aplikacja, którą zbudujemy, będzie pełnić rolę orkiestratora metadanych postów. Jej zadaniem jest obserwowanie folderu i oczekiwanie na pojawienie się posta w postaci Markdown. Gdy takie zdarzenie wystąpi, producent wysyła wiadomość zawierającą metadane pliku — takie jak nazwa pliku i ścieżka do niego — przez wymianę typu fanout do dwóch kolejek. Będziemy mieć dwóch consumer'ów:

  • Serwis SEO odpowiedzialny za generowanie metadanych SEO dla posta przy użyciu modelu LLM, a następnie zapisanie wyników w MongoDB.

  • Serwis Github Upload odpowiedzialny za tworzenie pull requesta z plikiem MD posta w skonfigurowanym repozytorium.

Zaimplementujemy również dead-letter exchange, aby móc ponownie przetwarzać martwe wiadomości, korzystając z typu exchange direct. Każda usługa będzie miała osobną kolejkę przeznaczoną do obsługi martwych wiadomości.

Kod projektu będzie przechowywany w monorepo, które będzie zarządzane przy użyciu PNPM workspaces.

Struktura monorepo będzie wyglądać następująco:

Markdown

Tworzenie Wrapper'a RabbitMQ

Stworzymy dwie klasy: jedną dla producer'a i jedną dla consumer'a. Te dwie klasy wyraźnie demonstrują, jak działa RabbitMQ.

Producer

Na początku tworzymy dwa interfejsy dla konfiguracji kolejek. Określają one strukturę konfiguracji zgodną z wymaganiami RabbitMQ.

typescript

rabbitmqUrl pozwala podać URL do połączenia z usługą message brokera. exchangeName jest wymagany do skonfigurowania exchange, który przechowuje kolejki. Każda kolejka musi mieć przynajmniej określony name, opcjonalnie może zawierać routingKey i maxRetries, które będą używane do konfiguracji kolejek obsługi błędów (dead-letter).

Teraz zdefiniujmy klasę producera oraz jego metodę connect:

typescript

Aby skonfigurować exchange, musimy go utworzyć(assert), co oznacza, że zostanie utworzony, jeśli nie istnieje. Exchange jest tworzony w uprzednio utworzonym kanale. Następnie musimy podać trzy argumenty do metody assertExchange. Pierwszy to nazwa exchange, drugi to typ exchange (w naszym przypadku domyślnie fanout), a trzeci to obiekt konfiguracji, w którym określamy, czy exchange jest trwały (durable), czy nie. Ten argument jest ważny, ponieważ trwały exchange przetrwa restarty RabbitMQ, co w większości przypadków jest dobrym wyborem, czyniąc aplikację bardiej niezawodną.

Kolejne metody konfigurują exchange do obsługi błędów. Tutaj widać wyraźną różnicę między typami exchange. Exchange obsługujący błędy ma typ direct, natomiast exchange służący do dostarczania wiadomości do wielu usług jednocześnie ma typ fanout. Jak wspomniałem wcześniej, exchange typu fanout wysyła kopię tej samej wiadomości do wszystkich powiązanych kolejek, podczas gdy exchange typu direct dostarcza wiadomość bezpośrednio do kolejki powiązanej z nim i określonej przez klucz routingu.

Nadszedł czas na zdefiniowanie metod dla kolejek. Musimy stworzyć metody zarówno dla standardowych kolejek, jak i dla kolejek dead-letter:

typescript

Aby skonfigurować kolejkę, musimy użyć metody assertQueue. Metoda ta wymaga dwóch argumentów: nazwy kolejki (pobranej z konfiguracji) oraz obiektu konfiguracji. W obiekcie konfiguracji określamy dwie rzeczy: czy kolejka ma być trwała (durable), oraz argumenty służące do ustawienia referencji do kolejki DLX (dead-letter exchange). x-dead-letter-exchange wskazuje DLX exchange utworzony wcześniej, a x-dead-letter-routing-key jest używany przez DLX exchange jako wskazanie, do której kolejki wiadomość powinna zostać wysłana.

Metody do konfiguracji kolejki dead-letter wyglądają podobnie; argumenty przekazywane do assertQueue są prawie takie same. Dodatkowo musimy podać x-message-ttl, który określa, kiedy wiadomość dead-letter powinna zostać ponownie wysłana.

Następnie tworzymy metodę do skonfigurowania wszystkiego razem przy użyciu uprzednio zdefiniowanych metod:

typescript

W metodzie setup wywołujemy wszystkie metody służące do konfiguracji exchange’ów i kolejek.

Kolejną metodą, którą musimy zdefiniować, jest metoda do publikowania wiadomości:

typescript

W metodzie publish wysyłamy wiadomość za pomocą metody publish z obiektu channel. Metoda ta wymaga czterech argumentów: nazwy exchange, która odnosi się do exchange odpowiedzialnego za kierowanie wiadomości do właściwych kolejek; routingKey, który, jeśli jest podany, określa kolejkę lub kolejki, do których wiadomość powinna zostać wysłana; messageBuffer, który zawiera treść wiadomości; oraz obiektu konfiguracji, w którym ustawiamy wiadomość jako trwałą (persistent).

Ostatnią metodą jest disconnect, która pozwala na bezpieczne rozłączenie producer'a.

typescript

Consumer

Klasa consumer'a powinna być mniej skomplikowana, ponieważ musimy jedynie połączyć się z RabbitMQ, a następnie nasłuchiwać przychodzących wiadomości.

Zdefiniujmy klasę wraz z jej pierwszą metodą, connect:

typescript

Ta metoda tworzy połączenie z usługą RabbitMQ, ustanawia kanał oraz ustawia obsługę generycznych zdarzeń.

Kolejną metodą jest metoda to setup'u consumer'a. Metoda ta pozwala przypisać obsługę przychodzących wiadomości:

typescript

W kwestii obsługi sukcesu i błędów wszystko jest zarządzane za pomocą metod ack i nack. Mówiąc prościej: gdy wywoływana jest metoda ack, wiadomość jest uznawana za prawidłowo przetworzoną; gdy wywoływana jest metoda nack, wiadomość jest uznawana za nieprzetworzoną i zostanie wysłana do kolejki dead-letter w celu ponownego wysłania do docelowej kolejki.

Ostatnią metodą jest metoda do rozłączenia się z usługą RabbitMQ.

typescript

Użycie RabbitMQ

Mamy wszystko skonfigurowane i gotowe do użycia, więc możemy zaimplementować główną usługę, która będzie odpowiedzialna za monitorowanie folderu i uruchamianie wysyłki wiadomości do oczekujących usług.

Nazwiemy ten serwis: "producer".

Przygotujmy konfigurację dla RabbitMQ:

typescript

Tutaj definiujemy dwie kolejki dla naszych usług - consumerów, które będą odbierać wiadomości wyzwalające przetwarzanie plików. Kolejki dead-letter zostaną skonfigurowane automatycznie, ponieważ zaimplementowaliśmy tą funkcjonalność w module producera(packages).

Teraz możemy zdefiniować funkcje, które będą wywoływać metody do łączenia się z usługą message brokera, publikowania wiadomości oraz zamykania połączenia.

typescript

Te funkcje będą wywoływane bezpośrednio w module fileObserver, który wygląda następująco:

typescript

Serwisy Consumer'ów

Dzięki reużywalnym klasom, które stworzyliśmy w packages, możemy teraz łatwo wywoływać metody, nie martwiąc się o niskopoziomowe szczegóły związane z samym message brokerem.

Kod do obsługi wiadomości wygląda następująco:

typescript

Musimy utworzyć instancję klasy consumera. Musimy podać wszystkie argumenty odpowiadające usłudze zdefiniowanej wcześniej podczas tworzenia producera. Właściwość prefetch określa, ile wiadomości consumer może pobrać z kolejki przed ich potwierdzeniem(ack). W tym przypadku oznacza to, że usługa będzie przetwarzać tylko jedną wiadomość naraz.

Metoda setupConsumer przyjmuje obsługę wiadomości, która zostanie wykonana po jej nadejściu. W tym przypadku dane wiadomości są weryfikowane i przekazywane do funkcji odpowiedzialnej za utworzenie pull requesta z pliku, którego referencja została przesłana w treści wiadomości.

W seo-service kod consumer'a jest prawie taki sam:

typescript

Podsumowanie

Postanowiłem pokazać raczej prosty przykład użycia RabbitMQ w NodeJS, ale z real-world scenariuszem użycia. Nie użyliśmy żadnego frameworka, aby mieć neutralną, surową implementację w środowisku NodeJS. W świecie złożonych systemów działających asynchronicznie musimy brać pod uwagę wiele wzorców i technologii. Dodatkowo często generujemy dużo kodu przy pomocy AI, kodu, który musi być przez nas nadzorowany, dlatego solidne opanowanie funkcji ekosystemu jest bezcenne. Takie przykłady świetnie sprawdzają się dla mnie, kiedy chcę nauczyć się nowej technologii albo odświeżyć swoją wiedzę. Zachęcam do nauki przez praktykę — jeśli chcesz poznać wzorzec lub bibliotekę, spróbuj coś stworzyć z jej pomocą, nawet małą aplikację rozwiązującą jakiś problem. Nie ma znaczenia, czy na rynku istnieje już rozwiązanie; staw czoła problemom i zdobądź praktyczne doświadczenie z biblioteką. Przy szerszych tematach, takich jak wzorce architektoniczne czy inne szersze koncepcje,staraj się je rozbić i zrozumieć każdą część, eksperymentując z nimi, jeżeli to możliwe, osobno. Obecnie wiedza i praktyczne doświadczenie są kluczem do sukcesu w karierze programisty.

Mam nadzieję, że zechcesz zgłębić przykład z tego artykułu. Oto repozytorium!

Stay tuned👋🏻

typescript and javascript logogreg@aboutjs.dev

©Grzegorz Dubiel | 2026