Print
카테고리: [ Miscellaneous ]
조회수: 16422

1. 개요

카프카의 핵심 구성요소를 소개한다.

기타 카프카 관련 Tech Note는 다음과 같다.


2. 물리적 구성요소

2.1. 브로커

브로커는 하나의 서버당 하나의 프로세스로 동작한다. 메시지 수신 및 전달을 처리한다. 

클러스터로 구성할 수도 있는데 브로커를 추가함으로써 스케일 아웃 방식의 처리량 향상이 가능하다.

브로커가 받은 데이터는 모두 디스크에 저장되며 디스크 용량에 따라 일정 기간 데이터 보존이 가능하다.

카프카 브로커의 기본 replication은 3개이다. 가용성 측면에서.. 브로커 전체 장애나 replication 할 수 없는 환경에서는 가용성이 보장되지 않는다. 브로커 1개 장애 시에는 replication이 3대이므로 서비스가 지속되나, 브로커 2개 장애 시에는 replication이 안된다. 

replication 수는 3으로 유지하면서 브로커를 5대로 유지하거나, 1대 장애도 즉시 복구하는 전략을 택한다.

2.2. 프로듀서

프로듀서는 프로듀서 API를 통해 브로커로 데이터를 보내기 위해 구현된 것이다.

2.3. 컨슈머

컨슈머 API를 이용하여 브로커로부터 메시지를 획득한다. 메시지는 디스크에 저장되어 있기 때문에 디스크에 보관되어 있는 중에는 메시지 획득이 가능하다.

2.4. 주키퍼

2.4.1. 소개

주키퍼는 원래 하둡의 서브 프로젝트 중 하나였다. 하둡은 대용량 분산 처리 애플리케이션으로, 중앙에서 분산 애플리케이션을 관리하는 코디네이션 애플리케이션이 필요했다. 이에 주키퍼가 탄생하게 된 것이다. 2011년 1월부터 주키퍼는 하둡의 서브 프로젝트에서 아파치 탑 레벨 프로젝트로 승격됐다. 현재는 Storm, HBase, NiFi등 다양한 솔루션에서 사용된다.

카프카에서도 브로커의 분산 처리를 위한 관리도구로 쓰이는데 분산 메시징의 토픽, 파티션 등의 메타 데이터를 관리하기 위한 목적이다.

주키퍼에 저장되는 데이터는 모두 메모리에 저장되어 처리량이 크고 속도가 빠르다. 또 주키퍼는 신뢰도를 높이기 위해 앙상블이라는 호스트 세트를 구성할 수 있다. 3,5 등 홀수개로 구성하는 것이 일반적인데 과반수 방식에 의해 살아 있는 노드 수가 과반 수 이상 유지된다면 지속적인 서비스가 가능해진다.

2.4.2. znode

여러대의 서버를 클러스터인 앙상블로 묶고, 분산 애플리케이션들이 각각 클라이언트가 되어 주키퍼 서버들과 연결을 맺은 후 상태 정보를 주고 받는다. 상태 정보는 주키퍼의 znode라는 곳에 Key-Value 형태로 저장되고, znode의 데이터를 이용하여 서로 데이터를 주고 받는다. znode는 데이터를 저장하기 위한 공간 이름으로 폴더 정도로 이해하면 된다.

znode에 저장하는 데이터는 byte ~ kbyte 수준으로 매우 적다. 그리고 앞서 설명한대로 폴더(디렉토리) 형식의 계층형 구조다. 예를 들면 다음과 같다.

znode는 데이터 변경 등에 따른 유효성 검사를 위해 버전 번호를 관리하며 znode의 데이터가 변경될 때마다 버전번호가 증가한다. 


3. 논리적 구성요소

3.1. 파티션

파티션은 토픽에 대한 대량 메시지 입출력을 지원하기 위해 브로커의 데이터를 읽고 쓰는 단위이다.

토픽을 구성하는 파티션은 브로커 클러스터 내에 분산되어 있어 프로듀서로부터의 데이터를 수신하고, 컨슈머로의 배달을 담당한다. 

파티션을 브로커에 어떻게 배치하는지에 대한 정보는 브로커측에 있다. 프로듀서 및 컨슈머 API는 파티션 정보는 은폐하여 통신하기 떄문에 프로듀서/컨슈머는 토픽만 지정하면 된다. 물론 특정 파티션을 지정한 송수신도 가능은 하다.

