MirrorMaker 2.0(MM2)은 한 Kafka 클러스터에서 다른 Kafka 클러스터로 토픽을 보다 쉽게 미러링하거나 복제할 수 있도록 설계되었습니다. Kafka Connect 프레임워크를 사용하여 구성 및 크기 조정을 간소화합니다. 토픽의 변경 내용을 동적으로 탐지하고 오프셋 및 파티션을 포함하여 원본 및 대상 토픽 속성이 동기화되도록 합니다.
이 문서에서는 데이터 마이그레이션/복제 및 사용 사례에서 Kafka MirrorMaker 2.0을 사용하는 방법을 알아봅니다.
필수 구성 요소
- HDI Kafka 클러스터가 두 개 이상 있는 환경.
- 2.4보다 높은 Kafka 버전(HDI 4.0)
- 원본 클러스터에는 MirrorMaker 2.0 복제 프로세스의 다양한 기능을 테스트하기 위한 데이터 요소 및 토픽이 있어야 합니다.
사용 사례
HDInsight의 두 Kafka 클러스터 간에 데이터 요소/오프셋을 복제하는 MirrorMaker 2.0 시뮬레이션. 재해 복구, 클라우드 적응, 지역 복제, 데이터 격리 및 데이터 집계와 같은 두 개 이상의 Kafka 클러스터 간의 필수 데이터 복제와 같은 시나리오에도 동일한 기능을 사용할 수 있습니다.
MirrorMaker 2.0을 사용한 오프셋 복제
MM2 내부
MirrorMaker 2.0 도구는 서로 다른 커넥터로 구성됩니다. 이러한 커넥터는 독립 실행형 또는 분산 모드에서 Kafka Connect와 함께 직접 사용할 수 있는 표준 Kafka Connect 커넥터입니다.
Broker 설치 프로세스의 요약은 다음과 같습니다.
MirrorSourceConnector :
- 단일 원본 클러스터의 원격 토픽, 토픽 ACL 및 구성을 복제합니다.
- 오프셋 동기화를 내부 토픽으로 내보냅니다.
MirrorSinkConnector:
- 기본 클러스터에서 사용하고 토픽을 단일 대상 클러스터로 복제합니다.
MirrorCheckpointConnector:
- 오프셋 동기화를 사용합니다.
- 장애 조치(failover) 지점을 사용하도록 검사점을 내보냅니다.
MirrorHeartBeatConnector:
- 원격 클러스터에 하트비트를 내보내 복제 프로세스를 모니터링할 수 있습니다.
배포
Kafka 라이브러리와 함께 번들로 제공되는 Connect-mirror-maker.sh 스크립트는 구성 파일을 기반으로 내부적으로 Connect 작업자를 관리하는 분산 MM2 클러스터를 구현합니다. 내부적으로 MirrorMaker 드라이버는 MirrorSourceConnector, MirrorSinkConnector, MirrorCheckpoint 커넥터 및 MirrorHeartbeatConnector와 같은 각 커넥터의 쌍을 만들고 처리합니다.
MirrorMaker 2.0을 시작합니다.
./bin/connect-mirror-maker.sh ./config/mirror-maker.properties
참고
Kerberos 사용 클러스터의 경우 JAAS 구성을 KAFKA_OPTS로 내보내거나 MM2 구성 파일에 지정해야 합니다.
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas.conf>"
샘플 MirrorMaker 2.0 구성 파일
# specify any number of cluster aliases
clusters = source, destination
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for example. "A_host1:9092, A_host2:9092, A_host3:9092" and you can see the exact host name on Ambari > Hosts
source.bootstrap.servers = wn0-src-kafka.bx.internal.cloudapp.net:9092,wn1-src-kafka.bx.internal.cloudapp.net:9092,wn2-src-kafka.bx.internal.cloudapp.net:9092
destination.bootstrap.servers = wn0-dest-kafka.bx.internal.cloudapp.net:9092,wn1-dest-kafka.bx.internal.cloudapp.net:9092,wn2-dest-kafka.bx.internal.cloudapp.net:9092
# enable and configure individual replication flows
source->destination.enabled = true
# regex which defines which topics gets replicated. For eg "foo-.*"
source->destination.topics = toa.evehicles-latest-dev
groups=.*
topics.blacklist="*.internal,__.*"
# Setting replication factor of newly created remote topics
replication.factor=3
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
SSL 구성
설치에 SSL 구성이 필요한 경우
destination.security.protocol=SASL_SSL
destination.ssl.truststore.password=<password>
destination.ssl.truststore.location=/path/to/kafka.server.truststore.jks
#keystore location in case client.auth is set to required
destination.ssl.keystore.password=<password>
destination.ssl.keystore.location=/path/to/kafka.server.keystore.jks
destination.sasl.mechanism=GSSAPI
전역 구성
속성 | 기본값 | 설명 |
---|---|---|
name | required | 커넥터의 이름(예: "us-west->us-east") |
topics | 빈 문자열 | 복제할 토픽의 regex(예: "topic1, topic2, topic3"). 쉼표로 구분된 목록도 지원됩니다. |
topics.blacklist | ". .internal, .. 복제본, __consumer_offsets" 또는 이와 유사 | 복제에서 제외할 토픽 |
groups | 빈 문자열 | 복제할 그룹의 regex(예: ".*") |
groups.blacklist | 빈 문자열 | 복제에서 제외할 그룹 |
source.cluster.alias | required | 복제되는 클러스터의 이름 |
target.cluster.alias | required | 다운스트림 Kafka 클러스터의 이름 |
source.cluster.bootstrap.servers | required | 복제할 업스트림 클러스터 |
target.cluster.bootstrap.servers | required | 다운스트림 클러스터 |
sync.topic.configs.enabled | true | 구성 변경에 대한 원본 클러스터 모니터링 여부 |
sync.topic.acls.enabled | true | 원본 클러스터 ACL에서 변경 내용을 모니터링할지 여부 |
emit.heartbeats.enabled | true | 커넥터는 주기적으로 하트비트를 내보내야 함 |
emit.heartbeats.interval.seconds | true | 하트비트 빈도 |
emit.checkpoints.enabled | true | 커넥터는 주기적으로 소비자 오프셋 정보를 내보내야 함 |
emit.checkpoints.interval.seconds | 5(초) | 검사점 빈도 |
refresh.topics.enabled | true | 커넥터는 새 소비자 그룹을 주기적으로 확인해야 함 |
refresh.topics.interval.seconds | 5(초) | 새 소비자 그룹에 대한 원본 클러스터를 확인하는 빈도 |
refresh.groups.enabled | true | 커넥터는 새 소비자 그룹을 주기적으로 확인해야 함 |
refresh.groups.interval.seconds | 5(초) | 새 소비자 그룹에 대한 원본 클러스터를 확인하는 빈도 |
readahead.queue.capacity | 500(레코드) | 소비자가 생산자보다 앞서나갈 수 있도록 하는 레코드 수 |
replication.policy.class | org.apache.kafka.connect.mirror.DefaultReplicationPolicy | LegacyReplicationPolicy를 사용하여 레거시 MirrorMaker 모방 |
heartbeats.topic.retention.ms | 1일 | 하트비트 토픽을 처음 만들 때 사용됨 |
checkpoints.topic.retention.ms | 1일 | 검사점 토픽을 처음 만들 때 사용됨 |
offset.syncs.topic.retention.ms | 최대 길이 | 오프셋 동기화 토픽을 처음 만들 때 사용됨 |
replication.factor | 2 | 원격 토픽을 만들 때 사용됨 |
질문과 대답
토픽 복제 후 원본 및 대상 클러스터의 마지막 오프셋에 차이가 표시되는 이유는 무엇인가요?
실제 레코드 수가 마지막 오프셋 값보다 작기 때문에 원본 토픽의 데이터 요소가 제거되었을 수 있습니다. 이렇게 하면 복제가 항상 대상 클러스터의 offset-0에서 시작되므로 원본 및 대상 클러스터 사후 복제의 마지막 오프셋 간에 차이가 발생합니다.
대상 클러스터가 데이터 요소에 대한 다른 오프셋 매핑을 가질 수 있는 경우 마이그레이션에서 소비자는 어떻게 동작하나요?
MirrorMaker 2.0 MirrorCheckpointConnector 기능은 소비자 그룹의 소비자 그룹 오프셋 검사점을 원본 클러스터에 자동으로 저장합니다. 각 검사점에는 원본 클러스터의 각 그룹에 대해 마지막으로 커밋된 오프셋을 대상 클러스터의 해당 오프셋 매핑이 포함됩니다. 따라서 마이그레이션 시 대상 클러스터의 동일한 토픽에서 사용을 시작하는 소비자는 원본 클러스터에서 커밋된 마지막 오프셋에서 메시지 수신을 다시 시작할 수 있습니다.
원본 별칭 앞에 복제된 모든 토픽이 접두사로 추가되므로 대상 클러스터에서 정확한 토픽 이름을 유지하려면 어떻게 해야 하나요?
이는 복잡한 미러링 토폴로지에서 데이터가 재정의되지 않도록 하는 MirrorMaker 2.0의 기본 동작입니다. 데이터 손실을 방지하기 위해 복제 흐름 디자인 및 토픽 관리 측면에서 이 작업을 신중하게 사용자 지정해야 합니다. 이 작업은 "replication.policy.class"에 대해 사용자 지정 복제 정책 클래스를 사용하여 수행할 수 있습니다.
원본 및 대상 Kafka에서 만든 새 내부 토픽이 표시되는 이유는 무엇인가요?
MirrorMaker 2.0 내부 토픽은 복제 프로세스, 모니터링, 오프셋 매핑 및 검사점 지정을 추적하기 위해 커넥터에 의해 만들어집니다.
원본에 더 많은 항목이 있는 반면 MirrorMaker가 대상 클러스터에 토픽의 복제본을 두 개만 만드는 이유는 무엇인가요?
MirrorMaker 2는 토픽의 복제 요소를 대상 클러스터에 복제하지 않습니다. 이는 필요한 수의 "replication.factor"를 지정하여 MM2 구성에서 제어할 수 있습니다. 동일한 기본값은 2입니다.
MirrorMaker 2.0에서 사용자 지정 복제 정책을 사용하는 방법은 무엇인가요?
사용자 지정 복제 정책은 아래 인터페이스를 구현하여 만들 수 있습니다.
/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
public interface ReplicationPolicy {
/** How to rename remote topics; generally should be like us-west.topic1. */
String formatRemoteTopic(String sourceClusterAlias, String topic);
/** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
Returns null if not a remote topic.
*/
String topicSource(String topic);
/** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
Topics may be replicated multiple hops, so the immediately upstream topic
may itself be a remote topic.
Returns null if not a remote topic.
*/
String upstreamTopic(String topic);
/** The name of the original source-topic, which may have been replicated multiple hops.
Returns the topic if it is not a remote topic.
*/
String originalTopic(String topic);
/** Internal topics are never replicated. */
boolean isInternalTopic(String topic);
}
MM2 속성의 replication.policy.class에 대해 클래스 참조를 사용하려면 구현을 Kafka 클래스 경로에 추가해야 합니다.