Outbox Pattern in NestJS: Praktyczna Implementacja
Kiedy patrzę na coś co wygląda na skomplikowane, czuję ekscytację jakbym zobaczył coś magicznego, wielkiego albo epickiego, to zachęca mnie żeby zanurkować głębiej i dowiedzieć się jak ta RZECZ zosała stworzona. Do programowania trafiłem będąc wcześniej muzykiem. Te emocje są dla mnie takie same w obu światach. Jednym z impulsów, żeby sięgnąć po instrument, nie mniej ważnym niż sam akt tworzenia, była ciekawość, jak skomponowano i zagrano piosenkę którą ostatnio usłyszałem. Na początku mojej drogi z instrumentem próbowałem grać ze słuchu i było to świetne doświadczenia, ale szybko nauczyłem się jednej rzeczy: im więcej znam schematów kompozycyjnych i „licków”, tym więcej słyszę w utworze i tym więcej potrafię zagrać. Ten sam motor napędza mnie, gdy próbuję zrozumieć szerszy obraz w aplikacjach: jak framework działa pod spodem, jak rozwiązać zadanie z LeetCode, jak projektować kod, jak działa system płatności. Możemy stanąć przed problemem spięcia projektu funkcji płatności i musimy pamietać żeby nie obciążyła ona tej samej osoby dwa razy, mogła się komunikować z wieloma usługami, wysyłać powiadomienia, mieć możliwość ponawiania części procesu płatności lub zamówienia i jednocześnie mieć nad nią widoczność oraz kontrolę dzięki solidnej możliwości śledzenia. Najgorsze, co możemy zrobić, to wymyślać koło na nowo, zwłaszcza że w tym przypadku mamy już solidne, sprawdzone w boju wzorce. Chciałbym z Tobą zanukrować głębiej w implementację jednego z nich: wzorca Outbox.
Czym jest wzorzec Outbox
Wzorzec Outbox to sposób na to, żeby zapis w bazie danych i zdarzenie, które chcesz opublikować, były zatwierdzone razem. Zamiast publikować wiadomość bezpośrednio, zapisujesz ją w trwałej pamięci (tabeli outbox) w tej samej transakcji co powiązana operacja. Dzięki temu serwis zapisuje, to co się wydarzyło, i publikuje to później: jeśli transakcja zostanie zatwierdzona, zdarzenie w tabeli outbox zostanie zapisane; jeśli coś pójdzie nie tak i transakcja zostanie wycofana, cofnięty zostanie również zapis zdarzenia outbox. Dzięki temu mamy pewność co do stanu operacji. Poprawia to też śledzenia, debugowanie i spójność. Publisher nigdy nie wyśle wiadomości do kolejki, jeśli powiązana operacja się nie powiedzie.
Ten wzorzec jest bardzo przydatny w mikroserwisach z komunikacją event-driven, na przykład w systemach płatności, gdzie potwierdzenie płatności i złożenie zamówienia są niezwykle ważne i muszą być dobrze skoordynowane. W takim scenariuszu zdarzenia publikuje się pośrednio: w trakcie transakcji zapisujesz je w dedykowanej tabeli outbox. Później wiadomość jest pobierana z bazy danych, publikowana do brokera wiadomości, a rekord outbox'a jest oznaczany jako wysłany. Ten wzorzec pozwala na niezawodne publikowanie wiadomości w message brokerze bez sięgania po bardzo złożone transakcje rozproszone pomiędzy serwisami. Innymi słowy: możesz mieć wiele baz danych i wiele serwisów, wciąż rozdzielonych i niezależnych, a cały flow nie zdesynchronizuje danych, w razie pojawienia się błedu na dowolnym etapie procesu.
Rysujemy architekturę projektu i implementację wzorca
Narysujmy szybki diagram pokazujący, jak to zaimplementujemy. Aplikacja, którą zbudujemy, będzie prostym systemem płatności złożonym z: API Gateway, który wysyła dane do Order Service; Order Service po utworzeniu zamówienia zapisuje zdarzenie OrderCreated w tabeli outbox, a następnie publisher publikuje wiadomość o złożeniu zamówienia; później worker-consumer w Payment Service odbiera to zdarzenie i rozpoczyna przetwarzanie transakcji płatniczej. Gdy transakcja zakończy się sukcesem, Payment Service opublikuje wiadomość do fanout exchange, którą później skonsumują Notification Service i Email Service.