카프카 브로커는 파티션별 leader가 다르고 이 leader를 통해서만 데이터를 처리할 수 있다. leader는 데이터를 받고 나서 나머지 2대의 노드에 복제한다. 복제 시 최초 데이터는 leader가 있는 동일 서버에 저장하고 두번째부터는 나머지 서버에 랜덤하게 저장된다.

3.2. 컨슈머 그룹

Producer는 여러 파티션을 통해 메시지를 생상하고 있는데, 이에 대응하는 Consumer 입장이다. 아무래도 여러 소비자가 읽어가면 효율적이기 때문이다. 즉 동일 기능을 수행하는 애플리케이션을 여러개 실행하여 메시지 처리를 할 수 있다. 즉 인스턴스간 메시지 부하 분산이 된다.

오프셋 관리가 되기 때문에 컨슈머 그룹 스케일인/스케일아웃에 따른 파티션 리밸런싱 시에도 메시지가 중복 처리되지 않는다. 컨슈머에서 분산 스트림 처리도 고려되어 있다. 단일 어플리케이션 내에서 여러 컨슈머가 단일 토픽 혹은 여러 파티션으로부터 메시지를 취득하는 방식으로 컨슈머 그룹이라는 개념이 존재한다. 

다만 자신이 읽고 있는 파티션은 그룹 내 다른 컨슈머가 읽을 수 없다는 규칙이 있다. 즉 하나의 파티션에 대해 컨슈머 그룹 내 하나의 컨슈머 인스턴스만 접근 가능하다. (ordering 보장)

만약 토픽 내 파티션이 4개이고, 컨슈머 그룹 내 컨슈머가 3개라면 

그래서, 따라서, 일반적으로 파티션 수와 컨슈머 수는 동일하게 하는 것을 추천한다.

카프카 클러스터 전체에서 글로벌 ID를 컨슈머 그룹 전체에서 공유하고, 여러 컨슈머는 자신이 소속한 컨슈머 그룹을 식별하여 읽어들일 파티션을 분류하고 재시도를 제어한다.

3.3. 오프셋

3.3.1. 오프셋이란?

"카프카는 오프셋 관리를 통한 메시지 재처리, 중복처리 방지 기능 지원"

각 파티션이 수신한 메시지에는 각각의 일련번호가 있다. 그래서 파티션 단위로 메시지 위치를 나타내는 오프셋이라는 정보를 통해 컨슈머가 취득하는 메시지의 범위 및 재시도를 제어한다. 제어를 위한 오프셋 종류는 다음과 같다.

3.3.2. 컨슈머 입장의 오프셋

이제 컨슈머의 Position에 대한 이야기를 하려고 한다.

물론 컨슈머는 오프셋을 Offset Topic에 저장해야만 하는 것은 아니다. 다른 저장소도 가능하다는 말이다. 따라서 다른 저장소에 레코드 처리 결과를 저장하는 것과 오프셋 저장 행위를 원자적으로(atomic) 처리할 수 있다. RDBMS를 써서.. 말이다. 만약 이렇게 오프셋을 직접 관리하면 다음과 같은 설정이 필요하다.

파티션 할당이 자동으로 이루어지는 경우를 고려해야 한다. 이때는 ConsumerRebalaceListener를 사용한다.

3.3.3. ConsumerRebalanceListener

파티션이 재조정될 때 실행되는 콜백 메소드를 가진 인터페이스다.

컨슈머가 직접 파티션을 할당하지 않고 컨슈머 그룹 개념을 통해 파티션이 자동으로 할당되도록 했을 때만 사용할 수 있다. 만약 직접 파티션을 할당했다면.. 재조정 발생하지 않고 콜백도 호출되지 않는다.

앞서 말했지만 ConsumerRebalaceListener 사용은 오프셋을 DB등 다른 저장소에 사용할 때이다.

콜백 메소드는 파티션이 재조정될 때 poll() 내부에서 호출된다. 

3.3.4. High Watermark

복제 사용 시 오프셋 관리를 위해 LEO 외에 High Watermark라는 개념이 있다. High Watermark는 복제가 완료된 오프셋이다. 그래서 LEO와 동일하거나 혹은 오래된 오프셋이 된다.

컨슈머 입장에서는 High Watermark까지 기록된 메시지를 취득할 수 있다. 만약 LEO에 기록되어 있지만 복제가 완료되지 않은 메시지(즉 High Watermark에 기록되지 않은 메시지)를 취득했을 때, 애매한 타이밍에 문제가 생기면(예: leader가 복제 내리기 전에 죽음) 그 메시지는 재취득할 수 없는 상태가 된다.

