☀️
kafka를 설치 및 연동해보자 (with NestJs)
May 20, 2023
설치 방법
STEP 1. DOWNLOAD KAFKA
# 다운로드
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.13-2.8.0.tgz
# 압축 풀기
tar xvf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
STEP 2. zookeeper 구동
bin/zookeeper-server-start.sh config/zookeeper.properties
백그라운드로 구동시키고 싶을 경우 -daemon 명령어를 추가합니다.
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
zookeeper?
분산 애플리케이션을 구축하다보면, 분산 애플리케이션 관리를 위한 안정적인 코디네이션 애플리케이션이 추가로 필요하게 됩니다.
따라서 안정적인 코디네이션 서비스로 검증된 주키퍼를 많이 사용하게 됩니다.
주키퍼는 분산 애플리케이션이 안정적인 서비스를 할 수 있도록 분산되어 있는 각 애플리케이션의 정보를 중앙에 집중하고 구성 관리, 그룹 관리 네이밍, 동기화 등의 서비스를 제공합니다.
서버 여러 대를 클러스터로 구성하고, 분산 애플리케이션들이 각각 클라이언트가 되어 주키퍼 서버들과 커넥션을 맺은 후 상태 정보 등을 주고 받습니다.
위의 그림에서 Server는 주키퍼, Client는 카프카가 됩니다.
STEP 3. Kafka 구동
bin/kafka-server-start.sh -daemon config/server.properties
서비스가 제대로 띄워졌는지 확인해보려면 포트가 정상적으로 리스닝 중인 것을 확인해보면 됩니다.
netstat -an | grep 2181 # 2181포트는 zookeeper의 client Port
STEP 4. TEST
토픽 생성
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
# 결과 로그
Created topic test-topic
예시
TIP
- —create: 토픽 생성
- —topic [topic-name]: 생성할 토픽 이름
- —partitions [number]: 생성할 토픽의 파티션 개수
- —replication-factor [number]: 각 파티션 내 메시지를 복제할 Replica의 개수
- —bootstrap-server: 연결될 카프카 서버
토픽 리스트 확인
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 결과 로그
test-topic
예시
메시지 발행 (Producer)
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
명령어 실행 후 > 라인이 생기면 메시지 입력 가능합니다. enter 키 입력 시 메시지 발행
메시지 구독 (Consumer)
bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
Producer로 발행한 메시지 내용을 확인해 볼 수 있습니다.
NestJs와 연동하기
NestJs가 기본적으로 설치되어 있다고 가정합니다.
1. kafkajs 설치
yarn add kafkajs
2. kafka Service 로직 생성
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Kafka, Producer, Partitioners, Consumer } from 'kafkajs';
import { SUBSCRIBER_FN_REF_MAP, SUBSCRIBER_OBJ_REF_MAP } from './kafka.decorator';
import { KafkaConfig } from './kafka.message';
@Injectable()
export class KafkaService implements OnModuleInit, OnModuleDestroy {
private kafka: Kafka;
private producer: Producer;
private consumer: Consumer;
constructor(private kafkaConfig: KafkaConfig) {
this.kafka = new Kafka({
clientId: this.kafkaConfig.clientId,
brokers: this.kafkaConfig.brokers,
});
this.producer = this.kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
allowAutoTopicCreation: true,
});
this.consumer = this.kafka.consumer({
allowAutoTopicCreation: true,
groupId: this.kafkaConfig.groupId,
});
}
async onModuleInit(): Promise<void> {
await this.connect();
SUBSCRIBER_FN_REF_MAP.forEach((functionRef, topic) => {
console.log('subscribe', topic);
this.bindAllTopicToConsumer(functionRef, topic);
});
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const functionRef = SUBSCRIBER_FN_REF_MAP.get(topic);
const object = SUBSCRIBER_OBJ_REF_MAP.get(topic);
await functionRef.apply(object, [message.value.toString()]);
},
});
}
async onModuleDestroy(): Promise<void> {
await this.disconnect();
}
async connect() {
await this.producer.connect();
await this.consumer.connect();
}
async disconnect() {
await this.producer.disconnect();
await this.consumer.disconnect();
}
async sendMessage(kafkaTopic: string, kafkaMessage) {
await this.producer.connect();
const metadata = await this.producer
.send({
topic: kafkaTopic,
messages: [{ value: JSON.stringify(kafkaMessage) }],
})
.catch((e) => console.error(e.message, e));
await this.producer.disconnect();
return metadata;
}
async bindAllTopicToConsumer(callback, _topic) {
await this.consumer.subscribe({ topic: _topic, fromBeginning: false });
}
}
3. Kafka Module 생성
import { DynamicModule, Global, Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { KafkaConfig } from './kafka.message';
@Global()
@Module({})
export class KafkaModule {
static register(kafkaConfig: KafkaConfig): DynamicModule {
console.log('KafkaModule.register', kafkaConfig);
return {
global: true,
module: KafkaModule,
providers: [
{
provide: KafkaService,
useValue: new KafkaService(kafkaConfig),
},
],
exports: [KafkaService],
};
}
}
4. Kafka Type 및 Decorator 생성
// type
export class KafkaPayload {
public body: any;
public messageId: string;
public messageType: string;
public topicName: string;
public createdTime?: string;
create?(messageId, body, messageType, topicName): KafkaPayload {
return {
messageId,
body,
messageType,
topicName,
createdTime: new Date().toISOString(),
};
}
}
export declare class KafkaConfig {
clientId: string;
brokers: string[];
groupId: string;
}
// ---
// decorator
export const SUBSCRIBER_FN_REF_MAP = new Map();
export const SUBSCRIBER_OBJ_REF_MAP = new Map();
export function SubscribeTo(topic) {
return (target, propertyKey, descriptor) => {
const originalMethod = target[propertyKey];
SUBSCRIBER_FN_REF_MAP.set(topic, originalMethod);
SUBSCRIBER_OBJ_REF_MAP.set(topic, target);
return descriptor;
};
}
5. App Module에 Kafka Module Import
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from './kafka/kafka.module';
@Module({
imports: [
KafkaModule.register({
clientId: `test-id`,
brokers: ['localhost:9092'],
groupId: 'consumer-group-id',
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
6. App Service Producer 함수 생성
import { Injectable } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { KafkaService } from './kafka/kafka.service';
@Injectable()
export class AppService {
constructor(private readonly kafkaService: KafkaService) {}
async sendMessage() {
return this.kafkaService.sendMessage('test-topic', 'Hello World!');
}
}
7. Producer Test
# 메시지 발행
curl -X POST http://localhost:3000/send-message
# ----
# 메시지 구독
bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
8. App Service Consumer 함수 생성
import { Injectable } from '@nestjs/common';
import { SubscribeTo } from './kafka/kafka.decorator';
import { KafkaService } from './kafka/kafka.service';
@Injectable()
export class AppService {
...
@SubscribeTo('test-topic')
async subscribeToTestTopic(message: string) {
console.log('message', message);
}
}
지금까지 카프카 설치부터 NestJs에 연동하는 방법을 알아보았습니다.