Print
카테고리: [ Miscellaneous ]
조회수: 3723

1. 소개

카프카를 설치, 구성하는 문서이다.

기타 카프카 관련 Tech Note는 다음과 같다.


2. 설치

카프카는 스칼라로 작성되어 있기 때문에 실행하려면 JVM 필요하다.


3. 실행 

3.1. 설정

[ config/server.properties 파일 ]

log.dirs=/sw/kafka-data/data1,/sw/kafka-data/data2
zookeeper.connect=zookeeper-node-01:2181

외부 네트워크를 통해 접속할 일이 있다면 advertised listener 설정이 필요하다. advertised listener는 반드시 호스트 이름 혹은 도메인 주소여야 하며, IP는 안된다. advertised listener 값은 Zookeeper에 등록되고 Kafka Producer API는 이 정보를 사용한다.

3.2. 실행

카프카 실행 방법은 다음과 같다.

bin/kafka-server-start.sh config/server.properties

데몬으로 기동하려면 다음과 같이 실행한다.

bin/kafka-server-start.sh -daemon config/server.properties

만약 JVM 버전이 맞지 않으면 이런 에러가 날 수 있다. Java 8 버전 이상이 필요하다.

# bin/kafka-server-start.sh config/server.properties
Exception in thread "main" java.lang.UnsupportedClassVersionError: kafka/Kafka : Unsupported major.minor version 52.0
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:808)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:443)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:65)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:349)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:348)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:430)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:323)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:363)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)

4. 토픽

4.1. 토픽 생성

# bin/kafka-topics.sh -create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest
Created topic "mytest".

4.2. 토픽 확인

# bin/kafka-topics.sh --list --zookeeper localhost:2181
mytest

5. 메시지 교환

5.1. Producer

# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest

5.2. Consumer

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest

6. MongoDB를 Kafka Consumer로 사용하기

pom.xml 파일에 추가

<!-- https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver -->
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.9.1</version>
</dependency>

7 AWS에서 Kafka 사용

Amazon EC2에서 Kafka 배포를 실행하면 스트리밍 데이터를 위해 성능이 뛰어나고 확장 가능한 솔루션을 제공할 수 있다. Amazon EC2에서 Kafka를 배포하려면 EC2 인스턴스 유형을 선택 및 프로비저닝하고, Kafka 및 Apache Zookeeper를 비롯한 소프트웨어 구성 요소를 설치 및 구성한 다음, Amazon Elastic Block Store(EBS)를 사용하여 스트리밍 데이터 처리량을 수용하는 데 필요한 블록 스토리지를 프로비저닝해야 한다. Kafka 클러스터가 예상치 못한 이벤트(스트림의 용량 한도를 넘는 데이터 볼륨의 스파이크 등)를 관리할 수 있도록, Apache Zookeeper를 사용해 복제를 구축할 수 있다. 이를 통해 Kafka 클러스터에 있는 노드를 계속 추적하고 노드 전체에서 프로세스 분산을 조정할 수 있다. Kafka가 설치되면, Kafka 클러스터의 보안을 위해 HTTPS를 배포하고, 인증 기관을 유지 관리하며, SSL용 Kafka 인스턴스를 구성해야 한다.