3.4. commit 관리

데이터를 가져오는 컨슈머 입장이다.

3.4.1. 자동 커밋 

auto commit이 true이면 poll() 호출 시마다 commit할 타이밍인지 확인한다. (timer expire 기반)

그런데 문제는 중복이 발생할 수 있다는 것이다. 인터벌이 5초라고 보자.

  1. poll()을 통해 데이터 200개 가져옴 (그리고 오프셋 commit됨)
  2. 레코드 200개 중 100개 처리 완료
  3. 이 때 재조정 발생 (이유는 파티션 수 증가, 혹은 컨슈머 증감)
  4. 컨슈머들이 리어사인
  5. 컨슈머는 1. 에서 commit된 오프셋부터 polling
  6. 중복처리 (...)

대응방안으로는 poll() 다음에 close()를 호출한다. close()를 호출하면 ConsumerCoorinator.maybeAutoCommitOffsetsSync()를 호출하기 때문이다.

3.4.2. 수동 커밋

3.4.3. commitSync

void   commitSync​()    
void    commitSync​(java.time.Duration timeout)  
void    commitSync​(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)   
void    commitSync​(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.time.Duration timeout)

commitSync를 통해 메시지 처리가 완료되었고 메시지를 가져온 것으로 처리하기 때문에 commitSync를 수행하지 않으면 실패한 현재 오프셋부터 다시 가져올 것이라고 생각할 수 있다. 하지만 그냥 이후 오프셋부터 가져온다.

실패했던 레코드의 오프셋을 다시 poll 하기 위해 seek() 메소드 사용한다. (해당 레코드로 되돌아감)


4. 복제본 관리

하나의 토픽은 기본적으로 3개의 replica 구성되어 있다. ISR(In-Sync Replica)는 leader와 동기 상태를 유지하고 있는 복제 서버를 뜻한다. 이 상태의 유지는 replica.lag.time.max.ms 설정과 관련이 있는데 replica.lag.time.max.ms보다 오랜 시간 복제의 요청이나 복제가 수행되지 않은 경우 복제 상태를 유지하고 있지 않은 것으로 간주한다.

replica와 ISR의 수는 설정을 통해 변경 가능하다. 최소 ISR 수는 min.insync.replica를 통해 설정한다. 

물론 ISR이 아닌 replica도 있다. 모든 replica가 ISR로 되어 있지 않은 파티션은 Under Replicated Partition이라고 한다.  


5. 가용성 & 확장성

5.1. Failover

메시지 처리가 지연되거나 하면 Failover된다. 그리고 오류난 서버가 정상화되면 Failback된다. 관련하여 주요 컨슈머 설정은 다음과 같다,

5.2. acks

acks 동작
0 Producer는 ack 안받고 계속 전송한다
장애 시 메시지 유실 큼
1

leader에만 전달되면 ack 반환
물론 acks=0 보다는 안정적임

※ 그런데 단순히 leader가 다운되는 것보다 주키퍼 및 타 노드와 단절, 고립이 더 큰 문제를 야기한다.
    계속 데이터는 받아놓고 버려지기 때문이다.

all 모든 ISR에 복제되면 ack 반환
데이터 손실 없음

 

다음은 브로커 4, replica 3이고 브로커 장애인 경우의 시나리오이다.

브로거 장애 시 1) 데이터를 잃지 않을 것인지 2) 처리를 계속할 것인지의 문제다. 뭐가 낫다고 하긴 어렵다. 시스템 요구사항이나 제약조건에 의해 판단하라.

5.3. scale-out

메시지를 중계하는 역할을 브로커를 여러대 둘 수 있다. 처리량 증가.

5.4. 그 외

만약 Producer에서 카프카 연결이 안된다면,


6. 데이터 보관 기간

카프카는 수신 메시지를 디스크에 영속화하고, 컨슈머는 브로커에 보관된 데이터에 한해서는 과거 데이터를 마음껏 읽을 수 있다. 물론 스토리지 용량의 제한이 있기 때문에 무제한 보관은 어렵다. 다음은 데이터 삭제를 위한 정책이다.

5.1. 오래된 메시지 삭제

삭제의 트리거는 다음과 같다.

5.2. 압축

최신 키 데이터만 남겨두고 중복하는 키의 오래된 메시지를 삭제한다. 그래서 동일 키에 대해서는 최신 값만 얻으면 되는 업무에 적합하다.