Za ekranami: Projektowanie i budowa systemu kampanii reklamowych w Node.js
Jestem typem osoby, która trzyma się z dala od ekranów, bo mnie rozpraszają. Wiem, że brzmi to dziwnie, biorąc pod uwagę mój zawód, ale nie będę tu opowiadał o moim podejściu do disital well-being'u (to temat na inny artykuł). Ostatnio zauważyłem kilka cyfrowych ekranów rozmieszczonych w centrum miasta, w którym mieszkam. Ogólnie nie lubię reklam, zbyt wielu ekranów ani zbyt wielu migających kolorów, ale przyszła mi do głowy jedna rzecz: jak, do licha, tworzy się taki system rozproszonych ekranów wyświetlających to, co zostanie zaplanowane? Co jeśli stracą połączenie? Jak wtedy kampanie mają dalej działać na tych ekranach? Dodatkowo zdałem sobie sprawę, że technologia, którą lubię i w której się specjalizuję, mogłaby dobrze pasować do takiego systemu. Postanowiłem podjąć wyzwanie i zaprojektować taki system oraz zbudować jego POC. Co więcej, od niedawna zacząłem bardziej skupiać się na rozwiązywaniu „problemów z zakresu system design”, więc zanurzmy się w temat.
Zbieranie wymagań funkcjonalnych i niefunkcjonalnych
Zanim zaczniemy rysować jakikolwiek diagram, musimy zebrać wymagania i jasno je określić. Najlepiej zacząć od podzielenia ich na wymagania funkcjonalne i niefunkcjonalne.
Wymagania funkcjonalne
- Użytkownik powinien mieć możliwość utworzenia kampanii poprzez zdefiniowanie metadanych, harmonogramu i zasobów
- Użytkownik powinien mieć możliwość zobaczenia wszystkich swoich kampanii
- System powinien utworzyć reklamę na podstawie metadanych i zasobów przesłanych przez użytkownika
- Reklamy powinny być wyświetlane zgodnie z harmonogramem na wybranych urządzeniach (ekranach)
- System powinien mieć możliwość anulowania wybranej kampanii
Wymagania niefunkcjonalne
- System powinien obsługiwać do 500 współbieżnych żądań utworzenia kampanii
- System powinien równocześnie wysyłać kampanie do maksymalnie 20000 ekranów
- Tworzenie kampanii powinno być idempotentne
- Kampania powinna rozpoczynać się w ciągu 1 sekundy od zaplanowanego czasu
- Ekrany powinny wysyłać potwierdzenia do backendu natychmiast po zainstalowaniu reklamy na urządzeniu
- Ekrany powinny działać offline, a jednocześnie nadal uruchamiać kampanie zgodnie z harmonogramem
Określenie kluczowych komponentów systemu
Jak widać, to zadanie nie jest trywialne. Aby spełnić wszystkie te wymagania, będziemy musieli zdefiniować trzy główne warstwy aplikacji:
- Dashboard Admin UI - obsługuje tworzenie kampanii poprzez dodawanie metadanych i zasobów oraz wyświetla utworzone kampanie i ich statusy.
- Backend - będzie zawierał komponenty takie jak dashboard API, worker szablonów do tworzenia i przechowywania szablonów, bazę danych do przechowywania metadanych, publisher do publikowania kampanii na urządzenia oraz consumer ACK-ów, który będzie odbierał wiadomości z urządzeń, aby śledzić stan publikacji kampanii. Workery będą połączone przez trwałą kolejkę.
- Frontend ekranów - będzie prostą aplikacją w vanilla JS, która będzie odbierać manifest kampanii, instalować kampanię w pamięci podręcznej, uruchamiać kampanie i powiadamiać backend o statusie publikacji.
UWAGA: W tym projekcie nie będziemy omawiać uwierzytelniania ani autoryzacji. Przykładowe repozytorium również ich nie implementuje. W tym artykule chcę skupić się wyłącznie na głównej idei systemu interaktywnych kampanii relkamowych na ekranach cyfrowych.
Przpływ danych oraz diagram design system'u
Najlepszym sposobem, aby zrozumieć, jak działają komponenty systemu, jest narysowanie diagramu. Po jego przeanalizowaniu będziemy mieli jasny, wysokopoziomowy obraz całego systemu.
Przepływ danych działa w pewnego rodzaju pętli sprzężenia zwrotnego uruchamianej z poziomu Admin UI, gdzie mamy: request -> API -> compute template, create manifest -> publish to the screens -> send ack to the backend, co domyka tę pętlę. Serwisy backendowe komunikują się przez BullMQ. Backend ma też storage plików (na potrzeby tego POC jest to po prostu system plików, ale równie dobrze może to być object storage albo cokolwiek innego, co pasuje do potrzeb). Spójność i atomowość są zapewnione dzięki użyciu wzorca outbox podczas rejestrowania zdarzeń tworzenia i anulowania kampanii oraz tworzenia template. Request POST służacy do utworzenia nowej kampanii wymaga klucza idempotencyjnego, który zapobiega tworzeniu wielu jobów dla tej samej kampanii. Bez idempotencji mogłoby to być trudne do udźwignięcia, szczególnie przy większej skali, ponieważ generowanie szablonu i przechowywanie zasobów kampanii nie są trywialnymi zadaniami, a w systemie oczywiście nie może być duplikatów. Od samego początku będziemy używać repliki do operacji odczytu jako świadomego wyboru na potrzeby skalowalności, ponieważ nawet przy umiarkowanym ruchu procesy działające w tle generują dużo zapisów i mogą szybko wpłynąć na wydajność odczytów, co przełoży się na słabe UX.
Model danych
Model danych w tym przypadku jest prosty. W ramach tego POC stworzymy cztery główne encje domenowe: device, campaign, campaign asset, and delivery event. Dodatkowo będziemy używać tabeli outbox, aby wspierać niezawodne przetwarzanie zdarzeń wewnętrznych. W przyszłości model danych można rozszerzyć o users, teams itd.
Oto minimalny model danych aplikacji, wraz z tabelą outbox:
Tabela: campaigns
SQL
CREATE TABLE campaigns (
id uuid NOT NULL DEFAULT uuid_generate_v4(),
name text NOT NULL,
version integer NOT NULL DEFAULT 1,
start_at bigint NOT NULL,
expire_at bigint NOT NULL,
metadata jsonb,
status campaigns_status_enum NOT NULL DEFAULT 'draft',
created_by text,
idempotency_key uuid,
created_at timestamp NOT NULL DEFAULT now(),
updated_at timestamp NOT NULL DEFAULT now(),
PRIMARY KEY (id)
);
CREATE INDEX idx_campaigns_start_at ON campaigns (start_at);Tabela: devices
SQL
CREATE TABLE devices (
device_id text NOT NULL,
group_id text,
last_seen timestamptz,
status devices_status_enum NOT NULL DEFAULT 'offline',
metadata jsonb,
PRIMARY KEY (device_id)
);Tabela: campaign_assets
SQL
CREATE TABLE campaign_assets (
id uuid NOT NULL DEFAULT uuid_generate_v4(),
campaign_id uuid NOT NULL,
asset_type campaign_assets_asset_type_enum NOT NULL,
url text NOT NULL,
checksum text,
duration_ms integer,
size_bytes bigint,
created_at timestamp NOT NULL DEFAULT now(),
PRIMARY KEY (id),
FOREIGN KEY (campaign_id) REFERENCES campaigns(id) ON DELETE CASCADE
);Tabela: delivery_events
SQL
CREATE TABLE delivery_events (
id uuid NOT NULL DEFAULT uuid_generate_v4(),
event_id uuid NOT NULL,
device_id text NOT NULL,
campaign_id uuid NOT NULL,
version integer NOT NULL,
event_type delivery_events_event_type_enum NOT NULL,
asset_id uuid,
payload jsonb,
created_at timestamp NOT NULL DEFAULT now(),
PRIMARY KEY (id),
UNIQUE (event_id),
FOREIGN KEY (device_id) REFERENCES devices(device_id) ON DELETE CASCADE,
FOREIGN KEY (campaign_id) REFERENCES campaigns(id) ON DELETE CASCADE
);
CREATE INDEX idx_delivery_events_device_id ON delivery_events (device_id);
CREATE INDEX idx_delivery_events_campaign_id ON delivery_events (campaign_id);Tabela: outbox
SQL
CREATE TABLE outbox (
id uuid NOT NULL DEFAULT uuid_generate_v4(),
aggregate_type text NOT NULL,
aggregate_id uuid NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
processed boolean NOT NULL DEFAULT false,
created_at timestamp NOT NULL DEFAULT now(),
processed_at timestamptz,
locked_by text,
locked_at timestamptz,
PRIMARY KEY (id)
);
CREATE INDEX idx_outbox_processed_locked_at_created_at ON outbox (processed, locked_at, created_at);Jak widać, cały model danych na tym etapie jest prosty i obejmuje przechowywanie zasobów oraz powiązanie kampanii z urządzeniami za pomocą tabeli delivery_events, a także wsparcie dla wzorca outbox. W Admin UI nie ma jeszcze żadnych filtrów, więc indeksy w przedstawionym projekcie nie są rozbudowane.
Struktura projektu
Struktura projektu jest zorganizowana jako monorepo, które składa się z Admin UI, backendu ze wszystkimi istotnymi usługami backendowymi w formie modułów, takimi jak API, publisher, outbox service i template worker, a także applikacji do ekranów. Na tym etapie wszystkie usługi backendowe będą miały jedno źródło prawdy, którym jest baza danych SQL.
Aby omówić wybory, kompromisy i wzorce, musimy przyjrzeć się nieco bliżej serwisom.
Admin UI
Admin UI odpowiada za tworzenie, odczytywanie i anulowanie kampanii. Do zbudowania tego serwisu użyłem React + Vite + TanStack Query.
Oto integracja API po stronie klienta dla wspomnianych przypadków:
typescript
async function fetchCampaigns({ page, limit }: FetchCampaignsParams) {
const offset = (page - 1) * limit;
const data = await apiFetch<CampaignListResponse>(
`/campaigns?offset=${offset}&limit=${limit}`,
);
return {
...data,
page,
totalPages: Math.ceil(data.total / limit),
};
}
async function createCampaign(payload: CreateCampaignPayload) {
return apiFetch<CampaignResponse>("/campaigns", {
method: "POST",
body: JSON.stringify(payload),
});
}
async function cancelCampaign(id: string) {
return apiFetch<CampaignResponse>(`/campaigns/${id}/cancel`, {
method: "POST",
});
}Te fetchery API są opakowane hookami TanStack Query i użyte poniżej w formularzu oraz w liście:
Tabela z paginacją
JSX
function CampaignTable() {
const [page, setPage] = useState(1);
const { data, isLoading, isError, error, isPlaceholderData } = useCampaigns({
page,
limit: PAGE_SIZE,
});
const cancelCampaign = useCancelCampaign();
if (isLoading) {
return (
<Card>
<CardHeader>
<CardTitle>Campaigns</CardTitle>
</CardHeader>
<CardContent>
<p className="text-sm text-muted-foreground">Loading campaigns...</p>
</CardContent>
</Card>
);
}
if (isError) {
return (
<Card>
<CardHeader>
<CardTitle>Campaigns</CardTitle>
</CardHeader>
<CardContent>
<p className="text-sm text-destructive">
{error?.message || "Failed to load campaigns"}
</p>
</CardContent>
</Card>
);
}
const campaigns = data?.data ?? [];
const totalPages = data?.totalPages ?? 1;
const total = data?.total ?? 0;
if (campaigns.length === 0 && page === 1) {
return (
<Card>
<CardHeader>
<CardTitle>Campaigns</CardTitle>
</CardHeader>
<CardContent>
<p className="text-sm text-muted-foreground">
No campaigns found. Create one below.
</p>
</CardContent>
</Card>
);
}
return (
<Card>
<CardHeader>
<CardTitle>Campaigns</CardTitle>
</CardHeader>
<CardContent className="space-y-4">
<Table>
<TableHeader>
<TableRow>
<TableHead>Name</TableHead>
<TableHead>Status</TableHead>
<TableHead>Start At</TableHead>
<TableHead>Expire At</TableHead>
<TableHead>Version</TableHead>
<TableHead>Created At</TableHead>
<TableHead className="w-[50px]" />
</TableRow>
</TableHeader>
<TableBody>
{campaigns.map((campaign) => (
<TableRow key={campaign.id}>
<TableCell className="font-medium">{campaign.name}</TableCell>
<TableCell>
<Badge
variant={STATUS_VARIANT_MAP[campaign.status] ?? "secondary"}
>
{campaign.status}
</Badge>
</TableCell>
<TableCell>{formatDate(campaign.startAt)}</TableCell>
<TableCell>{formatDate(campaign.expireAt)}</TableCell>
<TableCell>{campaign.version}</TableCell>
<TableCell>{formatDate(campaign.createdAt)}</TableCell>
<TableCell>
{campaign.status !== "cancelled" && (
<Button
variant="ghost"
size="icon-sm"
disabled={cancelCampaign.isPending}
onClick={() => cancelCampaign.mutate(campaign.id)}
>
<CircleX className="text-muted-foreground" />
</Button>
)}
</TableCell>
</TableRow>
))}
</TableBody>
</Table>
<div className="flex items-center justify-between">
<p className="text-sm text-muted-foreground">
{total} {total === 1 ? "campaign" : "campaigns"} total
</p>
<div className="flex items-center gap-2">
<Button
variant="outline"
size="icon-sm"
disabled={page <= 1}
onClick={() => setPage((p) => Math.max(1, p - 1))}
>
<ChevronLeft />
</Button>
<span className="text-sm tabular-nums">
{page} / {totalPages}
</span>
<Button
variant="outline"
size="icon-sm"
disabled={page >= totalPages || isPlaceholderData}
onClick={() => setPage((p) => p + 1)}
>
<ChevronRight />
</Button>
</div>
</div>
</CardContent>
</Card>
);
}Formularz
JSX
function CampaignForm() {
const createCampaign = useCreateCampaign();
const {
register,
handleSubmit,
control,
reset,
formState: { errors },
} = useForm<CreateCampaignInput>({
resolver: zodResolver(createCampaignSchema),
defaultValues: {
name: "",
startAt: "",
expireAt: "",
assets: [{ assetType: "image", url: "", durationMs: undefined }],
},
});
const { fields, append, remove } = useFieldArray({
control,
name: "assets",
});
function onSubmit(data: CreateCampaignInput) {
const idempotencyKey = crypto.randomUUID();
createCampaign.mutate(
{
name: data.name,
startAt: new Date(data.startAt).getTime(),
expireAt: new Date(data.expireAt).getTime(),
assets: data.assets.map((asset) => ({
assetType: asset.assetType,
url: asset.url,
durationMs: asset.durationMs,
})),
idempotencyKey,
},
{
onSuccess: () => {
toast.success("Campaign created successfully");
reset();
},
onError: (error) => {
toast.error(error.message || "Failed to create campaign");
},
},
);
}
return (
<Card>
<CardHeader>
<CardTitle>Create Campaign</CardTitle>
</CardHeader>
<CardContent>
<form onSubmit={handleSubmit(onSubmit)} className="space-y-4">
<div className="space-y-1.5">
<Label htmlFor="name">Name</Label>
<Input
id="name"
placeholder="Campaign name"
{...register("name")}
/>
{errors.name && (
<p className="text-sm text-destructive">{errors.name.message}</p>
)}
</div>
<div className="grid grid-cols-2 gap-4">
<div className="space-y-1.5">
<Label htmlFor="startAt">Start At</Label>
<Input
id="startAt"
type="datetime-local"
{...register("startAt")}
/>
{errors.startAt && (
<p className="text-sm text-destructive">
{errors.startAt.message}
</p>
)}
</div>
<div className="space-y-1.5">
<Label htmlFor="expireAt">Expire At</Label>
<Input
id="expireAt"
type="datetime-local"
{...register("expireAt")}
/>
{errors.expireAt && (
<p className="text-sm text-destructive">
{errors.expireAt.message}
</p>
)}
</div>
</div>
<div className="space-y-2">
<div className="flex items-center justify-between">
<Label>Assets</Label>
<Button
type="button"
variant="outline"
size="sm"
onClick={() =>
append({ assetType: "image", url: "", durationMs: undefined })
}
>
<PlusIcon className="size-4 mr-1" />
Add Asset
</Button>
</div>
{errors.assets?.root && (
<p className="text-sm text-destructive">
{errors.assets.root.message}
</p>
)}
{fields.map((field, index) => (
<div key={field.id} className="flex items-start gap-2">
<select
{...register(`assets.${index}.assetType`)}
className="focus-visible:ring-3 h-8 rounded-lg border border-input bg-transparent px-2.5 text-sm outline-none focus-visible:border-ring focus-visible:ring-ring/50"
>
<option value="image">Image</option>
<option value="video">Video</option>
<option value="html">HTML</option>
</select>
<div className="min-w-0 flex-1">
<Input
placeholder="Asset URL"
{...register(`assets.${index}.url`)}
/>
{errors.assets?.[index]?.url && (
<p className="text-sm text-destructive">
{errors.assets[index].url.message}
</p>
)}
</div>
<Input
type="number"
placeholder="Duration (ms)"
className="w-36"
{...register(`assets.${index}.durationMs`, {
setValueAs: (v: string) =>
v === "" ? undefined : Number(v),
})}
/>
{fields.length > 1 && (
<Button
type="button"
variant="ghost"
size="icon"
onClick={() => remove(index)}
>
<Trash2Icon className="size-4" />
</Button>
)}
</div>
))}
</div>
<Button type="submit" disabled={createCampaign.isPending}>
{createCampaign.isPending ? "Creating..." : "Create Campaign"}
</Button>
</form>
</CardContent>
</Card>
);
}Jedną z najważniejszych rzeczy z perspektywy niezawodności i spójności całego systemu jest to, że Admin UI generuje klucz idempotencyjny, dzięki czemu to samo żądanie utworzenia kampanii zostanie zapisane tylko raz. Dzięki temu żądanie jest bezpieczne w sytuacjach, gdy klient musi je ponowić, dochodzi do podwójnego kliknięcia albo z innych powodów klient wysyła zduplikowane żądanie. Usługa po stronie backendowego API wykorzysta później ten klucz podczas wstawiania kampanii do bazy danych.
Dlaczego React, a nie React z metaframeworkiem?
Od początku wybrałem React + Vite, ponieważ to ugruntowany i dobrze utrzymywany stack, co zawsze jest dobre, gdy zespół musi się skalować. W tym przypadku nie jestem skłonny wybierać żadnego metaframeworka. Moim zdaniem ten projekt nie będzie wymagał przypadków użycia z dużą przewagą odczytów, które często optymalizuje się za pomocą SSR, ani nie ma potrzeby optymalizowania SEO w przypadku dashboardu Admin UI.
Campaign API
Dla usług backendowych wybrałem NestJS i PostgreSQL, a TypeORM pełni tutaj rolę warstwy ORM.
Campaign API odpowiada za serwowania endpointów dla Admin UI i odzwierciedla te same zadania: tworzenie kampanii, anulowanie kampanii oraz wyświetlanie listy kampanii.
Przyjrzyjmy się od razu jego serwisowi:
typescript
import { Injectable, NotFoundException } from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { DataSource, Repository } from "typeorm";
import { CampaignEntity, CampaignStatusEnum } from "./campaign.entity";
import { CampaignAssetEntity } from "./campaign-asset.entity";
import { OutboxEntity } from "../outbox/outbox.entity";
import { CreateCampaignDto } from "./dtos/create-campaign.dto";
import { CampaignListQueryDto } from "./dtos/campaign-list-response.dto";
@Injectable()
export class CampaignsService {
constructor(
@InjectRepository(CampaignEntity)
private readonly campaignRepository: Repository<CampaignEntity>,
private readonly dataSource: DataSource,
) {}
async create(dto: CreateCampaignDto) {
const existing = await this.campaignRepository.findOne({
where: { idempotencyKey: dto.idempotencyKey },
relations: ["assets"],
});
if (existing) {
return existing;
}
const campaign = await this.dataSource.transaction(async (manager) => {
const campaignEntity = manager.create(CampaignEntity, {
name: dto.name,
startAt: dto.startAt,
expireAt: dto.expireAt,
metadata: dto.metadata ?? null,
idempotencyKey: dto.idempotencyKey,
status: CampaignStatusEnum.DRAFT,
});
campaignEntity.assets = dto.assets.map((asset) =>
manager.create(CampaignAssetEntity, {
assetType: asset.assetType,
url: asset.url,
durationMs: asset.durationMs ?? null,
}),
);
const savedCampaign = await manager.save(CampaignEntity, campaignEntity);
const outboxEntry = manager.create(OutboxEntity, {
aggregateType: "campaign",
aggregateId: savedCampaign.id,
eventType: "campaign_created",
payload: {
campaignId: savedCampaign.id,
version: savedCampaign.version,
},
});
await manager.save(OutboxEntity, outboxEntry);
return savedCampaign;
});
return this.campaignRepository.findOne({
where: { id: campaign.id },
relations: ["assets"],
});
}
async findAll(query: CampaignListQueryDto) {
const { status, offset, limit } = query;
const [data, total] = await this.campaignRepository.findAndCount({
where: status ? { status } : {},
relations: ["assets"],
order: { createdAt: "DESC" },
skip: offset,
take: limit,
});
return { data, total, offset, limit };
}
async findById(id: string) {
const campaign = await this.campaignRepository.findOne({
where: { id },
relations: ["assets"],
});
if (!campaign) {
throw new NotFoundException(`Campaign with id ${id} not found`);
}
return campaign;
}
async cancel(id: string) {
const campaign = await this.campaignRepository.findOne({
where: { id },
});
if (!campaign) {
throw new NotFoundException(`Campaign with id ${id} not found`);
}
await this.dataSource.transaction(async (manager) => {
campaign.status = CampaignStatusEnum.CANCELLED;
await manager.save(CampaignEntity, campaign);
const outboxEntry = manager.create(OutboxEntity, {
aggregateType: "campaign",
aggregateId: campaign.id,
eventType: "campaign_cancelled",
payload: {
campaignId: campaign.id,
version: campaign.version,
},
});
await manager.save(OutboxEntity, outboxEntry);
});
return this.campaignRepository.findOne({
where: { id },
relations: ["assets"],
});
}
}W metodzie create widać, jak wykorzystywany jest wspomniany wcześniej klucz idempotencyjny. Służy on do sprawdzenia, czy kampania już istnieje. Jeśli tak, nowa kampania nie zostanie wstawiona do bazy danych.
Dlaczego wybrałem NestJS do backend'u?
To opinionated framework, ma szeroki zestaw paczek, jest dobrze utrzymywany, wspiera modularną architekturę, ma ugruntowane wzorce i świetnie integruje się z kolejkami oraz brokerami, co pomaga utrzymać wszystko uporządkowane. Dobrze sprawdza się też w przypadkach, w których trzeba zaktualizować wiele tabel w ramach jednej transakcji.
Dlaczego wybrałem relacyjną bazę danych z TypeORM
TypeORM ma wbudowane wsparcie dla replikacji w modelu primary-replica i świetnie integruje się z NestJS. PostgreSQL wybrałem ze względu na wygodne wsparcie dla jsonb, które przydaje się do przechowywania metadanych w ważnych encjach, takich jak campaigns i devices. Całość dobrze integruje się także z resztą systemu. Korzystanie z SQL i silnych gwarancji ACID jest ważne dla spójności i dobrze współgra ze wzorcami takimi jak outbox przy rozwiązywaniu problemu dual write w workerach.
Outbox Poller
Aby zapewnić atomowość i spójność oraz poradzić sobie z problemem dual write, zdecydowałem się wdrożyć wzorzec outbox dla tworzenia kampanii, przetwarzania template i anulowania kampanii. Wewnętrzna dystrybucja zdarzeń jest obsługiwana przez BullMQ. Widoczne jest to w przykładzie kiedy, Outbox Poller odpowiada za publikowanie do kolejki, dzięki czemu nie trafiają do niej niespójne joby:
typescript
import { Inject, Injectable, Logger, OnModuleDestroy } from "@nestjs/common";
import { DataSource } from "typeorm";
import { InjectQueue } from "@nestjs/bullmq";
import { Queue } from "bullmq";
import { Cron, CronExpression } from "@nestjs/schedule";
import type Redis from "ioredis";
import type Redlock from "redlock";
import { v4 as uuidv4 } from "uuid";
import { OUTBOX_REDIS_CLIENT, OUTBOX_REDLOCK } from "./redlock.provider";
type OutboxRow = {
id: string;
event_type: string;
payload: Record<string, unknown>;
};
const BATCH_SIZE = 10;
const LOCK_TTL_MS = 4000;
const LOCK_KEY = "locks:outbox-poll";
const CLAIM_TTL_MINUTES = 10;
@Injectable()
export class OutboxService implements OnModuleDestroy {
private readonly logger = new Logger(OutboxService.name);
constructor(
@InjectQueue("template-build")
private readonly templateBuildQueue: Queue,
@InjectQueue("publish")
private readonly publishQueue: Queue,
@Inject(OUTBOX_REDIS_CLIENT)
private readonly redisClient: Redis,
@Inject(OUTBOX_REDLOCK)
private readonly redlock: Redlock,
private readonly dataSource: DataSource,
) {}
async onModuleDestroy() {
this.redisClient.disconnect();
}
@Cron(CronExpression.EVERY_5_SECONDS)
async handleCron() {
let lock;
try {
lock = await this.redlock.acquire([LOCK_KEY], LOCK_TTL_MS);
} catch {
return;
}
try {
await this.processBatch();
await this.cleanupStaleClaims();
} finally {
await lock.release();
}
}
private async processBatch() {
const workerId = uuidv4();
const entries: OutboxRow[] = await this.dataSource.transaction(
async (manager) => {
const rows: OutboxRow[] = await manager.query(
`
SELECT * FROM outbox
WHERE processed = false
AND ("locked_by" IS NULL OR "locked_at" < now() - INTERVAL '${CLAIM_TTL_MINUTES} minutes')
ORDER BY "created_at" ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
`,
[BATCH_SIZE],
);
if (rows.length === 0) return [];
const ids = rows.map((r) => r.id);
await manager.query(
`UPDATE outbox SET "locked_by" = $1, "locked_at" = now() WHERE id = ANY($2::uuid[]) AND processed = false`,
[workerId, ids],
);
return rows;
},
);
if (entries.length === 0) return;
let processedCount = 0;
for (const entry of entries) {
try {
const jobOpts = { jobId: `outbox-${entry.id}` };
switch (entry.event_type) {
case "campaign_created":
await this.templateBuildQueue.add(
entry.event_type,
entry.payload,
jobOpts,
);
break;
case "template_ready":
await this.publishQueue.add(
entry.event_type,
entry.payload,
jobOpts,
);
break;
case "campaign_cancelled":
await this.publishQueue.add(
entry.event_type,
entry.payload,
jobOpts,
);
break;
default:
this.logger.warn(
`Unknown event type: ${entry.event_type} for outbox entry ${entry.id}`,
);
break;
}
await this.dataSource.query(
`UPDATE outbox SET processed = true, "processed_at" = now(), "locked_by" = NULL, "locked_at" = NULL WHERE id = $1 AND "locked_by" = $2`,
[entry.id, workerId],
);
processedCount++;
} catch (error) {
this.logger.error(
`Failed to publish outbox entry ${entry.id}: ${error}`,
);
try {
await this.dataSource.query(
`UPDATE outbox SET "locked_by" = NULL, "locked_at" = NULL WHERE id = $1 AND "locked_by" = $2`,
[entry.id, workerId],
);
} catch (e) {
this.logger.error(
`Failed to clear lock for outbox ${entry.id}: ${e}`,
);
}
}
}
this.logger.log(
`Processed ${processedCount}/${entries.length} outbox entries`,
);
}
private async cleanupStaleClaims() {
try {
await this.dataSource.query(
`UPDATE outbox SET "locked_by" = NULL, "locked_at" = NULL WHERE processed = false AND "locked_at" < now() - INTERVAL '${CLAIM_TTL_MINUTES} minutes'`,
);
} catch (error) {
this.logger.error(`Failed to cleanup stale outbox claims: ${error}`);
}
}
}Jak widać, próbowałem osiągnąć tutaj wysoką niezawodność poprzez batchowanie jobów i użycie locków. Cały przepływ jest zorganizowany w sposób przypominający claim/lock pattern: najpierw pobieramy zdarzenia z outboxa, potem przypisujemy do nich workera, aby mieć pewność, że tylko jeden konkretny worker obsługuje dany job, następnie publikujemy go do kolejki i aktualizujemy zdarzenie w outboxie. Gdy lock staje się nieaktualny, cleanupStaleClaims zwolni zdarzenie. Redukuje to ryzyko, że przypadku błędu zdarzenie zostanie utracone.
Dlaczego BullMQ do komunikacji między usługami?
W tym przypadku wybrałem BullMQ, ponieważ nie potrzebuję złożonego routingu ani wielu różnych typów wiadomości. Wymagania funkcjonalne sugerują też relatywnie umiarkowany ruch przy masowym tworzeniu kampanii (500 współbieżnych żądań), więc to rozwiązanie powinno dać radę.
Dlaczego polling zamiast CDC?
Wybrałem polling zamiast CDC, ponieważ szybkość reakcji nie jest w tym przypadku aż tak istotna. Wiele kampanii będzie zaplanowanych z dużym wyprzedzeniem względem czasu publikacji, więc jeśli polling co 5 sekund okaże się zbyt częsty ze względu na obciążenie bazy danych albo koszty, będziemy mieć przestrzeń, by zmniejszyć jego częstotliwość. Dla wymaganej przepustowości podejście oparte na pollingu jest lepszym wyborem, ponieważ powinno działać dobrze bez niepotrzebnego narzutu infrastruktury i dodatkowej złożoności związanej z CDC.
Template Worker
Template worker ma jedno zadanie. Zbiera zasoby i metadane kampanii oraz składa z nich szablon, który później zostanie pobrany przez urządzenie ekranu w celu wyświetlenia:
typescript
import { Injectable, Logger } from "@nestjs/common";
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import { DataSource } from "typeorm";
import { mkdir, writeFile } from "node:fs/promises";
import { join } from "node:path";
import {
CampaignEntity,
CampaignStatusEnum,
} from "../campaigns/campaign.entity";
import { OutboxEntity } from "../outbox/outbox.entity";
import { generateTemplate } from "./template-generator";
@Injectable()
@Processor("template-build")
export class TemplateProcessor extends WorkerHost {
private readonly logger = new Logger(TemplateProcessor.name);
constructor(private readonly dataSource: DataSource) {
super();
}
async process(job: Job) {
const { campaignId } = job.data;
const campaign = await this.dataSource
.getRepository(CampaignEntity)
.findOne({
where: { id: campaignId },
relations: ["assets"],
});
if (!campaign) {
throw new Error(`Campaign ${campaignId} not found`);
}
await this.dataSource.getRepository(CampaignEntity).update(campaignId, {
status: CampaignStatusEnum.BUILDING,
});
const html = generateTemplate({
campaignId: campaign.id,
name: campaign.name,
startAt: Number(campaign.startAt),
expireAt: Number(campaign.expireAt),
assets: campaign.assets.map((a) => ({
id: a.id,
assetType: a.assetType,
url: a.url,
durationMs: a.durationMs,
})),
});
const dir = join(process.cwd(), "storage", campaignId);
await mkdir(dir, { recursive: true });
await writeFile(join(dir, "index.html"), html);
await this.dataSource.transaction(async (manager) => {
const campaignToUpdate = await manager.findOne(CampaignEntity, {
where: { id: campaignId },
});
if (!campaignToUpdate) {
throw new Error(`Campaign ${campaignId} not found`);
}
campaignToUpdate.status = CampaignStatusEnum.READY;
await manager.save(CampaignEntity, campaignToUpdate);
const outboxEntry = manager.create(OutboxEntity, {
aggregateType: "campaign",
aggregateId: campaignId,
eventType: "template_ready",
payload: { campaignId, version: campaignToUpdate.version },
});
await manager.save(OutboxEntity, outboxEntry);
});
this.logger.log(
`Template built for campaign ${campaignId} v${campaign.version}`,
);
}
}Ta klasa jest po prostu workerem BullMQ. Zdecydowałem się użyć systemu plików jako storage, ponieważ to tylko POC, ale w środowisku produkcyjnym lepiej byłoby użyć object storage, np. S3.
Obszar do optymalizacji
Funkcja generateTemplate to dobre miejsce, które warto tutaj obserwować. W testach obciążeniowych nie zauważyłem problemów z wydajnością, ale jeśli kod w środku zostanie rozbudowany o większą liczbę obliczeń, może zacząć blokować event loop. Jeśli tak się stanie, tę funkcję należałoby przenieść do worker_threads.
Publisher Worker
Kolejny worker odpowiada za wypychanie manifestu kampanii do klientów ekranów albo publikowanie wiadomości odwołujących kampanię. Wysokopoziomowo jest to po prostu model pub-sub. Publisher używa protokołu MQTT do komunikacji z urządzeniami. MQTT (Message Queuing Telemetry Transport) implementuje wzorzec pub-sub z założenia. Jego głównym komponentem jest broker, który działa jak centralny serwer odbierający wiadomości i rozprowadzający je do klientów poprzez organizowanie danych w topics.
Ok, zobaczmy, co mamy w kodzie procesora:
typescript
import { Injectable, Logger } from "@nestjs/common";
import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq";
import { Job, Queue } from "bullmq";
import { DataSource } from "typeorm";
import { ConfigService } from "@nestjs/config";
import { TOPICS } from "@campaign-system/shared";
import {
CampaignEntity,
CampaignStatusEnum,
} from "../campaigns/campaign.entity";
import { MqttProvider } from "./mqtt.provider";
import { DeviceStreamService } from "./device-stream.service";
import { DevicePublishTracker } from "./device-publish-tracker";
import { buildManifest } from "./build-manifest";
const BATCH_SIZE = 200;
@Injectable()
@Processor("publish")
export class PublisherProcessor extends WorkerHost {
private readonly logger = new Logger(PublisherProcessor.name);
constructor(
private readonly dataSource: DataSource,
private readonly configService: ConfigService,
private readonly mqttProvider: MqttProvider,
private readonly deviceStreamService: DeviceStreamService,
private readonly devicePublishTracker: DevicePublishTracker,
@InjectQueue("verify-delivery")
private readonly verifyDeliveryQueue: Queue,
) {
super();
}
async process(job: Job) {
if (job.name === "campaign_cancelled") {
return this.handleCancel(job);
}
return this.handlePublish(job);
}
private async handlePublish(job: Job) {
const { campaignId } = job.data;
const campaign = await this.dataSource
.getRepository(CampaignEntity)
.findOne({
where: { id: campaignId },
relations: ["assets"],
});
if (!campaign) {
throw new Error(`Campaign ${campaignId} not found`);
}
const startResult = await this.dataSource
.getRepository(CampaignEntity)
.createQueryBuilder()
.update(CampaignEntity)
.set({ status: CampaignStatusEnum.PUBLISHING })
.where("id = :id AND status = :expected", {
id: campaignId,
expected: CampaignStatusEnum.READY,
})
.execute();
if (startResult.affected === 0) {
this.logger.warn(
`Campaign ${campaignId} is no longer in READY state, skipping publish`,
);
return;
}
const apiBaseUrl = this.configService.get(
"API_BASE_URL",
"http://localhost:3000",
);
const manifest = buildManifest({ campaign, apiBaseUrl });
const payload = JSON.stringify(manifest);
const alreadyPublished = await this.devicePublishTracker.getPublished({
campaignId: campaign.id,
version: campaign.version,
});
let successCount = 0;
let failureCount = 0;
for await (const batch of this.deviceStreamService.streamBatches({
batchSize: BATCH_SIZE,
excludeDeviceIds: alreadyPublished,
})) {
const results = await Promise.allSettled(
batch.map((device) =>
this.mqttProvider.publishWithTimeout({
topic: TOPICS.NOTIFICATIONS(device.deviceId),
payload,
qos: 1,
}),
),
);
const publishedIds: string[] = [];
results.forEach((result, j) => {
if (result.status === "fulfilled") {
successCount++;
publishedIds.push(batch[j]!.deviceId);
} else {
failureCount++;
this.logger.error(
`Failed to publish to device ${batch[j]?.deviceId}: ${result.reason}`,
);
}
});
await this.devicePublishTracker.markPublished({
campaignId: campaign.id,
version: campaign.version,
deviceIds: publishedIds,
});
}
await this.verifyDeliveryQueue.add(
"verify-delivery",
{ campaignId: campaign.id, version: campaign.version },
{
delay: 30000,
jobId: `verify-${campaign.id}-v${campaign.version}-a1`,
},
);
const finalResult = await this.dataSource
.getRepository(CampaignEntity)
.createQueryBuilder()
.update(CampaignEntity)
.set({ status: CampaignStatusEnum.ACTIVE })
.where("id = :id AND status = :expected", {
id: campaignId,
expected: CampaignStatusEnum.PUBLISHING,
})
.execute();
if (finalResult.affected === 0) {
this.logger.warn(
`Campaign ${campaignId} was modified during publishing (likely cancelled), not marking as ACTIVE`,
);
}
this.logger.log(
`Campaign ${campaignId} published — success: ${successCount}, failed: ${failureCount}, skipped: ${alreadyPublished.size}`,
);
}
private async handleCancel(job: Job) {
const { campaignId } = job.data;
const campaign = await this.dataSource
.getRepository(CampaignEntity)
.findOne({ where: { id: campaignId } });
if (!campaign || campaign.status !== CampaignStatusEnum.CANCELLED) {
this.logger.warn(
`Campaign ${campaignId} is not in CANCELLED state, skipping revoke`,
);
return;
}
const payload = JSON.stringify({ type: "revoke", campaignId });
const alreadyRevoked = await this.devicePublishTracker.getRevoked({
campaignId,
});
let successCount = 0;
let failureCount = 0;
for await (const batch of this.deviceStreamService.streamBatches({
batchSize: BATCH_SIZE,
excludeDeviceIds: alreadyRevoked,
})) {
const results = await Promise.allSettled(
batch.map((device) =>
this.mqttProvider.publishWithTimeout({
topic: TOPICS.CONTROL(device.deviceId),
payload,
qos: 1,
}),
),
);
const revokedIds: string[] = [];
results.forEach((result, j) => {
if (result.status === "fulfilled") {
successCount++;
revokedIds.push(batch[j]!.deviceId);
} else {
failureCount++;
this.logger.error(
`Failed to send revoke to device ${batch[j]?.deviceId}: ${result.reason}`,
);
}
});
await this.devicePublishTracker.markRevoked({
campaignId,
deviceIds: revokedIds,
});
}
await this.verifyDeliveryQueue.add(
"verify-revoke",
{ campaignId, attempt: 1 },
{
delay: 30000,
jobId: `verify-revoke-${campaignId}-a1`,
},
);
this.logger.log(
`Campaign ${campaignId} revoke sent — success: ${successCount}, failed: ${failureCount}, skipped: ${alreadyRevoked.size}`,
);
}
}Metody do sprawdzania i ustawiania danych w trackerze są wywoływane wewnątrz metod handlePublish i handleCancel w serwisie publisher processora.
Serwis obsługuje dwie ścieżki publikacji: publikowanie manifestu instalacyjnego (handlePublish) oraz anulowanie kampanii (handleCancel).
Kontrole startResult i finalResult pomagają zapobiegać race conditions.
Aby pomóc zagwarantować idempotentność publikacji, mamy mały serwis wykorzystujący Redis'a:
typescript
import { Injectable, OnModuleDestroy } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import Redis from "ioredis";
const KEY_TTL_SECONDS = 60 * 60 * 24;
type TrackParams = {
campaignId: string;
version: number;
deviceIds: string[];
};
type CampaignVersionParams = {
campaignId: string;
version: number;
};
type RevokeTrackParams = {
campaignId: string;
deviceIds: string[];
};
type RevokeQueryParams = {
campaignId: string;
};
@Injectable()
export class DevicePublishTracker implements OnModuleDestroy {
private readonly redis: Redis;
constructor(private readonly configService: ConfigService) {
this.redis = new Redis({
host: this.configService.get("REDIS_HOST", "localhost"),
port: this.configService.getOrThrow<number>("REDIS_PORT"),
keyPrefix: "pubtrack:",
});
}
onModuleDestroy() {
this.redis.disconnect();
}
private buildKey({ campaignId, version }: CampaignVersionParams) {
return `${campaignId}:v${version}`;
}
async markPublished({ campaignId, version, deviceIds }: TrackParams) {
if (deviceIds.length === 0) return;
const key = this.buildKey({ campaignId, version });
const pipeline = this.redis.pipeline();
pipeline.sadd(key, ...deviceIds);
pipeline.expire(key, KEY_TTL_SECONDS);
await pipeline.exec();
}
async getPublished({ campaignId, version }: CampaignVersionParams) {
const key = this.buildKey({ campaignId, version });
const members = await this.redis.smembers(key);
return new Set(members);
}
private buildRevokeKey({ campaignId }: RevokeQueryParams) {
return `revoke:${campaignId}`;
}
async markRevoked({ campaignId, deviceIds }: RevokeTrackParams) {
if (deviceIds.length === 0) return;
const key = this.buildRevokeKey({ campaignId });
const pipeline = this.redis.pipeline();
pipeline.sadd(key, ...deviceIds);
pipeline.expire(key, KEY_TTL_SECONDS);
await pipeline.exec();
}
public async getRevoked({ campaignId }: RevokeQueryParams) {
const key = this.buildRevokeKey({ campaignId });
const members = await this.redis.smembers(key);
return new Set(members);
}
}Wymagania mówią nam, że będziemy obsługiwać całkiem dużą liczbę urządzeń, więc potrzebujemy małego serwisu do strumieniowania urządzeń z bazy danych i publikowania do każdej takiej partii:
typescript
import { Injectable } from "@nestjs/common";
import { DataSource } from "typeorm";
import { DeviceEntity } from "../devices/device.entity";
type StreamBatchesParams = {
batchSize: number;
excludeDeviceIds?: Set<string>;
};
@Injectable()
export class DeviceStreamService {
constructor(private readonly dataSource: DataSource) {}
async *streamBatches({ batchSize, excludeDeviceIds }: StreamBatchesParams) {
let cursor: string | null = null;
while (true) {
const qb = this.dataSource
.getRepository(DeviceEntity)
.createQueryBuilder("device")
.orderBy("device.deviceId", "ASC")
.take(batchSize);
if (cursor) {
qb.where("device.deviceId > :cursor", { cursor });
}
const rows = await qb.getMany();
if (rows.length === 0) break;
cursor = rows[rows.length - 1]!.deviceId;
if (excludeDeviceIds && excludeDeviceIds.size > 0) {
const filtered = rows.filter((d) => !excludeDeviceIds.has(d.deviceId));
if (filtered.length > 0) {
yield filtered;
}
} else {
yield rows;
}
if (rows.length < batchSize) break;
}
}
}Ta klasa udostępnia metodę będącą generatorem. Generator zwraca urządzenia w konfigurowalnych batchach opartych na paginacji kursorem, co daje dużą kontrolę nad przetwarzaniem danych i pozwala dostosować je w razie potrzeby.
Musiałem też uwzględnić przypadek, w którym urządzenia nie otrzymują manifestu albo wiadomości anulującej z powodu bycia offline. Miałem co najmniej dwie opcje do wyboru:
- Zmusić broker MQTT do obsługi retry poprzez skonfigurowanie kolejkowania po stronie brokera.
- Utworzyć osobny worker service odpowiedzialny za ponawianie dostarczania manifestu do urządzeń offline.
Wybrałem drugą opcję jako bezpieczniejszą. Dzięki temu nie trzeba używać kolejek wewnątrz brokera MQTT, a jednocześnie mamy pełną kontrolę nad redelivery, w tym nad tym, jak paginowane są niedostarczone joby. Zmniejsza to też ryzyko wyczerpania zasobów po stronie brokera, bo przechowywanie niedostarczonych wiadomości mogłoby stać się wąskim gardłem, zwłaszcza przy skali rzędu 20k urządzeń.
Oto kod:
typescript
import { Injectable, Logger } from "@nestjs/common";
import { InjectQueue, Processor, WorkerHost } from "@nestjs/bullmq";
import { Job, Queue } from "bullmq";
import { DataSource } from "typeorm";
import { ConfigService } from "@nestjs/config";
import { TOPICS } from "@campaign-system/shared";
import { CampaignEntity } from "../campaigns/campaign.entity";
import {
DeliveryEventEntity,
DeliveryEventTypeEnum,
} from "../ack-consumer/delivery-event.entity";
import { DeviceStreamService } from "./device-stream.service";
import { DevicePublishTracker } from "./device-publish-tracker";
import { buildManifest } from "./build-manifest";
import { MqttProvider } from "../mqtt/mqtt.provider";
const BATCH_SIZE = 200;
const VERIFY_DELAY_MS = 30_000;
const MAX_ATTEMPTS = 3;
type VerifyJobData = {
campaignId: string;
version: number;
attempt?: number;
};
type VerifyRevokeJobData = {
campaignId: string;
attempt?: number;
};
@Injectable()
@Processor("verify-delivery")
export class VerifyDeliveryProcessor extends WorkerHost {
private readonly logger = new Logger(VerifyDeliveryProcessor.name);
constructor(
private readonly dataSource: DataSource,
private readonly configService: ConfigService,
private readonly mqttProvider: MqttProvider,
private readonly deviceStreamService: DeviceStreamService,
private readonly devicePublishTracker: DevicePublishTracker,
@InjectQueue("verify-delivery")
private readonly verifyDeliveryQueue: Queue,
) {
super();
}
async process(job: Job) {
if (job.name === "verify-revoke") {
return this.handleVerifyRevoke(job as Job<VerifyRevokeJobData>);
}
return this.handleVerifyDelivery(job as Job<VerifyJobData>);
}
private async handleVerifyDelivery(job: Job<VerifyJobData>) {
const { campaignId, version, attempt = 1 } = job.data;
this.logger.log(
`Verifying delivery for campaign ${campaignId} v${version} (attempt ${attempt})`,
);
const campaign = await this.dataSource
.getRepository(CampaignEntity)
.findOne({ where: { id: campaignId }, relations: ["assets"] });
if (!campaign) {
this.logger.warn(`Campaign ${campaignId} not found`);
return;
}
const ackRows = await this.dataSource
.getRepository(DeliveryEventEntity)
.find({
select: { deviceId: true },
where: {
campaignId,
version,
eventType: DeliveryEventTypeEnum.INSTALL_ACK,
},
});
const ackedDeviceIds = new Set(ackRows.map((r) => r.deviceId));
const apiBaseUrl = this.configService.get(
"API_BASE_URL",
"http://localhost:3000",
);
const manifest = buildManifest({ campaign, apiBaseUrl });
const payload = JSON.stringify(manifest);
let missingCount = 0;
let republishCount = 0;
for await (const batch of this.deviceStreamService.streamBatches({
batchSize: BATCH_SIZE,
excludeDeviceIds: ackedDeviceIds,
})) {
missingCount += batch.length;
const results = await Promise.allSettled(
batch.map((device) =>
this.mqttProvider.publishWithTimeout({
topic: TOPICS.NOTIFICATIONS(device.deviceId),
payload,
qos: 1,
retain: true,
timeout: 5000,
}),
),
);
results.forEach((result, j) => {
if (result.status === "fulfilled") {
republishCount++;
} else {
this.logger.error(
`Retry publish failed for device ${batch[j]?.deviceId}: ${result.reason}`,
);
}
});
}
if (missingCount === 0) {
this.logger.log(
`All devices acknowledged campaign ${campaignId} v${version}`,
);
return;
}
this.logger.log(
`Re-published to ${republishCount}/${missingCount} missing devices for campaign ${campaignId}`,
);
if (attempt < MAX_ATTEMPTS) {
try {
const nextAttempt = attempt + 1;
await this.verifyDeliveryQueue.add(
"verify-delivery",
{ campaignId, version, attempt: nextAttempt },
{
delay: VERIFY_DELAY_MS * Math.pow(2, attempt - 1),
jobId: `verify-${campaignId}-v${version}-a${nextAttempt}`,
},
);
} catch (e) {
this.logger.error(`Failed to schedule next verify attempt: ${e}`);
}
} else {
this.logger.log(
`Max attempts reached for campaign ${campaignId} v${version}`,
);
}
}
private async handleVerifyRevoke(job: Job<VerifyRevokeJobData>) {
const { campaignId, attempt = 1 } = job.data;
this.logger.log(
`Verifying revoke delivery for campaign ${campaignId} (attempt ${attempt})`,
);
const alreadyRevoked = await this.devicePublishTracker.getRevoked({
campaignId,
});
const payload = JSON.stringify({ type: "revoke", campaignId });
let missingCount = 0;
let republishCount = 0;
for await (const batch of this.deviceStreamService.streamBatches({
batchSize: BATCH_SIZE,
excludeDeviceIds: alreadyRevoked,
})) {
missingCount += batch.length;
const results = await Promise.allSettled(
batch.map((device) =>
this.mqttProvider.publishWithTimeout({
topic: TOPICS.CONTROL(device.deviceId),
payload,
qos: 1,
timeout: 5000,
}),
),
);
const revokedIds: string[] = [];
results.forEach((result, j) => {
if (result.status === "fulfilled") {
republishCount++;
revokedIds.push(batch[j]!.deviceId);
} else {
this.logger.error(
`Retry revoke failed for device ${batch[j]?.deviceId}: ${result.reason}`,
);
}
});
await this.devicePublishTracker.markRevoked({
campaignId,
deviceIds: revokedIds,
});
}
if (missingCount === 0) {
this.logger.log(`All devices received revoke for campaign ${campaignId}`);
return;
}
this.logger.log(
`Re-sent revoke to ${republishCount}/${missingCount} missing devices for campaign ${campaignId}`,
);
if (attempt < MAX_ATTEMPTS) {
const nextAttempt = attempt + 1;
await this.verifyDeliveryQueue.add(
"verify-revoke",
{ campaignId, attempt: nextAttempt },
{
delay: VERIFY_DELAY_MS * Math.pow(2, attempt - 1),
jobId: `verify-revoke-${campaignId}-a${nextAttempt}`,
},
);
} else {
this.logger.log(
`Max revoke verify attempts reached for campaign ${campaignId}`,
);
}
}
}Jeszcze jedna rzecz to konfiguracja brokera MQTT, gdzie wywołanie mqtt.connect() używa dostrojonych opcji:
keepalive: 30 — wysyła PINGREQ co 30 s, aby wykrywać martwe połączeniareconnectPeriod: 1000 — ponawia połączenie po 1 s od rozłączeniaconnectTimeout: 10_000 — 10 s timeout dla początkowego połączeniaclean: true — startuje z czystą sesją (bez starego stanu)reschedulePings: true — resetuje timer keepalive przy aktywności (unika zbędnych pingów przy wysokiej przepustowości)
W scenariuszu z 20k urządzeń wystarczyło jedynie lekko dostroić konfigurację, aby obsłużyć wyższą przepustowość. Jeśli ten limit wzrośnie, możemy wprowadzić małą pulę połączeń i użyć algorytmu round-robin do rozdzielania ruchu wiadomości, ale warto pamiętać, że wiąże się to z większym zużyciem zasobów, ponieważ do osiągnięcia tego celu potrzeba więcej połączeń WebSocket'ów.
Dlaczego MQTT zamiast samych WebSocketów
Dla tej części kodu zdecydowałem się polegać na MQTT, ponieważ rozwiązuje wiele problemów out out of the box, takich jak dystrybucja wiadomości oparta na topicach. Zachowuje się już jak prawdziwy system pub-sub. Gdybym zdecydował się użyć wyłącznie WebSocketów, skończyłbym na implementowaniu dodatkowej abstrakcji. Koszt infrastruktury jest też niższy w porównaniu z budowaniem i utrzymywaniem własnego serwisu pub-sub od zera.
Screen Device
Aplikacja działająca na ekranie to mała aplikacja frontendowa napisana bez użycia żadnego frameworka frontendowego. Zdecydowałem się na takie podejście, ponieważ template jest składany po stronie serwera, zapisywany, a następnie pobierany przez ekran, więc nie ma potrzeby budowania go za każdym razem w runtime na urządzeniu. Dzięki temu frontend pozostaje bardzo lekki, co ma znaczenie w przypadku takich urządzeń.
Główny moduł aplikacji ekranu wygląda tak:
typescript
import type { MqttClient } from "mqtt";
import { TOPICS } from "@campaign-system/shared";
import type { Manifest } from "@campaign-system/shared";
import {
initStorage,
saveManifest,
getManifestByCampaign,
deleteCampaign,
} from "./storage";
import { connectMqtt } from "./mqtt-client";
import { fetchTemplate } from "./display";
import { initScheduler, checkAndSchedule, cancelScheduled } from "./scheduler";
import {
publishInstallAck,
publishDisplayStart,
publishDisplayComplete,
publishRevokeAck,
} from "./ack";
type ControlMessage = {
type: string;
campaignId: string;
};
type ClientContext = {
client: MqttClient;
deviceId: string;
};
function getDeviceId() {
const params = new URLSearchParams(window.location.search);
return params.get("deviceId") ?? "dev-001";
}
function getBrokerUrl() {
const params = new URLSearchParams(window.location.search);
return params.get("brokerUrl") ?? "ws://localhost:9001";
}
async function handleManifest(
manifest: Manifest,
{ client, deviceId }: ClientContext,
) {
console.log(
`[main] received manifest for campaign ${manifest.campaignId} v${manifest.version}`,
);
if (!manifest.manifestId || !manifest.campaignId || !manifest.templateUrl) {
console.error("[main] invalid manifest: missing required fields");
return;
}
const existing = await getManifestByCampaign(manifest.campaignId);
if (existing && existing.version >= manifest.version) {
console.log(
`[main] campaign ${manifest.campaignId} already installed at v${existing.version}, skipping`,
);
return;
}
await saveManifest(manifest);
try {
await fetchTemplate(manifest.templateUrl);
} catch (error) {
console.error("[main] failed to prefetch template:", error);
}
publishInstallAck({
client,
deviceId,
campaignId: manifest.campaignId,
version: manifest.version,
});
await checkAndSchedule();
}
async function handleControl(
message: ControlMessage,
{ client, deviceId }: ClientContext,
) {
console.log(`[main] received control message: ${message.type}`);
if (message.type === "revoke") {
const manifest = await getManifestByCampaign(message.campaignId);
cancelScheduled(message.campaignId);
await deleteCampaign(message.campaignId);
publishRevokeAck({
client,
deviceId,
campaignId: message.campaignId,
version: manifest?.version ?? 0,
});
console.log(`[main] revoked campaign ${message.campaignId}`);
}
}
async function bootstrap() {
const deviceId = getDeviceId();
const brokerUrl = getBrokerUrl();
console.log(`[main] starting screen client for device ${deviceId}`);
console.log(`[main] connecting to broker at ${brokerUrl}`);
await initStorage();
const container = document.getElementById("display");
if (!container) {
throw new Error("Display container #display not found");
}
const client = connectMqtt({ brokerUrl, deviceId });
const context: ClientContext = { client, deviceId };
initScheduler({
container,
callbacks: {
onDisplayStart: (manifest) => {
publishDisplayStart({
client,
deviceId,
campaignId: manifest.campaignId,
version: manifest.version,
});
},
onDisplayComplete: (manifest) => {
publishDisplayComplete({
client,
deviceId,
campaignId: manifest.campaignId,
version: manifest.version,
});
},
},
});
client.on("message", (topic, payload) => {
try {
const message = JSON.parse(payload.toString());
if (topic === TOPICS.NOTIFICATIONS(deviceId)) {
handleManifest(message as Manifest, context);
} else if (topic === TOPICS.CONTROL(deviceId)) {
handleControl(message as ControlMessage, context);
}
} catch (error) {
console.error(`[mqtt] failed to parse message on topic ${topic}:`, error);
}
});
await checkAndSchedule();
console.log("[main] screen client ready");
}
bootstrap().catch((error) => {
console.error("[main] failed to initialize screen client:", error);
});Ten kod orkiestruje kilka rzeczy, które są kluczowe, aby ekran mógł zainstalować, zaplanować i anulować kampanię:
- storage inicjalizuje, zapisuje, pobiera i usuwa manifest. W tym przypadku wykorzystuje IndexedDB, które jest wystarczające jako storage.
- połączenie klienta MQTT pozwala urządzeniu subskrybować pub-sub po stronie serwera i komunikować się z serwerem poprzez wysyłanie ACK-ów dla operacji publikacji i anulowania.
- scheduler inicjalizuje scheduler, który obsługuje czas wyświetlania na urządzeniu i spełnia jedno z wymagań funkcjonalnych.
- fetch template pobiera szablon zbudowany na serwerze i zapisany w storage zasobów. Warto zauważyć, że szablon jest prefetchowany wcześniej (5 min obsługiwane przez scheduler), aby mieć pewność, że będzie dostępny, gdy nadejdzie zaplanowany czas. Zgodnie z wymaganiami funkcjonalnymi dopuszczalne opóźnienie wynosi 1 sekundę.
Dlaczego cache’ować manifest i planować kampanie lokalnie zamiast wypychać je zgodnie z harmonogramem przez MQTT
Gdy mamy ~20k urządzeń subskrybujących pub-sub, poleganie na serwerze przy live publishing'u staje się ryzykowne. W trakcie publikacji wiele rzeczy może pójść nie tak, na przykład opóźnienia sieciowe, przeciążony broker albo urządzenia będące offline w zaplanowanym czasie. Przechowywanie manifestu po stronie klienta na urządzeniu ma praktycznie zerowy koszt. Jest też przestrzeń na dalsze zwiększanie niezawodności, na przykład przez zdefiniowanie maksymalnego czasu wyprzedzenia przed startem harmonogramu, w którym kampania musi zostać utworzona.
ACK Consumer
Teraz czas na ostatni serwis w naszym projekcie, który domyka pętlę publish -> ACK. Ten consumer odpowiada za odbieranie i przetwarzanie ACK-ów. Urządzenia ekranów publikują do niego wiadomości w zależności od tego, którą operację trzeba potwierdzić.
typescript
import { Injectable, Logger, OnModuleDestroy } from "@nestjs/common";
import { InjectQueue } from "@nestjs/bullmq";
import { Queue } from "bullmq";
import { z } from "zod";
import { EVENT_TYPES } from "@campaign-system/shared";
import { DeliveryEventTypeEnum } from "./delivery-event.entity";
const eventTypes = Object.values(EVENT_TYPES);
export const ackEventSchema = z.object({
eventId: z.string(),
eventType: z.enum(eventTypes as [string, ...string[]]),
deviceId: z.string(),
campaignId: z.string(),
version: z.number(),
timestamp: z.number(),
payload: z.record(z.string(), z.unknown()).optional(),
});
export type AckEvent = z.infer<typeof ackEventSchema>;
export const EVENT_TYPE_MAP: Record<
AckEvent["eventType"],
DeliveryEventTypeEnum
> = {
[EVENT_TYPES.INSTALL_ACK]: DeliveryEventTypeEnum.INSTALL_ACK,
[EVENT_TYPES.DISPLAY_START]: DeliveryEventTypeEnum.DISPLAY_START,
[EVENT_TYPES.DISPLAY_COMPLETE]: DeliveryEventTypeEnum.DISPLAY_COMPLETE,
[EVENT_TYPES.REVOKE_ACK]: DeliveryEventTypeEnum.REVOKE_ACK,
[EVENT_TYPES.ERROR]: DeliveryEventTypeEnum.ERROR,
};
export const ACK_QUEUE_NAME = "ack-events-batch";
const FLUSH_INTERVAL_MS = 500;
const MAX_BUFFER_SIZE = 2000;
@Injectable()
export class AckConsumerService implements OnModuleDestroy {
private readonly logger = new Logger(AckConsumerService.name);
private buffer: AckEvent[] = [];
private flushTimer: ReturnType<typeof setInterval>;
constructor(@InjectQueue(ACK_QUEUE_NAME) private readonly ackQueue: Queue) {
this.flushTimer = setInterval(() => {
this.flush().catch((error) => {
this.logger.error(`Flush failed: ${error}`);
});
}, FLUSH_INTERVAL_MS);
}
async onModuleDestroy() {
clearInterval(this.flushTimer);
await this.flush();
}
bufferEvent(event: AckEvent) {
const mappedEventType = EVENT_TYPE_MAP[event.eventType];
if (!mappedEventType) {
this.logger.warn(`Unknown event type: ${event.eventType}, discarding`);
return;
}
this.buffer.push(event);
if (this.buffer.length >= MAX_BUFFER_SIZE) {
void this.flush();
}
}
private async flush() {
if (this.buffer.length === 0) return;
const events = this.buffer;
this.buffer = [];
try {
await this.ackQueue.add(
"batch",
{ events },
{
attempts: 3,
backoff: { type: "exponential", delay: 1000 },
removeOnComplete: true,
removeOnFail: 100,
},
);
this.logger.debug(`Flushed ${events.length} events to queue`);
} catch (error) {
this.logger.error(
`Failed to enqueue batch of ${events.length} events: ${error}`,
);
this.buffer = [...events, ...this.buffer];
}
}
}Ten serwis musi być przygotowany na obsługę znaczącego burstu ACK-ów, ponieważ wymagania mówią o ~20k urządzeń. Wybranie naiwnego podejścia polegającego na wstawianiu każdego zdarzenia ACK bezpośrednio do bazy danych nie skalowałoby się dobrze. W tym przypadku zdecydowałem się połączyć buffering z batchingiem w kolejce. Jako kolejki użyłem istniejącej infrastruktury (BullMQ). Rozważałem Kafkę, ale w tym przypadku BullMQ w zupełności wystarcza i daje nam wszystko, czego potrzebujemy, w tym buforowanie danych, obsługę błędów i konfigurowalną współbieżność.
Logika batch processora wygląda tak:
typescript
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Logger } from "@nestjs/common";
import { DataSource } from "typeorm";
import { Job } from "bullmq";
import {
DeliveryEventEntity,
DeliveryEventTypeEnum,
} from "./delivery-event.entity";
import { DeviceEntity } from "../devices/device.entity";
import { ACK_QUEUE_NAME, EVENT_TYPE_MAP } from "./ack-consumer.service";
type AckEventJobItem = {
eventId: string;
eventType: string;
deviceId: string;
campaignId: string;
version: number;
timestamp: number;
payload?: Record<string, unknown>;
};
type BatchJobData = {
events: AckEventJobItem[];
};
type MappedDeliveryEvent = {
eventId: string;
deviceId: string;
campaignId: string;
version: number;
eventType: DeliveryEventTypeEnum;
payload: any;
};
const DB_CHUNK_SIZE = 500;
const DEVICE_UPDATE_CHUNK_SIZE = 5000;
function chunk<T>(array: T[], size: number) {
const result: T[][] = [];
for (let i = 0; i < array.length; i += size) {
result.push(array.slice(i, i + size));
}
return result;
}
@Processor(ACK_QUEUE_NAME, { concurrency: 3 })
export class AckBatchProcessor extends WorkerHost {
private readonly logger = new Logger(AckBatchProcessor.name);
constructor(private readonly dataSource: DataSource) {
super();
}
async process(job: Job<BatchJobData>) {
const { events } = job.data;
const deliveryValues: MappedDeliveryEvent[] = [];
for (const event of events) {
const mappedType = EVENT_TYPE_MAP[event.eventType];
if (!mappedType) continue;
deliveryValues.push({
eventId: event.eventId,
deviceId: event.deviceId,
campaignId: event.campaignId,
version: event.version,
eventType: mappedType,
payload: event.payload ?? null,
});
}
const poolSize = (this.dataSource.options as any).extra?.max ?? 10;
const maxParallelChunks = Math.max(1, Math.floor(poolSize / 4));
const insertChunks = chunk(deliveryValues, DB_CHUNK_SIZE);
for (let i = 0; i < insertChunks.length; i += maxParallelChunks) {
const batch = insertChunks.slice(i, i + maxParallelChunks);
await Promise.all(
batch.map((values) =>
this.dataSource
.getRepository(DeliveryEventEntity)
.createQueryBuilder()
.insert()
.values(values)
.orIgnore()
.execute(),
),
);
}
const uniqueDeviceIds = [...new Set(events.map((e) => e.deviceId))];
for (const deviceIdBatch of chunk(
uniqueDeviceIds,
DEVICE_UPDATE_CHUNK_SIZE,
)) {
await this.dataSource
.createQueryBuilder()
.update(DeviceEntity)
.set({ lastSeen: () => "NOW()" })
.whereInIds(deviceIdBatch)
.execute();
}
this.logger.log(
`Batch processed: ${events.length} events, ${uniqueDeviceIds.length} devices`,
);
}
}Zdarzenia są najpierw mapowane, a następnie maksymalny poziom równoległości jest obliczany dynamicznie na podstawie maksymalnej liczby dostępnych połączeń w puli, tak aby zachować równowagę między szybkością a niezawodnością. Następnie wykonywane są inserty, a na końcu dotknięte urządzenia są oznaczane jako widziane.
Podsumowanie
Projektowanie systemów, rozbijanie ich na komponenty, rozważanie kompromisów i podejmowanie decyzji daje mi ogromną satysfakcję. Wiem, że ten kod nie jest jeszcze gotowy na produkcję (i tak naprawdę wciąż bardzo daleko mu do tego stanu), ale mimo to mam z jego budowania mnóstwo frajdy i nadal czuję wyrzut dopaminy, gdy widzę przechodzące load testy. Ten POC jest dobrym punktem wyjścia do stworzenia prawdziwego systemu. Odkąd AI stało się "mądrzejsze", słyszałem wiele narzekań na utratę satysfakcji z programowania. Dla mnie ta satysfakcja jest taka sama jak przed erą AI. Najbardziej satysfakcjonujące nadal jest budowanie złożonych systemów, składanie wzorców oraz eksplorowanie różnych opcji i rozwiązań. W kodzie często dzieje się to w mikro skali, ale kiedy patrzymy na szerszy obraz, satysfakcja przychodzi z makro skali.
Sprawdź repozytorium z tym projektem na moim GitHubie
PS: W tym artykule wspomniałem też o wzorcu outbox. Zajrzyj jeśli chcesz dowiedzieć się więcej o samym wzorcu.
