
Grzegorz Dubiel

31-12-2025

Using RabbitMQ in Modern NodeJS: A Practical Fanout Exchange Example

The purpose of the applications we create is to automate processes and make people’s lives easier. When automating something, multiple tasks often need to be performed, and many of them can—and should—be parallelized. To achieve this, we create independent services responsible for handling specific tasks. These services usually wait for a message to start working.

At this point, you may realize that I am talking about microservices — and you would be partially right. However, it is worth noting that message brokers are not used exclusively in the microservices world. Let's focus on how a message broker works in the Node.js ecosystem. The tool I chose is RabbitMQ.

What is RabbitMQ

RabbitMQ is a message broker — a service responsible for message distribution. The messaging process is handled by a producer, which publishes a message to an exchange. The exchange then routes the message to the appropriate queues, where consumers can pick it up and perform actions based on the information received.

The exchange layer is a powerful abstraction responsible for routing messages to queues. This allows, for example, sending copies of the same message to multiple queues, enabling multiple consumers to process the same message independently. By using separate queues, each consumer can, for example, apply back-pressure on its own, which makes it possible to implement pub/sub–like behavior using queues.

RabbitMQ implements AMQP (Advanced Message Queuing Protocol), which enables reliable message-based communication using queues. AMQP 0-9-1, the version spoken by the amqplib client used in this article, defines how a producer sends a message, how the broker routes it, and how queues, exchanges, and bindings work together. It also specifies how clients and the broker communicate over TCP.

Exchange

This is a very powerful and important feature in RabbitMQ. As I mentioned earlier, the exchange takes a message from a producer and routes it to the appropriate queues according to the declared exchange type.

There are multiple types of exchanges. In this article, we will cover the following:

  • fanout is responsible for receiving a message and sending a copy of it to all queues bound to the exchange, which allows multiple consumers to process the same message through their own queues.
  • direct routes a message to the queue whose binding key exactly matches the routing key provided when the message is published. This ensures communication in the form of producer → exchange → exact queue (routing_key) → consumer.

If you would like to read about the remaining exchange types, you can find them in the official RabbitMQ documentation.
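To make the difference between the two exchange types concrete, here is a minimal in-memory sketch of the routing decision. It is only a toy model: `routeMessage`, `Binding`, and `exampleBindings` are names invented for this illustration and are not part of RabbitMQ or amqplib.

```typescript
// Toy in-memory model of exchange routing. The names below are illustrative
// and are not part of amqplib or RabbitMQ.
type ExchangeType = "fanout" | "direct";

interface Binding {
  queue: string;
  bindingKey: string;
}

// Returns the names of the queues that would receive a copy of the message.
function routeMessage(
  type: ExchangeType,
  bindings: Binding[],
  routingKey: string,
): string[] {
  const matched =
    type === "fanout"
      ? bindings // fanout: every bound queue, routing key ignored
      : bindings.filter((b) => b.bindingKey === routingKey); // direct: exact match
  // A queue bound more than once still receives a single copy.
  return Array.from(new Set(matched.map((b) => b.queue)));
}

const exampleBindings: Binding[] = [
  { queue: "seo-queue", bindingKey: "seo" },
  { queue: "github-upload-queue", bindingKey: "github-upload" },
];

console.log(routeMessage("fanout", exampleBindings, "")); // both queues
console.log(routeMessage("direct", exampleBindings, "seo")); // only "seo-queue"
```

In the real broker this decision happens inside RabbitMQ; the sketch only mirrors the rule so it can be reasoned about without a running server.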

The Project Overview

If you have read any of my previous articles, you know that I like to discuss specific features, patterns and libraries by building practical, working projects. So let’s talk about the project we will build.

The application we will build acts as a post metadata orchestrator. Its task is to observe a directory and wait for a post to be published in Markdown format. Once such an event occurs, the producer sends a message containing file metadata—such as the file name and file path—through a fanout exchange to two queues. There will be two consumers:

  • SEO service responsible for generating SEO metadata for the post using an LLM, and then saving the results to MongoDB.

  • GitHub Upload service responsible for creating a pull request that adds the post’s Markdown file to the configured repository.

We will also implement a dead-letter exchange, which uses the direct exchange type, to allow retrying messages that a consumer failed to process. Each service will have a separate queue dedicated to handling dead-lettered messages.

The project’s code will be stored in a monorepo, which will be managed using PNPM workspaces.

The structure of the monorepo will look like this:

Markdown

rabitmq-node (monorepo)

├── 📦 packages/
│   ├── logger/
│   │   └── TypeScript logger utility package
│   └── rabbitmq/
│       └── RabbitMQ consumer/producer package

├── 🔧 services/
│   ├── seo-service/
│   │   └── SEO content analysis service (OpenAI + MongoDB)
│   └── github-upload-service/
│       └── GitHub PR-based file upload service

├── 📝 producer/
│   └── File observer + message publishing service

└── 🛠️ scripts/
    └── Development and utility scripts

Creating RabbitMQ Wrapper

We will create two classes: one for the producer and one for the consumer. These two classes clearly demonstrate how RabbitMQ works.

Producer

At the beginning, we create two interfaces: one for the queue configuration and one for the producer itself. They define the configuration shape that corresponds to RabbitMQ requirements.

typescript

export interface QueueConfig {
  name: string;
  routingKey?: string;
  maxRetries?: number;
  consumerType: string;
}

export interface ProducerConfig {
  rabbitmqUrl: string;
  exchangeName: string;
  exchangeType?: "fanout" | "topic" | "direct";
  queues?: QueueConfig[];
}

The rabbitmqUrl is the URL used to connect to the message broker. The exchangeName is required for setting up the exchange to which the queues are bound. Each queue must define a name and a consumerType, and can optionally include a routingKey and maxRetries. The consumerType and maxRetries are used for configuring the error-handling queues (dead-letter queues).

Now, let's define the producer class and its connect method:

typescript

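import amqp, { type Channel, type ChannelModel } from "amqplib";
import { ILogger } from "./logger";

// Rest of the code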
export class RabbitMQProducer {
  private connection: ChannelModel | null = null;
  private channel: Channel | null = null;
  private config: ProducerConfig;
  private logger: ILogger;

  constructor(config: ProducerConfig, logger: ILogger) {
    this.config = {
      exchangeType: "fanout",
      ...config,
    };
    this.logger = logger;
  }
  async connect() {
    try {
      this.connection = await amqp.connect(this.config.rabbitmqUrl);
      this.logger.info("Connected to RabbitMQ");

      this.channel = await this.connection.createChannel();
      this.logger.info("Channel created");

      this.connection.on("error", (err: Error) => {
        this.logger.error("RabbitMQ connection error:", err.message);
      });

      this.connection.on("close", () => {
        this.logger.warn("RabbitMQ connection closed");
      });
    } catch (error) {
      this.logger.error(
        "Failed to connect to RabbitMQ:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

In the connect method, we establish a connection with the RabbitMQ service, create a channel on it, and set up callbacks that handle the generic connection events. A channel serves as a lightweight virtual connection inside a single TCP connection, allowing independent AMQP message traffic to be handled concurrently.

Now we need to declare a method for setting up the exchange.

typescript

// Rest of the code
export class RabbitMQProducer {
  // Rest of the code...
  private async setupExchange() {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const exchangeType = this.config.exchangeType || "fanout";
    await this.channel.assertExchange(this.config.exchangeName, exchangeType, {
      durable: true,
    });
    this.logger.info(
      `Exchange declared: ${this.config.exchangeName} (${exchangeType})`,
    );
  }

  private getCentralDLXName() {
    return "app.dlx";
  }

  private async setupCentralDLX() {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const dlxName = this.getCentralDLXName();
    await this.channel.assertExchange(dlxName, "direct", {
      durable: true,
    });
    this.logger.info(`Central Dead Letter Exchange declared: ${dlxName}`);
  }
}

To set up the exchange, we need to assert it, meaning that the exchange will be created if it doesn’t already exist. If it already exists with the same settings, the call changes nothing; if it exists with different settings, the broker rejects the declaration. The exchange is declared through the previously created channel, but it lives on the broker, not in the channel. We provide three arguments to the assertExchange method. The first is the exchange name, the second is the exchange type (in our case, the default is fanout), and the last is a configuration object where we define whether the exchange is durable or not. This argument is important because a durable exchange survives RabbitMQ restarts, which is usually a good choice to make the application more reliable.
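The "assert" semantics can be illustrated without a broker. The following is a minimal sketch, assuming a simplified registry that compares only the type and the durable flag; `ExchangeRegistry` and `ExchangeDeclaration` are hypothetical names, and a real broker reports the mismatch as a channel error rather than a thrown exception.

```typescript
// Toy model of the broker's exchange registry; this is not the amqplib API.
interface ExchangeDeclaration {
  type: "fanout" | "direct" | "topic";
  durable: boolean;
}

class ExchangeRegistry {
  private exchanges = new Map<string, ExchangeDeclaration>();

  // Mirrors the "assert" semantics: create if missing, verify if present.
  assertExchange(
    name: string,
    declaration: ExchangeDeclaration,
  ): "created" | "unchanged" {
    const existing = this.exchanges.get(name);
    if (!existing) {
      this.exchanges.set(name, { ...declaration });
      return "created";
    }
    if (
      existing.type !== declaration.type ||
      existing.durable !== declaration.durable
    ) {
      // A real broker closes the channel with a 406 PRECONDITION_FAILED error.
      throw new Error(
        `PRECONDITION_FAILED: inequivalent arguments for exchange '${name}'`,
      );
    }
    return "unchanged";
  }
}

const registry = new ExchangeRegistry();
console.log(registry.assertExchange("content-fanout", { type: "fanout", durable: true })); // "created"
console.log(registry.assertExchange("content-fanout", { type: "fanout", durable: true })); // "unchanged"
```

This is why calling the setup code on every start is safe, as long as the declared settings do not change between deployments.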

The next methods set up the exchange for error handling. Here, you can see a clear distinction between types of exchanges. The error-handling exchange has a type of direct, while the exchange used to deliver messages to multiple services at once has a type of fanout. As mentioned earlier, a fanout exchange sends a copy of the same message to all bound queues and ignores the routing key, whereas a direct exchange delivers a message only to the queues whose binding key exactly matches the message’s routing key.

Now it’s time to define the methods for queues. We need to create methods for both standard queues and dead-letter queues:

typescript

// Rest of the code
export class RabbitMQProducer {
  // Rest of the code...
  private getDeadLetterQueueName(consumerType: string) {
    return `dlq.${consumerType}`;
  }
  private async setupMainQueue(queue: QueueConfig) {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const dlxName = this.getCentralDLXName();

    await this.channel.assertQueue(queue.name, {
      durable: true,
      arguments: {
        "x-dead-letter-exchange": dlxName,
        "x-dead-letter-routing-key": queue.consumerType,
      },
    });
    this.logger.info(`Queue declared: ${queue.name}`);

    await this.channel.bindQueue(
      queue.name,
      this.config.exchangeName,
      queue.routingKey || "",
    );
    this.logger.info(
      `Queue bound: ${queue.name} -> ${this.config.exchangeName}`,
    );
  }
  private async setupDeadLetterQueue(queue: QueueConfig) {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    const maxRetries = queue.maxRetries ?? 4;
    const dlxName = this.getCentralDLXName();
    const dlQueueName = this.getDeadLetterQueueName(queue.consumerType);
    const retryDelayMs = 5000 * maxRetries;

    await this.channel.assertQueue(dlQueueName, {
      durable: true,
      arguments: {
        "x-dead-letter-exchange": this.config.exchangeName,
        "x-dead-letter-routing-key": queue.consumerType,
        "x-message-ttl": retryDelayMs,
      },
    });
    this.logger.info(`Dead Letter Queue declared: ${dlQueueName}`);

    await this.channel.bindQueue(dlQueueName, dlxName, queue.consumerType);
    this.logger.info(
      `Dead Letter Queue bound: ${dlQueueName} -> ${dlxName} (routing key: ${queue.consumerType})`,
    );
  }
}

To set up a queue, we use the assertQueue method. This method takes two arguments: the queue name (provided from the config) and a configuration object. In the configuration object, we specify two things: first, whether the queue should be durable, and second, the arguments that point the queue at the DLX (dead-letter exchange). The x-dead-letter-exchange names the DLX we created earlier, and the x-dead-letter-routing-key is the routing key the DLX uses to determine which queue should receive the dead-lettered message.

The method for setting up the dead-letter queue looks similar; the arguments provided to assertQueue are almost the same, except that its x-dead-letter-exchange points back at the main exchange. Additionally, we provide x-message-ttl, which determines how long a message waits in the dead-letter queue before it expires and is republished to the main exchange for another attempt.
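The resulting topology is easier to see as plain data. Below is a rough sketch that derives the same queue names and arguments as the two methods above without talking to a broker; `describeTopology`, `QueueSpec`, and `QueueDeclaration` are hypothetical helpers made up for this illustration.

```typescript
// Plain-data view of the queue arguments used above; helper names are hypothetical.
interface QueueSpec {
  name: string;
  consumerType: string;
  routingKey?: string;
  maxRetries?: number;
}

interface QueueDeclaration {
  queue: string;
  boundTo: string;
  bindingKey: string;
  arguments: Record<string, string | number>;
}

const DLX_NAME = "app.dlx";

function describeTopology(
  spec: QueueSpec,
  mainExchange: string,
): QueueDeclaration[] {
  const maxRetries = spec.maxRetries ?? 4;
  return [
    {
      // Main queue: rejected messages are dead-lettered to the DLX.
      queue: spec.name,
      boundTo: mainExchange,
      bindingKey: spec.routingKey ?? "",
      arguments: {
        "x-dead-letter-exchange": DLX_NAME,
        "x-dead-letter-routing-key": spec.consumerType,
      },
    },
    {
      // Dead-letter queue: expired messages go back to the main exchange.
      queue: `dlq.${spec.consumerType}`,
      boundTo: DLX_NAME,
      bindingKey: spec.consumerType,
      arguments: {
        "x-dead-letter-exchange": mainExchange,
        "x-dead-letter-routing-key": spec.consumerType,
        "x-message-ttl": 5000 * maxRetries,
      },
    },
  ];
}

console.log(describeTopology({ name: "seo-queue", consumerType: "seo" }, "content-fanout"));
```

One consequence worth checking in your own setup: the dead-letter queue republishes to the fanout exchange, and a fanout exchange ignores routing keys, so an expired message appears to be copied to every bound queue, not only to the queue of the consumer that failed. Pointing the retry path at a direct exchange would be one way to avoid that.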

Next, we create the method to set everything up together by calling previously defined methods:

typescript

// Rest of the code
export class RabbitMQProducer {
  // Rest of the code...
  async setup() {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }
    if (!this.connection) {
      throw new Error("Connection not initialized");
    }

    try {
      await this.setupExchange();
      await this.setupCentralDLX();

      const setupPromises = (this.config.queues || []).map(async (queue) => {
        await this.setupDeadLetterQueue(queue);
        await this.setupMainQueue(queue);
      });
      await Promise.all(setupPromises);
      this.logger.info(
        `Infrastructure setup complete for exchange: ${this.config.exchangeName}`,
      );
    } catch (error) {
      this.logger.error(
        "Failed to setup RabbitMQ infrastructure:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

In the setup method, we simply call all the methods for configuring exchanges and queues.

The next method we need to define is the method for publishing a message:

typescript

export class RabbitMQProducer {
  // Rest of the code...
  async publish(message: any, routingKey: string = "") {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    try {
      const messageBuffer = Buffer.from(JSON.stringify(message));
      const sent = this.channel.publish(
        this.config.exchangeName,
        routingKey,
        messageBuffer,
        {
          persistent: true,
        },
      );

      if (sent) {
        this.logger.debug(
          `Message published to exchange ${this.config.exchangeName}:`,
          message,
        );
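      } else {
        // `false` signals back-pressure: the message has been buffered, but the caller
        // should wait for the channel's "drain" event before publishing more.
        this.logger.warn(
          "Channel write buffer is full; wait for 'drain' before publishing more",
        );
      }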

      return sent;
    } catch (error) {
      this.logger.error(
        "Error publishing message:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

In the publish method, we send a message using the publish method from the channel. This method takes four arguments: the exchange name, which refers to the exchange responsible for routing the message to the correct queues; the routingKey, which direct and topic exchanges use to determine the queue or queues to which the message should be sent (a fanout exchange ignores it); the messageBuffer, which contains the message body; and a configuration object where we mark the message as persistent.
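Since the message body travels as raw bytes, the producer and the consumer have to agree on the encoding. Here is a small sketch of the JSON round trip used in this article; `encodeMessage` and `decodeMessage` are illustrative helper names, and the FileMessage shape is borrowed from the file observer shown later.

```typescript
// Serialization round trip for the message body; helper names are illustrative.
interface FileMessage {
  sourceId: string;
  filename: string;
  path: string;
  timestamp: string;
}

// Producer side: object -> JSON string -> bytes.
function encodeMessage(message: FileMessage): Buffer {
  return Buffer.from(JSON.stringify(message), "utf8");
}

// Consumer side: bytes -> JSON string -> object.
// The cast does not validate anything; untrusted payloads need a runtime check.
function decodeMessage(content: Buffer): FileMessage {
  return JSON.parse(content.toString("utf8")) as FileMessage;
}

const original: FileMessage = {
  sourceId: "example-id",
  filename: "hello.md",
  path: "/content/hello.md",
  timestamp: "2025-12-31T00:00:00.000Z",
};

console.log(decodeMessage(encodeMessage(original)).filename); // "hello.md"
```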

The last method is disconnect, which gracefully disconnects the producer.

typescript

// Rest of the code...
export class RabbitMQProducer {
  // Rest of the code...
  async disconnect() {
    try {
      if (this.channel) {
        await this.channel.close();
      }
      if (this.connection) {
        await this.connection.close();
      }
      this.logger.info("Disconnected from RabbitMQ");
    } catch (error) {
      this.logger.error(
        "Error disconnecting from RabbitMQ:",
        error instanceof Error ? error.message : String(error),
      );
    }
  }
}

Consumer

The consumer class should be less complex, as we only need to connect to RabbitMQ and then listen for incoming messages.

Let’s define the class along with its first method, connect:

typescript

import amqp from "amqplib";
import { ILogger } from "./logger";
import { ConsumerMessage, ConsumerConfig } from "./types";
import { Connection, Channel, Message } from "./amqplib.types";

export class RabbitMQConsumer {
  private connection: Connection | null = null;
  private channel: Channel | null = null;
  private config: ConsumerConfig;
  private logger: ILogger;

  constructor(config: ConsumerConfig, logger: ILogger) {
    this.config = {
      prefetch: 1,
      ...config,
    };
    this.logger = logger;
  }

  async connect() {
    try {
      this.connection = (await amqp.connect(
        this.config.rabbitmqUrl,
      )) as any as Connection;
      this.logger.info("Connected to RabbitMQ");

      this.channel = await this.connection.createChannel();
      this.logger.info("Channel created");

      this.connection.on("error", (err: Error) => {
        this.logger.error("RabbitMQ connection error:", err.message);
      });

      this.connection.on("close", () => {
        this.logger.warn("RabbitMQ connection closed");
      });
    } catch (error) {
      this.logger.error(
        "Failed to connect to RabbitMQ:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

This method creates a connection to the RabbitMQ service, establishes a channel, and sets up handlers for generic events.

The next method is for setting up the consumer. This method allows you to assign a handler for incoming messages:

typescript

// rest of the code...
export class RabbitMQConsumer {
  // rest of the code...
  async setupConsumer(messageHandler: MessageHandler) {
    if (!this.channel) {
      throw new Error("Channel not initialized");
    }

    try {
      await this.channel.prefetch(this.config.prefetch || 1);

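      // Await the subscription so that a failure (for example, a missing queue)
      // is caught by the surrounding try/catch.
      await this.channel.consume(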
        this.config.queueName,
        async (msg: Message | null) => {
          if (!msg) {
            return;
          }

          let parsedMessage: ConsumerMessage | null = null;

          try {
            const content = msg.content.toString();
            parsedMessage = JSON.parse(content) as ConsumerMessage;

            this.logger.info(`Received message: ${parsedMessage.filename}`);

            await messageHandler(parsedMessage);

            this.channel!.ack(msg);
            this.logger.debug(
              `Message acknowledged: ${parsedMessage.filename}`,
            );
          } catch (error) {
            const filename = parsedMessage?.filename || "unknown";

            this.logger.error(
              `Error processing message: ${filename}`,
              error instanceof Error ? error.message : String(error),
            );

            this.channel!.nack(msg, false, false);
          }
        },
      );

      this.logger.info(
        `Started consuming from queue: ${this.config.queueName}`,
      );
    } catch (error) {
      this.logger.error(
        "Failed to setup consumer:",
        error instanceof Error ? error.message : String(error),
      );
      throw error;
    }
  }
}

Success and error handling is managed by calling the ack and nack methods. In plain English: when ack is called, the message is considered successfully handled and the broker removes it from the queue. When nack is called with requeue set to false (the third argument), the message is considered not successfully handled; because the queue declares a dead-letter exchange, the broker routes the message to the dead-letter queue, where it waits before being retried.
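In the code shown in this article, maxRetries only influences the TTL of the dead-letter queue, so a message that always fails keeps cycling. RabbitMQ records each dead-lettering in the x-death header, which makes it possible to count attempts. The following is a minimal sketch under that assumption; `getRejectionCount` and `hasExceededRetries` are hypothetical helpers, not part of amqplib.

```typescript
// One x-death entry, as added by RabbitMQ when a message is dead-lettered.
// Only the fields used here are listed.
interface XDeathEntry {
  queue: string;
  reason: string; // for example "rejected" or "expired"
  count: number;
}

type MessageHeaders = Record<string, unknown> | undefined;

// Counts how many times the message was rejected from the given queue.
function getRejectionCount(headers: MessageHeaders, queueName: string): number {
  const deaths = headers?.["x-death"];
  if (!Array.isArray(deaths)) {
    return 0; // first delivery: the header is absent
  }
  return (deaths as XDeathEntry[])
    .filter((d) => d.queue === queueName && d.reason === "rejected")
    .reduce((total, d) => total + d.count, 0);
}

// Hypothetical policy helper: true once the retry budget is used up.
function hasExceededRetries(
  headers: MessageHeaders,
  queueName: string,
  maxRetries: number,
): boolean {
  return getRejectionCount(headers, queueName) >= maxRetries;
}

const sampleHeaders = {
  "x-death": [
    { queue: "seo-queue", reason: "rejected", count: 3 },
    { queue: "dlq.seo", reason: "expired", count: 3 },
  ],
};

console.log(getRejectionCount(sampleHeaders, "seo-queue")); // 3
```

Inside the catch block of setupConsumer, such a check could read msg.properties.headers and acknowledge (or park) the message once the budget is exhausted instead of calling nack again.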

The last method is the one for disconnecting from the RabbitMQ service.

typescript

// rest of the code...
export class RabbitMQConsumer {
  // rest of the code...
  async disconnect() {
    try {
      await this.channel?.close();
      await this.connection?.close();

      this.logger.info("Disconnected from RabbitMQ");
    } catch (error) {
      this.logger.error(
        "Error disconnecting from RabbitMQ:",
        error instanceof Error ? error.message : String(error),
      );
    }
  }
}

RabbitMQ Usage

We have everything set up and ready for use, so we can implement the core service, which is responsible for monitoring the directory and triggering the sending of messages to the waiting services.

We will name this service the "producer".

Let’s prepare the configuration for RabbitMQ:

typescript

import { RabbitMQProducer, type ProducerConfig } from "@monorepo/rabbitmq";

const EXCHANGE_NAME = "content-fanout";
const EXCHANGE_TYPE = "fanout";

const producerConfig: ProducerConfig = {
  rabbitmqUrl: process.env.RABBITMQ_URL || "amqp://guest:guest@localhost:5672",
  exchangeName: EXCHANGE_NAME,
  exchangeType: EXCHANGE_TYPE as "fanout" | "topic" | "direct",
  queues: [
    {
      name: "seo-queue",
      consumerType: "seo",
      routingKey: "seo",
      maxRetries: 4,
    },
    {
      name: "github-upload-queue",
      consumerType: "github-upload",
      routingKey: "github-upload",
      maxRetries: 4,
    },
  ],
};

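Here, we define two queues for our consumer services, which will receive messages that trigger file processing. Because the exchange is of the fanout type, the routingKey values are not used for routing on the main exchange; both queues receive every message. Dead-letter queues will be set up automatically, as we implemented this behavior in the producer package.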

Now we can define functions that will call the methods for connecting to the message broker service, publishing messages, and closing the connection.

typescript

// Rest of the code...
let producer: RabbitMQProducer | null = null;

export async function connectRabbitMQ(): Promise<void> {
  try {
    producer = new RabbitMQProducer(producerConfig, logger);
    await producer.connect();
    await producer.setup();
    logger.info("RabbitMQ connected successfully");
  } catch (error) {
    // This version makes a single attempt; no retry loop is implemented here.
    logger.error("Failed to connect to RabbitMQ:", error);
    throw error instanceof Error ? error : new Error(String(error));
  }
}

export async function publishMessage(
  message: any,
  routingKey: string = "",
): Promise<boolean> {
  try {
    if (!producer) {
      throw new Error(
        "RabbitMQ producer not initialized. Call connectRabbitMQ first.",
      );
    }
    const sent = await producer.publish(message, routingKey);
    if (sent) {
      logger.debug(`Message published to exchange ${EXCHANGE_NAME}:`, message);
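    } else {
      // `false` means the channel's write buffer is full, not that the message was dropped.
      logger.warn("Channel write buffer is full; slow down publishing");
    }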
    return sent;
  } catch (error) {
    logger.error("Error publishing message:", error);
    throw error;
  }
}

export async function closeConnection(): Promise<void> {
  try {
    if (producer) {
      await producer.disconnect();
    }
    logger.info("RabbitMQ connection closed");
  } catch (error) {
    logger.error("Error closing RabbitMQ connection:", error);
  }
}

These functions will be called directly in the fileObserver module, which looks like this:

typescript

import { watch } from "chokidar";
import * as fs from "fs";
import * as path from "path";
import { v4 as uuidv4 } from "uuid";
import { CONTENT_FOLDER, FILE_EXTENSION, WATCHER_OPTIONS } from "./consts";
import { logger } from "./logger";
import { publishMessage } from "./rabbitmq";

interface FileMessage {
  sourceId: string;
  filename: string;
  path: string;
  timestamp: string;
}

async function handleFileAdded(filePath: string): Promise<void> {
  try {
    const fileName = path.basename(filePath);
    const ext = path.extname(filePath);

    if (ext !== FILE_EXTENSION) {
      logger.debug(`Skipping non-markdown file: ${fileName}`);
      return;
    }

    logger.info(`File detected: ${fileName}`);

    if (!fs.existsSync(filePath)) {
      logger.warn(`File does not exist: ${fileName}`);
      return;
    }

    const message: FileMessage = {
      sourceId: uuidv4(),
      filename: fileName,
      path: path.resolve(filePath),
      timestamp: new Date().toISOString(),
    };

    await publishMessage(message, "");
    logger.info(`File processed and published: ${fileName}`);
  } catch (error) {
    logger.error(`Error processing file: ${filePath}`, error);
  }
}

export async function startFileObserver(): Promise<void> {
  try {
    if (!fs.existsSync(CONTENT_FOLDER)) {
      fs.mkdirSync(CONTENT_FOLDER, { recursive: true });
      logger.info(`Created content folder: ${CONTENT_FOLDER}`);
    }

    logger.info(`Starting file observer on: ${CONTENT_FOLDER}`);

    const watcher = watch(CONTENT_FOLDER, WATCHER_OPTIONS);

    watcher.on("add", (filePath) => {
      logger.debug(`File added event: ${filePath}`);
      handleFileAdded(filePath);
    });

    watcher.on("change", (filePath) => {
      logger.debug(`File changed event: ${filePath}`);
      handleFileAdded(filePath);
    });

    watcher.on("error", (error) => {
      logger.error("File watcher error:", error);
    });

    logger.info("File observer started successfully");
  } catch (error) {
    logger.error("Failed to start file observer:", error);
    throw error;
  }
}
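The message-building part of handleFileAdded can also be pulled out into a pure function, which makes it testable without chokidar or a broker. This is only a sketch under a few assumptions: `buildFileMessage` is a hypothetical helper, the ".md" default stands in for FILE_EXTENSION (whose value is not shown in this article), and Node’s built-in randomUUID is swapped in for the uuid package.

```typescript
import * as path from "node:path";
import { randomUUID } from "node:crypto";

interface FileMessage {
  sourceId: string;
  filename: string;
  path: string;
  timestamp: string;
}

// Returns the message to publish, or null when the file should be skipped.
// The ".md" default is an assumption standing in for FILE_EXTENSION.
function buildFileMessage(
  filePath: string,
  extension: string = ".md",
  now: Date = new Date(),
): FileMessage | null {
  if (path.extname(filePath) !== extension) {
    return null;
  }
  return {
    sourceId: randomUUID(), // built-in alternative to uuid's v4()
    filename: path.basename(filePath),
    path: path.resolve(filePath),
    timestamp: now.toISOString(),
  };
}

console.log(buildFileMessage("content/hello.md")?.filename); // "hello.md"
console.log(buildFileMessage("content/notes.txt")); // null
```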

Consumer Services

Thanks to the reusable classes we created in the packages, we can now easily call methods without worrying about low-level details related to the message broker itself.

The code for handling messages looks like this:

typescript

// github-upload-service

// ...rest of the code
consumer = new RabbitMQConsumer(
  {
    rabbitmqUrl: env.RABBITMQ_URL,
    exchangeName: GITHUB_EXCHANGE,
    queueName: GITHUB_UPLOAD_QUEUE,
    prefetch: 1,
  },
  logger,
);

await consumer.connect();
logger.info("Connected to RabbitMQ");

await consumer.setupConsumer(async (message: ConsumerMessage) => {
  if (!message || !message.filename) {
    logger.warn("Received invalid message, skipping");
    return;
  }

  const validatedMessage = GithubUploadMessageSchema.parse(message);
  logger.info("Received message for GitHub upload", {
    filename: validatedMessage.filename,
    sourceId: validatedMessage.sourceId,
  });

  await uploadFileViaPR({
    octokit,
    message: validatedMessage,
    owner: env.GITHUB_OWNER,
    repo: env.GITHUB_REPO,
    uploadPath: env.GITHUB_UPLOAD_PATH,
    baseBranch: env.GITHUB_BRANCH,
    prTitlePrefix: env.GITHUB_PR_TITLE_PREFIX,
  });
});

// ...rest of the code

When creating the instance of the consumer class, we must provide values that match the ones used when configuring the producer: the broker URL, the exchange name, and the queue name. The prefetch property sets the maximum number of unacknowledged messages the broker will deliver to the consumer at once. With a value of 1, the service handles only one message at a time.
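The effect of prefetch can be shown with a toy dispatcher that stops delivering once the limit of unacknowledged messages is reached. This is a simplified model, not how the broker is implemented; `PrefetchQueue` is a name invented for the illustration.

```typescript
// Toy model of prefetch: delivery pauses once `prefetch` messages
// are unacknowledged. This is not an amqplib API.
class PrefetchQueue<T> {
  private pending: T[] = [];
  private inFlight = 0;
  private readonly prefetch: number;
  private readonly deliver: (message: T) => void;

  constructor(prefetch: number, deliver: (message: T) => void) {
    this.prefetch = prefetch;
    this.deliver = deliver;
  }

  enqueue(message: T): void {
    this.pending.push(message);
    this.dispatch();
  }

  // Acknowledging one message frees a slot for the next delivery.
  ack(): void {
    if (this.inFlight > 0) {
      this.inFlight -= 1;
    }
    this.dispatch();
  }

  private dispatch(): void {
    while (this.inFlight < this.prefetch && this.pending.length > 0) {
      this.inFlight += 1;
      this.deliver(this.pending.shift() as T);
    }
  }
}

const delivered: string[] = [];
const demoQueue = new PrefetchQueue<string>(1, (m) => {
  delivered.push(m);
});
demoQueue.enqueue("a.md");
demoQueue.enqueue("b.md");
console.log(delivered); // ["a.md"], "b.md" waits for the ack
demoQueue.ack();
console.log(delivered); // ["a.md", "b.md"]
```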

The setupConsumer method takes a message handler, which will be executed when a message arrives. In this case, the message data is validated and passed to the function responsible for creating a pull request from the file referenced in the message body.
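The definition of GithubUploadMessageSchema is not shown in this article. As a dependency-free sketch of the same idea, a hand-written type guard can check the payload at runtime; `UploadMessage`, `isUploadMessage`, and `parseUploadMessage` are hypothetical names, and the required fields are an assumption based on the FileMessage shape.

```typescript
// Runtime validation without a schema library; names and fields are assumptions.
interface UploadMessage {
  sourceId: string;
  filename: string;
  path: string;
}

const REQUIRED_FIELDS = ["sourceId", "filename", "path"];

function isUploadMessage(value: unknown): value is UploadMessage {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  // Every required field must be a non-empty string.
  return REQUIRED_FIELDS.every((key) => {
    const field = candidate[key];
    return typeof field === "string" && field.length > 0;
  });
}

// Throws on invalid input, similar in spirit to a schema's parse method.
function parseUploadMessage(value: unknown): UploadMessage {
  if (!isUploadMessage(value)) {
    throw new Error("Invalid upload message payload");
  }
  return value;
}

console.log(isUploadMessage({ sourceId: "1", filename: "a.md", path: "/a.md" })); // true
console.log(isUploadMessage({ filename: "a.md" })); // false
```

Keep in mind that an error thrown from the handler leads to nack, so a payload that can never pass validation would be retried rather than dropped; returning early for such messages, as the handler above does for a missing filename, avoids that.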

In the seo-service, the code for the consumer is almost the same:

typescript

// rest of the code...
consumer = new RabbitMQConsumer(
  {
    rabbitmqUrl,
    exchangeName: RABBITMQ_EXCHANGE,
    queueName: SEO_WORKER_QUEUE,
    prefetch: 1,
  },
  logger,
);

await consumer.connect();
logger.info("Connected to RabbitMQ");

await consumer.setupConsumer(async (message) => {
  if (!message || !message.filename) {
    logger.warn("Received invalid message, skipping");
    return;
  }
  logger.info("Received message for processing", {
    filename: message.filename,
  });
  await processor.processContent(message);
});
// rest of the code...

Conclusions

I decided to show a rather simple example of RabbitMQ usage in Node.js, but with a real-world use case. We did not use any framework, in order to have an unopinionated, raw implementation in the Node.js environment. In a world of complex systems that operate asynchronously, we need to consider many patterns and technologies. Additionally, we often generate a lot of code with AI, which we must supervise, so having a solid grasp of the ecosystem’s features is pure gold.

Examples like this work well for me when I want to dive deeper while learning new things or refreshing my knowledge. I encourage you to learn by doing: when you want to understand a pattern or library, try to implement something with it, even a small app that solves a problem. It doesn’t matter if a solution already exists on the market; face the issues and gain practical experience with the library. When tackling a broader topic, such as architecture patterns or other larger concepts, try to break it down and understand each piece by experimenting with them separately if possible. Today, your knowledge and practical experience are key to a successful software development career.

I hope you will want to dive deeper into the example from this article. Here is the repository!

Stay tuned 👋🏻

greg@aboutjs.dev

©Grzegorz Dubiel | 2025