Jak widzisz, planujemy użyć wzorca Outbox w najbardziej krytycznych serwisach: Order Service i Payment Service. Przybliżmy jeden z nich, żeby zobaczyć, jak to zaimplementujemy:

Serwis wykonuje operacje w ramach transakcji bazodanowej. Załóżmy, że po drodze są jakieś obliczenia i trochę „ping-ponga” z bazą danych. W naszym scenariuszu w Order Service musimy zapisać zamówienie w bazie danych, w ralnym przypadku z życia moglibyśmy też robić przeliczenia stanów magazynowych, sprawdzać dostępność itd. W tej samej transakcji, po wszystkich operacjach domenowych, zapisujemy zdarzenie w tabeli outbox. Jeśli coś w ramach transakcji się wysypie (na przykład tworzenie zamówienia), cała transakcja zostanie wycofana i nie zostanie zapisane ani zamówienie, ani zdarzenie w outboxie. Ostatni element obsługuje worker-publisher, który przy pomocy cron job'a okresowo sprawdza w tabeli outbox zdarzenia do opublikowania i, jeśli coś znajdzie, publikuje wiadomość do exchange oraz aktualizuje stan zdarzenia w tabeli outbox.
Wchodzimy głębiej w implementację
Implementację zrobimy w frameworku NestJS, który świetnie sprawdza się w podobnych przypadkach dzięki szerokiemu zestawowi narzędzi i opiniotwórczym (opinionated) wzorcom. Dla przejrzystości i w celach wyjaśniających nie będziemy skupiać się na konfiguracji, boilerplate’cie, testach ani detalach zależności. Cała implementacja będzie dostępna w tym repozytorium na GitHubie.
API Gateway
Najprościej zacząć od przeglądu implementacji API, zaglądając do kontrolera.
typescript
// order.controller.ts
import {
Controller,
Post,
Get,
Body,
Param,
Query,
HttpCode,
HttpStatus,
Headers,
UseInterceptors,
} from "@nestjs/common";
import { CreateOrderDto } from "../dto/create-order.dto";
import { OrderDto } from "../dto/order.dto";
import { OrderService } from "../services/order.service";
import { IdempotencyKeyValidationInterceptor } from "../interceptors/idempotency-key-validation.interceptor";
@Controller("orders")
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@Post()
@HttpCode(HttpStatus.CREATED)
@UseInterceptors(IdempotencyKeyValidationInterceptor)
async createOrder(
@Body() createOrderDto: CreateOrderDto,
@Headers("idempotency-key") idempotencyKey?: string,
): Promise<OrderDto> {
return this.orderService.createOrder({ createOrderDto, idempotencyKey });
}
@Get(":id")
async getOrder(@Param("id") id: string): Promise<OrderDto> {
return this.orderService.getOrder(id);
}
@Get()
async getAllOrders(
@Query("skip") skip: number = 0,
@Query("take") take: number = 10,
): Promise<{ data: OrderDto[]; total: number }> {
return this.orderService.getAllOrders(skip, take);
}
}Mamy tu kilka bardzo typowych handlerów: do złożenia zamówienia, do pobrania listy zamówień z paginacją oraz do pobrania zamówienia po id. Widać też jedną ważną rzecz jeśli chodzi o systemy płatności: idempotencję. Jest ona kluczowa, żeby zapobiec niespójnościom danych, na przykład złożeniu tego samego zamówienia dwa razy.
Dalej przejdziemy do OrderService, żeby zobaczyć, jak tworzymy zamówienie i jak komunikujemy się z Order Service. Zajrzyjmy tam przez metodę createOrder.
typescript
// order.service.ts
import { Injectable, HttpException, HttpStatus } from "@nestjs/common";
import { HttpService } from "@nestjs/axios";
import { firstValueFrom } from "rxjs";
import { CreateOrderDto } from "../dto/create-order.dto";
import { OrderDto } from "../dto/order.dto";
@Injectable()
export class OrderService {
private readonly orderServiceUrl =
process.env.ORDER_SERVICE_URL || "http://localhost:3001";
constructor(private readonly httpService: HttpService) {}
async createOrder({
createOrderDto,
idempotencyKey,
}: {
createOrderDto: CreateOrderDto;
idempotencyKey?: string;
}): Promise<OrderDto> {
try {
const headers: Record<string, string> = {};
if (idempotencyKey) {
headers["Idempotency-Key"] = idempotencyKey;
}
const response = await firstValueFrom(
this.httpService.post<OrderDto>(
`${this.orderServiceUrl}/orders`,
createOrderDto,
{
headers,
},
),
);
return response.data;
} catch (error) {
throw new HttpException(
error.response?.data || "Failed to create order",
error.response?.status || HttpStatus.INTERNAL_SERVER_ERROR,
);
}
}
// REST OF THE CODE ....
}Widzimy tu, że bierzemy CreateOrderDto, wyciągamy idempotencyKey z nagłówków i wywołujemy Order Service przez request POST. Do wywołania serwisu używamy kombinacji @nestjs/axios oraz rxjs, co pozwala obsłużyć request bez nadmiaru boilerplate’u. Odpowiedź dostajemy przez rozwiązanie observable'a.
Order Service
To jest miejsce, o którym wspominałem, gdy omawiałem diagram wzorca outbox. Zadanie tutaj polega na przyjęciu żądania złożenia zamówienia, przetworzeniu zamówienia, zapisaniu zdarzenia w tabeli outbox i wysłaniu wiadomości do brokera wiadomości.
OK, zacznijmy od kontrolera, tak jak wcześniej.
typescript
// order.controller.ts
import {
Controller,
Post,
Get,
Body,
Param,
Query,
HttpCode,
HttpStatus,
Headers,
UseInterceptors,
} from "@nestjs/common";
import { CreateOrderDto } from "../dto/create-order.dto";
import { OrderDto } from "../dto/order.dto";
import { OrderService } from "../services/order.service";
import { IdempotencyKeyValidationInterceptor } from "../interceptors/idempotency-key-validation.interceptor";
@Controller("orders")
export class OrderController {
constructor(private readonly orderService: OrderService) {}
@Post()
@HttpCode(HttpStatus.CREATED)
@UseInterceptors(IdempotencyKeyValidationInterceptor)
async createOrder(
@Body() createOrderDto: CreateOrderDto,
@Headers("idempotency-key") idempotencyKey?: string,
): Promise<OrderDto> {
return this.orderService.createOrder({ createOrderDto, idempotencyKey });
}
@Get(":id")
async getOrder(@Param("id") id: string): Promise<OrderDto> {
return this.orderService.getOrder(id);
}
@Get()
async getAllOrders(
@Query("skip") skip: number = 0,
@Query("take") take: number = 10,
): Promise<{ data: OrderDto[]; total: number }> {
return this.orderService.getAllOrders(skip, take);
}
}Ponieważ nasz API Gateway ma możliwie najprostszą formę, kontroler Order Service na pierwszy rzut oka wygląda prawie tak samo. W realnym scenariuszu API Gateway miałby mnóstwo handlerów, które przekazują wiadomości lub żądania do wielu serwisów.
Jak widać, wzorzec tworzy zdarzenie, ale jeszcze nie wysyła go do brokera wiadomości. Żeby to zrobić, potrzebujemy workera, który komunikuje się z brokerem wiadomości.
Message Relay dla Order Service
Są co najmniej dwa sposoby obsługi zdarzeń outbox:
- Change Data Capture (CDC) - przechwytujemy zmiany w tabeli outbox; gdy tylko zmiana zostanie wykryta, wiadomość jest publikowana dzięki integracji z message brokerem.
- Polling - worker działa w tle i cyklicznie odpytuje tabelę outbox; gdy zdarzenie zostanie opublikowane, wysyła wiadomość przez brokera i oznacza zdarzenie outbox jako zakończone.
Skupimy się na opcji 2.
Przejdź do pliku, żeby zobaczyć, jak worker-publisher jest zintegrowany z Order Service.
typescript
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { TypeOrmModule } from "@nestjs/typeorm";
import { ScheduleModule } from "@nestjs/schedule";
import { DatabaseModule, OutboxEvent, IdempotencyRecord } from "@app/database";
import { MessagingModule } from "@app/messaging";
import { Order, OrderItem } from "./entities";
import { OrderController } from "./controllers/order.controller";
import { OrderService } from "./services/order.service";
import { OrderRepository } from "./repositories/order.repository";
import { IdempotencyService } from "./services/idempotency.service";
import { IdempotencyInterceptor } from "./interceptors/idempotency.interceptor";
@Module({
imports: [
ConfigModule.forRoot({ envFilePath: ".env.order" }),
DatabaseModule,
TypeOrmModule.forFeature([
Order,
OrderItem,
OutboxEvent,
IdempotencyRecord,
]),
ScheduleModule.forRoot(),
MessagingModule.forDirectProducer({
clientToken: "ORDER_SERVICE",
queue: "payment_service_queue",
patternTransformer: (eventType) =>
eventType
.replace(/([A-Z])/g, ".$1")
.toLowerCase()
.slice(1),
}),
],
controllers: [OrderController],
providers: [
OrderService,
OrderRepository,
IdempotencyService,
IdempotencyInterceptor,
],
exports: [OrderService, OrderRepository],
})
export class OrderServiceModule {}Przyjrzyjmy się importom. Za odczytywanie zdarzeń outbox i wysyłanie wiadomości do brokera odpowiada MessagingModule, w którym wywołujemy metodę forDirectProducer. Widać, że przyjmuje konfigurację brokera wiadomości, a słowo „direct” w nazwie metody sugeruje, że użyjemy exchange typu direct.
Zejdźmy głębiej i zobaczmy, co kryje się pod metodą forDirectProducer.
typescript
// messaging.module.ts
// REST OF THE CODE...
@Module({})
export class MessagingModule {
static forDirectProducer({
clientToken,
queue,
patternTransformer,
}: DirectProducerOptions): DynamicModule {
return {
module: MessagingModule,
imports: [
TypeOrmModule.forFeature([OutboxEvent]),
ClientsModule.register([
{
name: clientToken,
...getRabbitMQConfig({ queue, noAck: true }),
},
]),
],
providers: [
OutboxPublisher,
OutboxPublisherWorker,
{
provide: "DIRECT_CLIENT",
useExisting: clientToken,
},
DirectPublisher,
{
provide: "OUTBOX_MESSAGE_PUBLISHER",
useExisting: DirectPublisher,
},
{
provide: "OUTBOX_EVENT_PATTERN_TRANSFORMER",
useValue: patternTransformer,
},
],
exports: [OutboxPublisher, OutboxPublisherWorker],
};
}
// REST OF THE CODE...
}Ta metoda to w praktyce funkcja-fabryka, która zwraca dynamiczny moduł, rejestrujący workera do pollingu i jego zależności. W tym przypadku spina ona workera do pollingu.
Aby zobaczyć implementację workera odpowiedzialnego za polling, zacznijmy od OutboxPublisherWorker.
typescript
// outbox-publisher-worker.ts
import { Injectable, Logger, Inject } from "@nestjs/common";
import { Cron, CronExpression } from "@nestjs/schedule";
import { OutboxPublisher } from "./outbox-publisher";
type MessagePublisher = {
publish(pattern: string, message: Record<string, any>): Promise<void>;
};
@Injectable()
export class OutboxPublisherWorker {
private readonly logger = new Logger(OutboxPublisherWorker.name);
private isProcessing = false;
constructor(
@Inject("OUTBOX_MESSAGE_PUBLISHER")
private readonly messagePublisher: MessagePublisher,
private readonly outboxPublisher: OutboxPublisher,
@Inject("OUTBOX_EVENT_PATTERN_TRANSFORMER")
private readonly transformPattern: (eventType: string) => string,
) {}
@Cron(CronExpression.EVERY_5_SECONDS)
async publishPendingEvents() {
if (this.isProcessing) {
this.logger.warn(
"Previous outbox processing still running, skipping this cycle",
);
return;
}
this.isProcessing = true;
try {
const { claimId, events } =
await this.outboxPublisher.claimPendingEvents();
if (events.length === 0) {
return;
}
this.logger.log(`Claimed ${events.length} events (claimId: ${claimId})`);
for (const event of events) {
try {
const pattern = this.transformPattern(event.eventType);
const message = {
id: event.id,
...event.payload,
};
await this.messagePublisher.publish(pattern, message);
await this.outboxPublisher.markEventAsSent({
eventId: event.id,
claimId,
});
this.logger.log(
`Published event ${event.id} of type ${event.eventType}`,
);
} catch (error) {
this.logger.error(
`Failed to publish event ${event.id}, will retry:`,
error,
);
await this.outboxPublisher.markEventAsFailed({
eventId: event.id,
claimId,
retryCount: event.retryCount + 1,
});
}
}
} catch (error) {
this.logger.error("Error in OutboxPublisherWorker:", error);
} finally {
this.isProcessing = false;
}
}
}Jak wspominałem wcześniej, worker korzysta z cron joba. Polling jest ustawiony na uruchamianie co 5 sekund.
Flow jest bardzo prosty: worker rezerwuje zdarzenia outbox, które trzeba wysłać, wywołując metodę serwisu OutboxPublisher:
typescript
// outbox-publisher.ts
// REST OF THE CODE...
async claimPendingEvents(batchSize: number = 50): Promise<ClaimResult> {
const claimId = uuid();
const events = await this.dataSource.transaction(async (manager) => {
const lockedRows: Array<{ id: string }> = await manager.query(
`SELECT id FROM outbox_events
WHERE status = $1
ORDER BY "createdAt" ASC
LIMIT $2
FOR UPDATE SKIP LOCKED`,
[OutboxEventStatus.PENDING, batchSize]
);
if (lockedRows.length === 0) {
return [];
}
const ids = lockedRows.map((row) => row.id);
await manager.query(
`UPDATE outbox_events
SET status = $1,
"processingClaim" = $2,
"processingStartedAt" = now()
WHERE id = ANY($3)`,
[OutboxEventStatus.PROCESSING, claimId, ids]
);
return manager.getRepository(OutboxEvent).find({
where: { processingClaim: claimId, status: OutboxEventStatus.PROCESSING },
order: { createdAt: 'ASC' },
});
});
return { claimId, events };
}
// REST OF THE CODE...W tej metodzie dzieją się dwie rzeczy:
- pobranie oczekujących zdarzeń
- oznaczenie ich statusu jako „processing”
typescript
// outbox-publisher.ts
// REST OF THE CODE...
async markEventAsSent({ eventId, claimId }: { eventId: string; claimId: string }): Promise<void> {
await this.dataSource.getRepository(OutboxEvent).update(
{ id: eventId, processingClaim: claimId },
{
status: OutboxEventStatus.SENT,
processedAt: new Date(),
processingClaim: null,
processingStartedAt: null,
}
);
}
async markEventAsFailed({
eventId,
claimId,
retryCount,
}: {
eventId: string;
claimId: string;
retryCount: number;
}): Promise<void> {
if (retryCount >= 5) {
await this.dataSource.getRepository(OutboxEvent).update(
{ id: eventId, processingClaim: claimId },
{
status: OutboxEventStatus.FAILED,
retryCount,
processingClaim: null,
processingStartedAt: null,
}
);
} else {
await this.dataSource.getRepository(OutboxEvent).update(
{ id: eventId, processingClaim: claimId },
{
status: OutboxEventStatus.PENDING,
retryCount,
processingClaim: null,
processingStartedAt: null,
}
);
}
}
// REST OF THE CODE...Payment Service
To drugi najważniejszy serwis: przetwarza płatność. Payment Service jest konsumentem Order Service. Rzućmy szybko okiem na to, jak serwis konsumuje wiadomość z brokera:
typescript
// order-created.consumer.ts
import { Controller, Logger } from "@nestjs/common";
import { EventPattern, Payload } from "@nestjs/microservices";
import { PaymentService } from "../services/payment.service";
import { ProcessedEventRepository } from "@app/messaging";
@Controller()
export class OrderCreatedConsumer {
private readonly logger = new Logger(OrderCreatedConsumer.name);
constructor(
private readonly paymentService: PaymentService,
private readonly processedEventRepository: ProcessedEventRepository,
) {}
@EventPattern("order.created")
async handleOrderCreated(
@Payload()
message: {
id: string;
orderId: string;
userId: string;
totalAmount: number;
currency: string;
items: Array<{
productId: string;
quantity: number;
unitPrice: number;
}>;
},
) {
try {
const consumerId = "payment-service";
const processed = await this.processedEventRepository.findProcessedEvent(
message.id,
consumerId,
);
if (processed) {
this.logger.warn(
`Event ${message.id} already processed by ${consumerId}`,
);
return;
}
this.logger.log(
`Processing OrderCreated event for order ${message.orderId}`,
);
await this.paymentService.processPayment(
message.orderId,
message.totalAmount,
message.currency,
);
await this.processedEventRepository.markAsProcessed(
message.id,
consumerId,
);
this.logger.log(
`Successfully processed OrderCreated event for order ${message.orderId}`,
);
} catch (error) {
this.logger.error(`Error processing OrderCreated event:`, error);
}
}
}Consumer odbiera wiadomość i uruchamia PaymentService.processPayment, czyli kolejną funkcjonalność która używa wzorca outbox.
Zobaczmy co tutaj mamy:
typescript
//payment.service.ts
import { Injectable, Logger } from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { DataSource, Repository } from "typeorm";
import { Payment, PaymentAttempt, PaymentStatus } from "../entities";
import { OutboxEvent } from "@app/database";
import { PaymentDto } from "../dto/payment.dto";
import { PaymentRepository } from "../repositories/payment.repository";
import { PaymentAttemptRepository } from "../repositories/payment-attempt.repository";
import { v4 as uuid } from "uuid";
type PaymentProviderRequest = {
amount: number;
currency: string;
};
@Injectable()
export class PaymentService {
private readonly logger = new Logger(PaymentService.name);
constructor(
private readonly paymentRepository: PaymentRepository,
private readonly paymentAttemptRepository: PaymentAttemptRepository,
@InjectRepository(Payment)
private paymentsRepository: Repository<Payment>,
@InjectRepository(PaymentAttempt)
private paymentAttemptsRepository: Repository<PaymentAttempt>,
private dataSource: DataSource,
) {}
async processPayment(
orderId: string,
amount: number,
currency: string,
): Promise<PaymentDto> {
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
const existingPayment =
await this.paymentRepository.findByOrderId(orderId);
if (existingPayment) {
this.logger.warn(`Payment already exists for order ${orderId}`);
return this.mapToDto(existingPayment);
}
const payment = queryRunner.manager.create(Payment, {
id: uuid(),
orderId,
amount,
currency,
status: PaymentStatus.PROCESSING,
});
const savedPayment = await queryRunner.manager.save(payment);
const paymentResult = await this.callPaymentProvider({
amount,
currency,
});
const updatedPayment = { ...savedPayment };
if (paymentResult.success) {
updatedPayment.status = PaymentStatus.COMPLETED;
updatedPayment.externalPaymentId = paymentResult.transactionId;
} else {
updatedPayment.status = PaymentStatus.FAILED;
}
const paymentUpdate = queryRunner.manager.create(Payment, updatedPayment);
const finalPayment = await queryRunner.manager.save(paymentUpdate);
const outboxEvent = queryRunner.manager.create(OutboxEvent, {
id: uuid(),
aggregateType: "Payment",
aggregateId: finalPayment.id,
eventType: paymentResult.success ? "PaymentCompleted" : "PaymentFailed",
payload: paymentResult.success
? {
paymentId: finalPayment.id,
orderId: finalPayment.orderId,
amount: finalPayment.amount,
currency: finalPayment.currency,
transactionId: paymentResult.transactionId,
}
: {
paymentId: finalPayment.id,
orderId: finalPayment.orderId,
reason: paymentResult.error,
},
});
await queryRunner.manager.save(outboxEvent);
if (!paymentResult.success) {
const attempt = queryRunner.manager.create(PaymentAttempt, {
id: uuid(),
paymentId: finalPayment.id,
attemptNumber: 1,
errorMessage: paymentResult.error,
});
await queryRunner.manager.save(attempt);
}
await queryRunner.commitTransaction();
return this.mapToDto(finalPayment);
} catch (error) {
await queryRunner.rollbackTransaction();
this.logger.error(
`Error processing payment for order ${orderId}:`,
error,
);
throw error;
} finally {
await queryRunner.release();
}
}
async getPaymentByOrderId(orderId: string): Promise<PaymentDto | null> {
const payment = await this.paymentRepository.findByOrderId(orderId);
if (!payment) {
return null;
}
return this.mapToDto(payment);
}
private async callPaymentProvider({
amount,
currency,
}: PaymentProviderRequest) {
const pollCount = 3;
const jitter = (amount % 100) + currency.length;
for (let attempt = 1; attempt <= pollCount; attempt += 1) {
await this.wait(250 + attempt * 150 + (jitter % 50));
const random = Math.random();
if (random > 0.7) {
return {
success: true,
transactionId: `txn_${uuid()}`,
};
}
}
return {
success: false,
error: "Payment provider temporarily unavailable",
};
}
private wait(ms: number) {
return new Promise<void>((resolve) => setTimeout(resolve, ms));
}
private mapToDto(payment: Payment): PaymentDto {
return {
id: payment.id,
orderId: payment.orderId,
amount: Number(payment.amount),
currency: payment.currency,
status: payment.status,
externalPaymentId: payment.externalPaymentId,
createdAt: payment.createdAt,
updatedAt: payment.updatedAt,
};
}
}Pomysł jest taki sam jak w Order Service. Otwieramy transakcję, zaczynamy odpytywać (polling) dostawcę płatności (tutaj robimy to dla prostoty i celów wyjaśniających, wywołując funkcję, która mockuje to zachowanie; wzorzec outbox można też zastosować w webhooku, który odbiera powiadomienia o rozstrzygnięciu transakcji), zapisujemy event do tabeli outbox ze statusem zależnym od powodzenia płatności, a jeśli płatność się nie powiedzie, zapisujemy również próbę płatności.
Message Relay dla Payment Service
Reszta data flow polega na dodaniu ostatniego elementu wzorca outbox i stworzeniu publishera, który powiadomi pozostałe serwisy (Notification Service i Email Service).
W tym przypadku musimy sięgnąć po fanout exchange. W tym celu używamy metody forFanoutProducer należącego do MessagingModule.
Pełna konfiguracja modułu Payment Service wygląda tak:
typescript
// payment-service/app/module.ts
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { TypeOrmModule } from "@nestjs/typeorm";
import { ScheduleModule } from "@nestjs/schedule";
import { DatabaseModule, OutboxEvent, ProcessedEvent } from "@app/database";
import { MessagingModule } from "@app/messaging";
import { Payment, PaymentAttempt } from "./entities";
import { PaymentService } from "./services/payment.service";
import { PaymentRepository } from "./repositories/payment.repository";
import { PaymentAttemptRepository } from "./repositories/payment-attempt.repository";
import { OrderCreatedConsumer } from "./consumers/order-created.consumer";
@Module({
imports: [
ConfigModule.forRoot({ envFilePath: ".env.payment" }),
DatabaseModule,
TypeOrmModule.forFeature([
Payment,
PaymentAttempt,
OutboxEvent,
ProcessedEvent,
]),
ScheduleModule.forRoot(),
MessagingModule.forFanoutProducer({
exchange: "payment.events",
patternTransformer: (eventType) => `payment.${eventType.toLowerCase()}`,
}),
MessagingModule.forConsumer(),
],
controllers: [OrderCreatedConsumer],
providers: [PaymentService, PaymentRepository, PaymentAttemptRepository],
exports: [PaymentService, PaymentRepository],
})
export class PaymentServiceModule {}MessagingModule pozwala nam używać RabbitMQ zarówno jako consumera, jak i producera publikującego do fanout exchange. Pod spodem używana jest ta sama klasa (OutboxPublisherWorker). Pokazywałem ją, omawiając message relay dla Order Service.
Serwisy Notification i Email
Na tym etapie możemy bezpiecznie powiadomić użytkownika, wysyłając powiadomienie i e-mail o płatności, mając pewność, że stan danych w naszym systemie jest spójny i aktualny.
Email Service:
typescript
// REST OF THE CODE...
@Controller()
export class PaymentCompletedConsumer {
private readonly logger = new Logger(PaymentCompletedConsumer.name);
constructor(
private readonly emailService: EmailService,
private readonly processedEventRepository: ProcessedEventRepository,
) {}
@EventPattern("payment.paymentcompleted")
async handlePaymentCompleted(
@Payload()
message: PaymentCompletedEvent & { id: string },
) {
// REST OF THE CODE...
try {
await this.emailService.sendConfirmationEmail({
orderId: message.orderId,
eventId: message.id,
paymentId: message.paymentId,
amount: message.amount,
currency: message.currency,
transactionId: message.transactionId,
});
this.logger.log(
`Successfully processed PaymentCompleted event for order ${message.orderId}`,
);
// REST OF THE CODE...
} catch (error) {
this.logger.error(`Error processing PaymentCompleted event:`, error);
}
}
}Notification Service:
typescript
// REST OF THE CODE...
@Controller()
export class PaymentCompletedConsumer {
private readonly logger = new Logger(PaymentCompletedConsumer.name);
constructor(
private readonly notificationService: NotificationService,
private readonly processedEventRepository: ProcessedEventRepository,
) {}
@EventPattern("payment.paymentcompleted")
async handlePaymentCompleted(
@Payload()
message: PaymentCompletedEvent & { id: string },
) {
// REST OF THE CODE...
try {
await this.notificationService.sendNotification({
orderId: message.orderId,
eventId: message.id,
paymentId: message.paymentId,
amount: message.amount,
currency: message.currency,
transactionId: message.transactionId,
});
await this.processedEventRepository.markAsProcessed(
message.id,
consumerId,
);
// REST OF THE CODE...
this.logger.log(
`Successfully processed PaymentCompleted event for order ${message.orderId}`,
);
} catch (error) {
this.logger.error(`Error processing PaymentCompleted event:`, error);
}
}
}Podsumowanie
Co osiągneliśmy implementując outbox pattern? Zdecydowanie poradziliśmy sobie z problemem podwójnego zapisu (dual-write), zapewniając spójność, a do tego uniknęliśmy transakcji rozproszonych, które są prawdziwą zmorą, jeśli chodzi o złożoność i utrzymanie. Te osiągnięcia to nie tylko słowa i terminologia — są krytyczne dla biznesu. Dzięki nim ta funkcjonalność nie przyniesie strat finansowych ani utraty reputacji, a do tego będzie łatwiejsza w utrzymaniu i monitorowaniu. Używanie właściwych wzorców i narzędzi do danego problemu bardzo pomaga w budowaniu niezawodnych systemów i sprawia, że funkcjonalności, które na pierwszy rzut oka wyglądają na złożone, stają się przyjaźniejsze do zbudowania i zrozumienia. Ucz się sprawdzonych, przetestowanych w boju wzorców, potem zastanów się, gdzie i dlaczego ich użyć, a następnie buduj z nimi, zobaczysz, że część skomplikowanych design systemów przestanie być dla Ciebie złożona.
PS: Sprawdź całe repozytorium na GitHubie tutaj.
