1. 소개
카프카를 설치, 구성하는 문서이다.
기타 카프카 관련 Tech Note는 다음과 같다.
2. 설치
- http://apache.tt.co.kr/kafka/2.1.0/kafka_2.12-2.1.0.tgz
- http://apache.tt.co.kr/kafka/2.3.1/kafka_2.12-2.3.1.tgz
카프카는 스칼라로 작성되어 있기 때문에 실행하려면 JVM 필요하다.
3. 실행
3.1. 설정
[ config/server.properties 파일 ]
log.dirs=/sw/kafka-data/data1,/sw/kafka-data/data2 zookeeper.connect=zookeeper-node-01:2181
외부 네트워크를 통해 접속할 일이 있다면 advertised listener 설정이 필요하다. advertised listener는 반드시 호스트 이름 혹은 도메인 주소여야 하며, IP는 안된다. advertised listener 값은 Zookeeper에 등록되고 Kafka Producer API는 이 정보를 사용한다.
3.2. 실행
카프카 실행 방법은 다음과 같다.
bin/kafka-server-start.sh config/server.properties
데몬으로 기동하려면 다음과 같이 실행한다.
bin/kafka-server-start.sh -daemon config/server.properties
만약 JVM 버전이 맞지 않으면 이런 에러가 날 수 있다. Java 8 버전 이상이 필요하다.
# bin/kafka-server-start.sh config/server.properties Exception in thread "main" java.lang.UnsupportedClassVersionError: kafka/Kafka : Unsupported major.minor version 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:808) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:443) at java.net.URLClassLoader.access$100(URLClassLoader.java:65) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.net.URLClassLoader$1.run(URLClassLoader.java:349) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:348) at java.lang.ClassLoader.loadClass(ClassLoader.java:430) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:323) at java.lang.ClassLoader.loadClass(ClassLoader.java:363) at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:482)
4. 토픽
4.1. 토픽 생성
# bin/kafka-topics.sh -create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest Created topic "mytest".
4.2. 토픽 확인
# bin/kafka-topics.sh --list --zookeeper localhost:2181 mytest
5. 메시지 교환
5.1. Producer
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
5.2. Consumer
# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest
6. MongoDB를 Kafka Consumer로 사용하기
pom.xml 파일에 추가
<!-- https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver --> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> <version>3.9.1</version> </dependency>
7 AWS에서 Kafka 사용
Amazon EC2에서 Kafka 배포를 실행하면 스트리밍 데이터를 위해 성능이 뛰어나고 확장 가능한 솔루션을 제공할 수 있다. Amazon EC2에서 Kafka를 배포하려면 EC2 인스턴스 유형을 선택 및 프로비저닝하고, Kafka 및 Apache Zookeeper를 비롯한 소프트웨어 구성 요소를 설치 및 구성한 다음, Amazon Elastic Block Store(EBS)를 사용하여 스트리밍 데이터 처리량을 수용하는 데 필요한 블록 스토리지를 프로비저닝해야 한다. Kafka 클러스터가 예상치 못한 이벤트(스트림의 용량 한도를 넘는 데이터 볼륨의 스파이크 등)를 관리할 수 있도록, Apache Zookeeper를 사용해 복제를 구축할 수 있다. 이를 통해 Kafka 클러스터에 있는 노드를 계속 추적하고 노드 전체에서 프로세스 분산을 조정할 수 있다. Kafka가 설치되면, Kafka 클러스터의 보안을 위해 HTTPS를 배포하고, 인증 기관을 유지 관리하며, SSL용 Kafka 인스턴스를 구성해야 한다.