๋ฉ์์ง ํ๋ ๋ฌด์์ธ๊ฐ
ํธ์์ ํ๋ฐฐ๋ฅผ ์๊ฐํด๋ณด์.
๋ณด๋ด๋ ์ฌ๋์ด ํ๋ฐฐ๋ฅผ ๋งก๊ธฐ๋ฉด,
๋ฐ๋ ์ฌ๋์ด ๋์ค์ ์ฐพ์๊ฐ๋ค.
๋ณด๋ด๋ ์ฌ๋์ ๋ฐ๋ ์ฌ๋์ ๊ธฐ๋ค๋ฆด ํ์ ์๋ค.
๋ฐ๋ ์ฌ๋๋ ๋ณด๋ด๋ ์ฌ๋์ ๊ธฐ๋ค๋ฆด ํ์ ์๋ค.
๋ฉ์์ง ํ๋ ์ด๋ฐ ํ๋ฐฐ ๋ณด๊ดํจ ์ญํ ์ ํ๋ค.
์์คํ
๊ฐ ๋ฉ์์ง๋ฅผ ์์ ์ ์ฅํ๊ณ ,
๋ฐ๋ ์ชฝ์ด ์ค๋น๋๋ฉด ์ฒ๋ฆฌํ๋ค.
์ ๋ฉ์์ง ํ๋ฅผ ๋ฐฐ์์ผ ํ ๊น?
์ด์ 1: ๋น๋๊ธฐ ์ฒ๋ฆฌ
์ค๋ ๊ฑธ๋ฆฌ๋ ์์
์ ๋์ค์ ์ฒ๋ฆฌ
์ด์ 2: ์์คํ
๋ถ๋ฆฌ
์๋น์ค ๊ฐ ๋์จํ ๊ฒฐํฉ
์ด์ 3: ๋ถํ ๋ถ์ฐ
ํธ๋ํฝ ๊ธ์ฆ ์ ๋ฒํผ ์ญํ
์ด์ 4: ๋ฉด์ ํ์
MSA, ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ
์ฒ ๋จ๊ณจ ์ง๋ฌธ
๊ธฐ๋ณธ ๊ฐ๋ ์์ฝ
๐ท๏ธ ๋ฉ์์ง ํ ๊ตฌ์กฐ
[Producer] โโโ [Message Queue] โโโ [Consumer]
(๋ฐ์ ์) (๋ฉ์์ง ์ ์ฅ์) (์์ ์)
Producer (์์ฐ์)
๊ฐ๋
: ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ์ชฝ
์์:
- ์ฃผ๋ฌธ ์๋น์ค๊ฐ โ์ฃผ๋ฌธ ์์ฑ๋จโ ๋ฉ์์ง ๋ฐํ
- ๊ฒฐ์ ์๋น์ค๊ฐ โ๊ฒฐ์ ์๋ฃโ ๋ฉ์์ง ๋ฐํ
Queue (ํ)
๊ฐ๋
: ๋ฉ์์ง๋ฅผ ์์ ์ ์ฅํ๋ ๊ณต๊ฐ
ํน์ง:
- FIFO (First In First Out)
- ๋ฉ์์ง ์์์ฑ ๋ณด์ฅ
- ์ฌ๋ฌ Consumer๊ฐ ๊ตฌ๋ ๊ฐ๋ฅ
Consumer (์๋น์)
๊ฐ๋
: ๋ฉ์์ง๋ฅผ ๋ฐ์์ ์ฒ๋ฆฌํ๋ ์ชฝ
์์:
- ์๋ฆผ ์๋น์ค๊ฐ โ์ฃผ๋ฌธ ์์ฑ๋จโ ๋ฉ์์ง ์์
- ๋ฐฐ์ก ์๋น์ค๊ฐ โ๊ฒฐ์ ์๋ฃโ ๋ฉ์์ง ์์
๐ท๏ธ ๋ฉ์์ง ํจํด
1. Point-to-Point (P2P)
[Producer] โโโ [Queue] โโโ [Consumer]
โ
โโโโ ํ๋์ Consumer๋ง ์ฒ๋ฆฌ
ํน์ง: ๋ฉ์์ง๋น ํ๋์ Consumer๋ง ์ฒ๋ฆฌ
์ฌ์ฉ ์: ์์
๋ถ๋ฐฐ, ํ์คํฌ ํ
2. Publish/Subscribe (Pub/Sub)
โโโโ [Consumer 1]
[Producer] โโโ [Topic] โโโ [Consumer 2]
โโโโ [Consumer 3]
ํน์ง: ๋ชจ๋ ๊ตฌ๋
์๊ฐ ๋ฉ์์ง ์์
์ฌ์ฉ ์: ์ด๋ฒคํธ ๋ธ๋ก๋์บ์คํธ, ์๋ฆผ
๐ท๏ธ RabbitMQ vs Kafka
| ๊ตฌ๋ถ | RabbitMQ | Kafka |
|---|---|---|
| ๋ชจ๋ธ | ๋ฉ์์ง ๋ธ๋ก์ปค | ์ด๋ฒคํธ ์คํธ๋ฆฌ๋ฐ |
| ๋ฉ์์ง ๋ณด๊ด | ์๋น ํ ์ญ์ | ์๊ตฌ ๋ณด๊ด |
| ์ฒ๋ฆฌ๋ | ์ค๊ฐ | ๋งค์ฐ ๋์ |
| ์์ ๋ณด์ฅ | ํ ๋จ์ | ํํฐ์ ๋จ์ |
| ์ฌ์ฒ๋ฆฌ | ์ด๋ ค์ | ์ฌ์ |
| ์ ํฉํ ๊ฒฝ์ฐ | ์์ ํ, RPC | ๋ก๊ทธ, ์ด๋ฒคํธ ์์ฑ |
๐ท๏ธ ํต์ฌ ์ฉ์ด
Exchange (RabbitMQ): ๋ฉ์์ง ๋ผ์ฐํ
๊ท์น
Topic (Kafka): ๋ฉ์์ง ์นดํ
๊ณ ๋ฆฌ
Partition (Kafka): ํ ํฝ ๋ด ๋ณ๋ ฌ ์ฒ๋ฆฌ ๋จ์
Consumer Group: ๊ฐ์ ๋ฉ์์ง๋ฅผ ๋๋ ์ฒ๋ฆฌํ๋ ๊ทธ๋ฃน
Offset (Kafka): ๋ฉ์์ง ์์น (์ฌ์ฒ๋ฆฌ ๊ฐ๋ฅ)
ACK: ๋ฉ์์ง ์ฒ๋ฆฌ ์๋ฃ ํ์ธ
์ค์ ์์
๐ท๏ธ RabbitMQ + NestJS
1. ์ค์น ๋ฐ ์ค์
# RabbitMQ Docker ์คํ
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3-management
# NestJS ํจํค์ง ์ค์น
npm install @nestjs/microservices amqplib amqp-connection-manager
2. Producer ๊ตฌํ
// src/orders/orders.module.ts
import { Module } from "@nestjs/common";
import { ClientsModule, Transport } from "@nestjs/microservices";
@Module({
imports: [
ClientsModule.register([
{
name: "NOTIFICATION_SERVICE",
transport: Transport.RMQ,
options: {
urls: ["amqp://localhost:5672"],
queue: "notifications_queue",
queueOptions: { durable: true },
},
},
]),
],
})
export class OrdersModule {}
// src/orders/orders.service.ts
import { Inject, Injectable } from "@nestjs/common";
import { ClientProxy } from "@nestjs/microservices";
@Injectable()
export class OrdersService {
constructor(
@Inject("NOTIFICATION_SERVICE")
private readonly notificationClient: ClientProxy
) {}
async createOrder(dto: CreateOrderDto): Promise<Order> {
// 1. ์ฃผ๋ฌธ ์์ฑ
const order = await this.orderRepository.save({
userId: dto.userId,
items: dto.items,
status: "PENDING",
});
// 2. ๋ฉ์์ง ๋ฐํ (๋น๋๊ธฐ)
this.notificationClient.emit("order_created", {
orderId: order.id,
userId: order.userId,
totalAmount: order.totalAmount,
});
// 3. ์ฆ์ ์๋ต (์๋ฆผ์ ๋์ค์ ์ฒ๋ฆฌ๋จ)
return order;
}
async cancelOrder(orderId: string): Promise<void> {
const order = await this.orderRepository.findById(orderId);
order.status = "CANCELLED";
await this.orderRepository.save(order);
// ์ทจ์ ์ด๋ฒคํธ ๋ฐํ
this.notificationClient.emit("order_cancelled", {
orderId: order.id,
userId: order.userId,
});
}
}
3. Consumer ๊ตฌํ
// src/notifications/notifications.controller.ts
import { Controller } from "@nestjs/common";
import { EventPattern, Payload } from "@nestjs/microservices";
@Controller()
export class NotificationsController {
constructor(
private readonly emailService: EmailService,
private readonly pushService: PushService
) {}
@EventPattern("order_created")
async handleOrderCreated(
@Payload() data: { orderId: string; userId: string; totalAmount: number }
): Promise<void> {
console.log("์ฃผ๋ฌธ ์์ฑ ์ด๋ฒคํธ ์์ :", data);
// ์ด๋ฉ์ผ ๋ฐ์ก
await this.emailService.sendOrderConfirmation(
data.userId,
data.orderId,
data.totalAmount
);
// ํธ์ ์๋ฆผ
await this.pushService.sendNotification(
data.userId,
`์ฃผ๋ฌธ์ด ์๋ฃ๋์์ต๋๋ค. ์ฃผ๋ฌธ๋ฒํธ: ${data.orderId}`
);
}
@EventPattern("order_cancelled")
async handleOrderCancelled(
@Payload() data: { orderId: string; userId: string }
): Promise<void> {
console.log("์ฃผ๋ฌธ ์ทจ์ ์ด๋ฒคํธ ์์ :", data);
await this.emailService.sendOrderCancellation(data.userId, data.orderId);
}
}
// main.ts (Consumer ์๋น์ค)
import { NestFactory } from "@nestjs/core";
import { Transport, MicroserviceOptions } from "@nestjs/microservices";
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
NotificationsModule,
{
transport: Transport.RMQ,
options: {
urls: ["amqp://localhost:5672"],
queue: "notifications_queue",
queueOptions: { durable: true },
noAck: false, // ์๋ ACK
},
}
);
await app.listen();
console.log("Notification ์๋น์ค ์์");
}
bootstrap();
๐ท๏ธ Kafka + NestJS
1. ์ค์น ๋ฐ ์ค์
# Kafka Docker Compose
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# NestJS Kafka ํจํค์ง
npm install @nestjs/microservices kafkajs
2. Producer ๊ตฌํ
// src/orders/orders.module.ts
import { Module } from "@nestjs/common";
import { ClientsModule, Transport } from "@nestjs/microservices";
@Module({
imports: [
ClientsModule.register([
{
name: "KAFKA_SERVICE",
transport: Transport.KAFKA,
options: {
client: {
clientId: "orders",
brokers: ["localhost:9092"],
},
producer: {
allowAutoTopicCreation: true,
},
},
},
]),
],
})
export class OrdersModule {}
// src/orders/orders.service.ts
@Injectable()
export class OrdersService {
constructor(
@Inject("KAFKA_SERVICE")
private readonly kafkaClient: ClientKafka
) {}
async onModuleInit() {
// ํ ํฝ ๊ตฌ๋
(์๋ต ๋ฐ์ ๊ฒฝ์ฐ)
this.kafkaClient.subscribeToResponseOf("order.created");
await this.kafkaClient.connect();
}
async createOrder(dto: CreateOrderDto): Promise<Order> {
const order = await this.orderRepository.save({
userId: dto.userId,
items: dto.items,
status: "PENDING",
});
// Kafka ๋ฉ์์ง ๋ฐํ
this.kafkaClient.emit("order.created", {
key: order.id, // ํํฐ์
ํค
value: {
orderId: order.id,
userId: order.userId,
items: order.items,
totalAmount: order.totalAmount,
createdAt: new Date().toISOString(),
},
});
return order;
}
}
3. Consumer ๊ตฌํ
// src/inventory/inventory.controller.ts
import { Controller } from "@nestjs/common";
import { MessagePattern, Payload } from "@nestjs/microservices";
@Controller()
export class InventoryController {
constructor(private readonly inventoryService: InventoryService) {}
@MessagePattern("order.created")
async handleOrderCreated(
@Payload()
message: {
orderId: string;
items: Array<{ productId: string; quantity: number }>;
}
): Promise<void> {
console.log("์ฌ๊ณ ์ฐจ๊ฐ ์ด๋ฒคํธ ์์ :", message.orderId);
// ์ฌ๊ณ ์ฐจ๊ฐ
for (const item of message.items) {
await this.inventoryService.decreaseStock(item.productId, item.quantity);
}
}
}
// main.ts (Consumer ์๋น์ค)
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
InventoryModule,
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ["localhost:9092"],
clientId: "inventory-consumer",
},
consumer: {
groupId: "inventory-group",
},
},
}
);
await app.listen();
}
๐ท๏ธ ์๋ฌ ์ฒ๋ฆฌ์ ์ฌ์๋
// Dead Letter Queue (DLQ) ํจํด
@Injectable()
export class OrderEventHandler {
private readonly MAX_RETRIES = 3;
@EventPattern("order.created")
async handleOrderCreated(@Payload() data: OrderCreatedEvent): Promise<void> {
try {
await this.processOrder(data);
} catch (error) {
// ์ฌ์๋ ํ์ ํ์ธ
const retryCount = data.retryCount || 0;
if (retryCount < this.MAX_RETRIES) {
// ์ฌ์๋
this.kafkaClient.emit("order.created", {
...data,
retryCount: retryCount + 1,
});
} else {
// DLQ๋ก ์ด๋
this.kafkaClient.emit("order.created.dlq", {
...data,
error: error.message,
failedAt: new Date().toISOString(),
});
}
}
}
}
// ๋ฉฑ๋ฑ์ฑ ๋ณด์ฅ
@Injectable()
export class IdempotentHandler {
constructor(private readonly redis: Redis) {}
async handleMessage(messageId: string, handler: () => Promise<void>) {
// ์ด๋ฏธ ์ฒ๋ฆฌ๋ ๋ฉ์์ง์ธ์ง ํ์ธ
const processed = await this.redis.get(`processed:${messageId}`);
if (processed) {
console.log("์ด๋ฏธ ์ฒ๋ฆฌ๋ ๋ฉ์์ง:", messageId);
return;
}
// ์ฒ๋ฆฌ
await handler();
// ์ฒ๋ฆฌ ์๋ฃ ํ์ (24์๊ฐ ์ ์ง)
await this.redis.set(`processed:${messageId}`, "1", "EX", 86400);
}
}
๐ท๏ธ ์ค์ ์ํคํ ์ฒ
โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ
โ ์ฃผ๋ฌธ API โโโโโโโ Kafka โโโโโโโ ์ฌ๊ณ ์๋น์ค โ
โโโโโโโโโโโโโโโ โ โ โโโโโโโโโโโโโโโ
โ order. โ
โ created โโโโโโโโโโโโโโโโโโโโโ
โ โ โ ์๋ฆผ ์๋น์ค โ
โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ
โ
โ
โโโโโโโโโโโโโโโ
โ ๋ถ์ ์๋น์ค โ
โโโโโโโโโโโโโโโ
// ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ฃผ๋ฌธ ์ฒ๋ฆฌ ํ๋ฆ
// 1. ์ฃผ๋ฌธ ์์ฑ โ order.created ๋ฐํ
// 2. ์ฌ๊ณ ์๋น์ค โ ์ฌ๊ณ ์ฐจ๊ฐ โ inventory.updated ๋ฐํ
// 3. ๊ฒฐ์ ์๋น์ค โ ๊ฒฐ์ ์ฒ๋ฆฌ โ payment.completed ๋ฐํ
// 4. ์๋ฆผ ์๋น์ค โ ์ด๋ฉ์ผ/ํธ์ ๋ฐ์ก
// 5. ๋ถ์ ์๋น์ค โ ๋ฐ์ดํฐ ์ ์ฅ
์ค์ ์ฒดํฌ๋ฆฌ์คํธ
โ ๋ฉ์์ง ์ค๊ณ
- ๋ฉ์์ง ์คํค๋ง ์ ์
- ๋ฒ์ ๊ด๋ฆฌ ๊ณ ๋ ค
- ํ์ํ ์ ๋ณด๋ง ํฌํจ
- ๋ฉฑ๋ฑ์ฑ ํค ํฌํจ
โ ์ ๋ขฐ์ฑ
- ๋ฉ์์ง ์์์ฑ ์ค์
- ACK ์ ๋ต ๊ฒฐ์
- ์ฌ์๋ ๋ก์ง ๊ตฌํ
- DLQ ์ค์
โ ์ฑ๋ฅ
- ํํฐ์ ์ ๊ฒฐ์ (Kafka)
- Consumer Group ์ค๊ณ
- ๋ฐฐ์น ์ฒ๋ฆฌ ๊ณ ๋ ค
- ๋ชจ๋ํฐ๋ง ์ค์
โ ์ด์
- ๋ฉ์์ง ์ถ์ ๊ฐ๋ฅ
- ์๋ฆผ ์ค์
- ๋ฐฑ์ ์ ๋ต
- ์ฅ์ ๋์ ๊ณํ
์์ฝ
๋ฉ์์ง ํ๋ ์์คํ
๊ฐ ๋น๋๊ธฐ ํต์ ์ ๊ฐ๋ฅํ๊ฒ ํ๋ ํต์ฌ ์ธํ๋ผ๋ค.
๐ ํต์ฌ ํฌ์ธํธ:
- Producer: ๋ฉ์์ง ๋ฐํ
- Queue/Topic: ๋ฉ์์ง ์ ์ฅ
- Consumer: ๋ฉ์์ง ์ฒ๋ฆฌ
- P2P: ํ๋์ Consumer๋ง ์ฒ๋ฆฌ
- Pub/Sub: ๋ชจ๋ ๊ตฌ๋ ์๊ฐ ์์
- ACK: ์ฒ๋ฆฌ ์๋ฃ ํ์ธ
๐ ์ ํ ๊ธฐ์ค:
| ์ํฉ | ๊ถ์ฅ |
|---|---|
| ์์ ํ | RabbitMQ |
| ์ด๋ฒคํธ ์คํธ๋ฆฌ๋ฐ | Kafka |
| ์ค์๊ฐ ์๋ฆผ | RabbitMQ |
| ๋ก๊ทธ ์์ง | Kafka |
| ๋ง์ดํฌ๋ก์๋น์ค | ๋ ๋ค ๊ฐ๋ฅ |
๐ Best Practices:
- ๋ฉฑ๋ฑ์ฑ ๋ณด์ฅ ํ์
- DLQ๋ก ์คํจ ๋ฉ์์ง ๊ด๋ฆฌ
- ๋ฉ์์ง ์คํค๋ง ๋ฒ์ ๊ด๋ฆฌ
- Consumer Group์ผ๋ก ํ์ฅ
- ๋ชจ๋ํฐ๋ง๊ณผ ์๋ฆผ ์ค์
- ์ฌ์๋ ์ ๋ต ์๋ฆฝ
๋ฉ์์ง ํ๋ฅผ ์ ํ์ฉํ๋ฉด,
์์คํ
๊ฐ ๊ฒฐํฉ๋๋ฅผ ๋ฎ์ถ๊ณ ,
ํ์ฅ์ฑ๊ณผ ์์ ์ฑ์ ๋์ผ ์ ์๋ค.