다음을 통해 공유


Azure Cosmos DB용 Kafka Connect - 싱크 커넥터 v2

적용 대상: NoSQL

Azure Cosmos DB용 Kafka Connect는 Azure Cosmos DB의 데이터를 읽고 쓰는 커넥터입니다. Azure Cosmos DB 싱크 커넥터를 사용하여 Apache Kafka 토픽의 데이터를 Azure Cosmos DB 데이터베이스로 내보낼 수 있습니다. 커넥터는 Kafka의 데이터를 폴링하여 토픽 구독을 기반으로 데이터베이스의 컨테이너에 씁니다.

필수 조건

  • 작업할 완전한 환경을 제공하므로 Confluent Platform 설정으로 시작합니다. Confluent Platform을 사용하지 않으려면 Zookeeper, Apache Kafka, Kafka Connect를 직접 설치하고 구성해야 합니다. 또한 Azure Cosmos DB 커넥터를 수동으로 설치하고 구성해야 합니다.
  • Azure Cosmos DB 계정 만들기, 컨테이너 설정 가이드
  • Bash 셸 - 이 셸은 Cloud Shell 또는 WSL1에서 작동하지 않습니다.
  • Java 11 이상 다운로드
  • Maven 다운로드

싱크 커넥터 설치

권장 되는 Confluent 플랫폼 설정을 사용하는 경우 Azure Cosmos DB 싱크 커넥터가 설치에 포함되며 이 단계를 건너뛸 수 있습니다.

이외의 경우에는 최신 릴리스에서 JAR 파일을 다운로드하거나 이 리포지토리를 패키지하여 새 JAR 파일을 만들 수 있습니다. JAR 파일을 사용하여 커넥터를 수동으로 설치하려면 다음 지침을 참조하세요. 소스 코드에서 새 JAR 파일을 패키지할 수도 있습니다.

# clone the azure-sdk-for-java repo if you haven't done so already
git clone https://github.com/Azure/azure-sdk-for-java.git
cd sdk/cosmos

mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install

# include the following JAR file in Kafka Connect installation
ls target/azure-cosmos-kafka-connect-*.jar

Kafka 토픽 만들기 및 데이터 쓰기

Confluent Platform을 사용하는 경우 Kafka 토픽을 만드는 가장 쉬운 방법은 제공된 제어 센터 UX를 사용하는 것입니다. 이외의 경우에는 다음 구문을 사용하여 Kafka 토픽을 수동으로 만들 수 있습니다.

./kafka-topics.sh --create --zookeeper <ZOOKEEPER_URL:PORT> --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic <TOPIC_NAME>

이 시나리오에서는 "hotels"라는 Kafka 토픽을 만들고 스키마가 포함되지 않은 JSON 데이터를 토픽에 씁니다. 제어 센터 내에 토픽을 만들려면 Confluent 가이드를 참조하세요.

다음으로 Kafka 콘솔 생산자를 시작하여 "호텔" 토픽에 몇 가지 레코드를 작성합니다.

# Option 1: If using Codespaces, use the built-in CLI utility
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 2: Using this repo's Confluent Platform setup, first exec into the broker container
docker exec -it broker /bin/bash
kafka-console-producer --broker-list localhost:9092 --topic hotels

# Option 3: Using your Confluent Platform setup and CLI install
<path-to-confluent>/bin/kafka-console-producer --broker-list <kafka broker hostname> --topic hotels

콘솔 생산자에서 다음을 입력합니다.

{"id": "h1", "HotelName": "Marriott", "Description": "Marriott description"}
{"id": "h2", "HotelName": "HolidayInn", "Description": "HolidayInn description"}
{"id": "h3", "HotelName": "Motel8", "Description": "Motel8 description"}

입력된 세 개의 레코드는 "hotels" Kafka 토픽에 JSON 형식으로 게시됩니다.

싱크 커넥터 만들기

Kafka Connect에서 Azure Cosmos DB 싱크 커넥터를 만듭니다. 다음 JSON 본문은 싱크 커넥터의 구성을 정의합니다. 필수 구성 요소의 Azure Cosmos DB 설치 가이드에서 저장한 속성 및 azure.cosmos.account.key값을 azure.cosmos.account.endpoint 바꿔야 합니다.

이러한 각 구성 속성에 대한 자세한 내용은 싱크 속성을 참조하세요.

{
  "name": "cosmosdb-sink-connector-v2",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSinkConnector",
    "tasks.max": "5",
    "topics": "{topic}",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "azure.cosmos.account.endpoint":"{endpoint}",
    "azure.cosmos.account.key":"{masterKey}",
    "azure.cosmos.applicationName": "{applicationName}",
    "azure.cosmos.sink.database.name":"{databaseName}",
    "azure.cosmos.sink.containers.topicMap":"{topic}#{container}"
  }
}

모든 값을 채운 후 JSON 파일을 로컬 위치에 저장합니다. REST API를 사용하여 커넥터를 만드는 데 이 파일을 사용할 수 있습니다.

제어 센터를 사용하여 커넥터 만들기

커넥터를 만드는 간편한 옵션은 제어 센터 웹 페이지를 통해 진행하는 것입니다. 이 설치 가이드에 따라 제어 센터에서 커넥터를 만듭니다. DatagenConnector 옵션을 사용하는 대신 CosmosSinkConnector 타일을 사용합니다. 싱크 커넥터를 구성할 때 JSON 파일에 채워진 값을 입력합니다.

또는 커넥터 페이지에서 커넥터 구성 파일 업로드 옵션을 사용하여 앞에서 만든 JSON 파일을 업로드할 수 있습니다.

커넥터 찾아보기 대화 상자에서 '커넥터 구성 파일 업로드' 옵션의 스크린샷

REST API를 사용하여 커넥터 만들기

Kafka Connect REST API를 사용하여 싱크 커넥터를 만듭니다.

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

Azure Cosmos DB에 기록된 데이터 확인

Azure Portal에 로그인하고 Azure Cosmos DB 계정으로 이동합니다. "호텔" 토픽의 세 레코드가 계정에 만들어지는지 확인합니다.

정리

제어 센터에서 커넥터를 삭제하려면 만든 싱크 커넥터로 이동하여 삭제 아이콘을 선택합니다.

싱크 커넥터 대화 상자에서 삭제 옵션의 스크린샷

또는 Kafka Connect REST API를 사용하여 다음을 삭제합니다.

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-sink-connector

Azure CLI를 사용하여 생성된 Azure Cosmos DB 서비스와 해당 리소스 그룹을 삭제하려면 이 단계를 참조하세요.

싱크 구성 속성

다음 설정은 Azure Cosmos DB Kafka 싱크 커넥터를 구성하는 데 사용됩니다. 이러한 구성 값은 사용할 Kafka 토픽, Azure Cosmos DB 컨테이너에 기록되는 항목 및 데이터를 직렬화하는 형식을 결정합니다. 기본값과 함께 예제 구성 파일에 관해서는 이 구성을 참조하세요.

구성 속성 이름 기본값 설명
connector.class None Azure Cosmos DB 원본의 클래스 이름입니다. com.azure.cosmos.kafka.connect.CosmosSinkConnector로 설정되어야 합니다.
azure.cosmos.account.endpoint None Cosmos DB 계정 엔드포인트 URI
azure.cosmos.account.environment Azure Cosmos DB 계정의 Azure 환경: Azure, AzureChina, AzureUsGovernmentAzureGermany.
azure.cosmos.account.tenantId "" Cosmos DB 계정의 tenantId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.auth.type MasterKey 현재 MasterKey지원되는 인증 유형은 두 가지입니다. (PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), ServicePrincipal
azure.cosmos.account.key "" Cosmos DB 계정 키(필요한 경우에만 auth.type MasterKey)
azure.cosmos.auth.aad.clientId "" 서비스 주체의 clientId/ApplicationId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.auth.aad.clientSecret "" 서비스 주체의 클라이언트 암호/암호입니다.
azure.cosmos.mode.gateway false 게이트웨이 모드를 사용할지 여부를 나타내는 플래그입니다. 기본적으로 false입니다. 즉, SDK는 직접 모드를 사용합니다.
azure.cosmos.preferredRegionList [] 다중 지역 Cosmos DB 계정에 사용할 기본 설정 지역 목록입니다. 힌트로 사용할 쉼표로 구분된 값(예: [East US, West US] 또는 East US, West US제공된 기본 영역)입니다. Cosmos DB 계정과 함께 배치된 kafka 클러스터를 사용하고 kafka 클러스터 지역을 기본 지역으로 전달해야 합니다.
azure.cosmos.application.name "" 애플리케이션 이름. userAgent 접미사로 추가됩니다.
azure.cosmos.throughputControl.enabled false 처리량 제어를 사용할 수 있는지 여부를 나타내는 플래그입니다.
azure.cosmos.throughputControl.account.endpoint "" Cosmos DB 처리량 제어 계정 엔드포인트 Uri입니다.
azure.cosmos.throughputControl.account.environment Azure Cosmos DB 계정의 Azure 환경: Azure, AzureChina, AzureUsGovernmentAzureGermany.
azure.cosmos.throughputControl.account.tenantId "" Cosmos DB 계정의 tenantId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.throughputControl.auth.type MasterKey 현재 MasterKey지원되는 인증 유형은 두 가지입니다. (PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), ServicePrincipal
azure.cosmos.throughputControl.account.key "" Cosmos DB 처리량 제어 계정 키(필요한 경우에만 throughputControl.auth.type MasterKey필요).
azure.cosmos.throughputControl.auth.aad.clientId "" 서비스 주체의 clientId/ApplicationId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.throughputControl.auth.aad.clientSecret "" 서비스 주체의 클라이언트 암호/암호입니다.
azure.cosmos.throughputControl.preferredRegionList [] 다중 지역 Cosmos DB 계정에 사용할 기본 설정 지역 목록입니다. 힌트로 사용할 기본 영역을 제공하는 쉼표로 구분된 값(예 [East US, West US] : 또는 East US, West US)입니다. Cosmos DB 계정과 함께 배치된 kafka 클러스터를 사용하고 kafka 클러스터 지역을 기본 지역으로 전달해야 합니다.
azure.cosmos.throughputControl.mode.gateway false 게이트웨이 모드를 사용할지 여부를 나타내는 플래그입니다. 기본적으로 false입니다. 즉, SDK는 직접 모드를 사용합니다.
azure.cosmos.throughputControl.group.name "" 처리량 제어 그룹 이름입니다. 고객이 컨테이너에 대해 많은 그룹을 만들 수 있으므로 이름은 고유해야 합니다.
azure.cosmos.throughputControl.targetThroughput -1 처리량 제어 그룹 대상 처리량입니다. 값은 0보다 커야 합니다.
azure.cosmos.throughputControl.targetThroughputThreshold -1 처리량 제어 그룹 대상 처리량 임계값입니다. 값은 (0,1] 사이여야 합니다.
azure.cosmos.throughputControl.priorityLevel None 처리량 제어 그룹 우선 순위 수준입니다. 값은 None, High 또는 Low일 수 있습니다.
azure.cosmos.throughputControl.globalControl.database.name "" 처리량 전역 제어에 사용할 데이터베이스입니다.
azure.cosmos.throughputControl.globalControl.container.name "" 처리량 전역 제어에 사용할 컨테이너입니다.
azure.cosmos.throughputControl.globalControl.renewIntervalInMS -1 이렇게 하면 클라이언트가 자체의 처리량 사용량을 업데이트하고 다른 클라이언트의 처리량 사용량에 따라 자체 처리량 공유를 조정하는 빈도를 제어합니다. 기본값은 5이고, 허용되는 최소값은 5s입니다.
azure.cosmos.throughputControl.globalControl.expireIntervalInMS -1 이렇게 하면 클라이언트가 오프라인 상태인 것을 빠르게 감지하여 다른 클라이언트에서 처리량 공유를 수행할 수 있습니다. 기본값은 11 s이고, 허용되는 최소값은 2 * renewIntervalInMS + 1입니다.
azure.cosmos.sink.database.name "" Cosmos DB 데이터베이스 이름입니다.
azure.cosmos.sink.containers.topicMap "" Cosmos 컨테이너에 매핑된 Kafka 토픽의 쉼표로 구분된 목록입니다. 예: topic1#con1, topic2#con2.
azure.cosmos.sink.errors.tolerance.level None 모든 재시도를 모두 소진한 후의 오류 허용 오차 수준입니다. None 오류가 발생했습니다. All 로그 및 계속
azure.cosmos.sink.bulk.enabled true 싱크 커넥터에 Cosmos DB 대량 모드를 사용할 수 있는지 여부를 나타내는 플래그입니다. 기본적으로 true입니다.
azure.cosmos.sink.bulk.maxConcurrentCosmosPartitions -1 Cosmos DB 최대 동시 Cosmos 파티션 지정하지 않으면 모든 일괄 처리에 모든 Cosmos 물리적 파티션의 데이터가 있어야 함을 나타내는 컨테이너의 실제 파티션 수에 따라 결정됩니다. 지정된 경우 각 일괄 처리 데이터에 대한 Cosmos DB 물리적 파티션 수를 나타냅니다. 따라서 각 일괄 처리의 입력 데이터가 각 일괄 처리에서 작성해야 하는 Cosmos 파티션 수와 균형을 맞추기 위해 다시 분할될 때 대량 처리를 보다 효율적으로 만드는 데 이 구성을 사용할 수 있습니다. 이는 대규모 컨테이너(수백 개의 실제 파티션이 있는)에 유용합니다.
azure.cosmos.sink.bulk.initialBatchSize 1 Cosmos DB 초기 대량 마이크로 일괄 처리 크기 - 큐에 추가된 문서 수가 이 크기를 초과할 때 백 엔드로 플러시될 마이크로 일괄 처리이거나 대상 페이로드 크기가 충족됩니다. 마이크로 일괄 처리 크기는 제한 속도에 따라 자동으로 조정됩니다. 기본적으로 초기 마이크로 일괄 처리 크기는 1입니다. 처음 몇 개의 요청이 너무 많은 RU를 사용하지 않도록 하려는 경우 이를 줄입니다.
azure.cosmos.sink.write.strategy ItemOverwrite Cosmos DB는 전략을 작성합니다. ItemOverwrite (upsert 사용), ItemAppend (create 사용, 기존 항목인 충돌( 충돌), ItemDelete (데이터 프레임의 ID/pk에 따라 삭제), ItemDeleteIfNotModified (id/pk를 수집한 이후 etag가 변경되지 않은 경우 데이터 프레임의 ID/pk에 따라 삭제), ItemOverwriteIfNotModified (etag가 비어 있는 경우 만들기를 사용하고, 그렇지 않으면 etag 사전 조건으로 업데이트/바꾸기, 문서가 업데이트된 경우 사전 조건 실패는 무시됨) 무시됩니다. ItemPatch (패치 구성을 기반으로 모든 문서 부분 업데이트)
azure.cosmos.sink.maxRetryCount 10 Cosmos DB는 싱크 커넥터에 대한 쓰기 실패 시 최대 재시도 횟수입니다. 기본적으로 커넥터는 최대 10회 동안 일시적인 쓰기 오류를 다시 시도합니다.
azure.cosmos.sink.id.strategy ProvidedInValueStrategy 문서를 .으로 채우는 데 사용되는 전략입니다 id. 유효한 전략은 다음과 같습니다. TemplateStrategy, FullKeyStrategy, ProvidedInKeyStrategyKafkaMetadataStrategy. ProvidedInValueStrategy 접두사로 접두사로 지정된id.strategy 구성 속성은 전략에 전달됩니다. 예를 들어 사용하는 id.strategy=TemplateStrategy 경우 속성 id.strategy.template 은 템플릿 전략으로 전달되고 템플릿을 생성하는 데 사용할 템플릿 문자열을 지정하는 id데 사용됩니다.
azure.cosmos.sink.write.patch.operationType.default Set 기본 Cosmos DB 패치 작업 유형입니다. 지원되는 항목에는 없음, 추가, 설정, 바꾸기, 제거, 증가가 포함됩니다. no-op에 대해 없음을 선택합니다.
azure.cosmos.sink.write.patch.property.configs "" Cosmos DB patch json 속성 구성. 쉼표로 구분된 다음 패턴과 일치하는 여러 정의를 포함할 수 있습니다. property(jsonProperty).op(operationType) 또는 property(jsonProperty).path(patchInCosmosdb).op(operationType) - 두 번째 패턴의 차이점은 다른 Cosmos DB 경로를 정의할 수도 있다는 점입니다. 참고: 중첩된 json 속성 구성은 지원하지 않습니다.
azure.cosmos.sink.write.patch.filter "" 조건부 패치에 사용됨

데이터는 스키마 없이 Azure Cosmos DB에 JSON으로 기록됩니다.

지원되는 데이터 유형

Azure Cosmos DB Kafka 싱크 커넥터는 싱크 레코드를 다음 스키마 유형을 지원하는 JSON 문서로 변환합니다.

스키마 유형 JSON 데이터 형식
배열 배열
부울 부울
Float32 숫자
Float64 숫자
Int8 숫자
Int16 숫자
Int32 숫자
Int64 숫자
지도 개체(JSON)
문자열 문자열
Null
구조체 개체(JSON)

싱크 커넥터는 다음 AVRO 논리 형식도 지원합니다.

스키마 유형 JSON 데이터 형식
날짜 숫자
Time 숫자
타임스탬프 숫자

참고 항목

바이트 역직렬화는 현재 Azure Cosmos DB 싱크 커넥터에서 지원되지 않습니다.

SMT(단일 메시지 변환)

싱크 커넥터 설정과 함께 SMT(단일 메시지 변환)를 사용하여 Kafka Connect 플랫폼을 통해 이동하는 메시지를 수정하도록 지정할 수 있습니다. 자세한 내용은 Confluent SMT 설명서를 참조하세요.

InsertUUID SMT 사용

InsertUUID SMT를 사용하여 항목 ID를 자동으로 추가할 수 있습니다. 사용자 지정 InsertUUID SMT를 사용하면 Azure Cosmos DB에 기록되기 전에 각 메시지에 대해 임의 UUID 값을 사용하여 id 필드를 삽입할 수 있습니다.

Warning

이 SMT는 메시지가 id 필드가 없는 경우에만 사용합니다. 그렇지 않으면 id 값을 덮어쓰며 데이터베이스에 중복 항목이 생성될 수 있습니다. 메시지 ID로 UUID를 사용하는 것이 빠르고 쉬울 수 있지만 Azure Cosmos DB에서 사용할 이상적인 파티션 키가 아닙니다.

SMT 설치

SMT를 InsertUUID 사용하려면 Confluent Platform 설정에서 이 변환을 설치해야 합니다. 이 리포지토리에서 Confluent Platform 설정을 사용하는 경우 변환이 설치에 이미 포함되어 있으며 이 단계를 건너뛸 수 있습니다.

또는 InsertUUID 원본을 패키지하여 새 JAR 파일을 만들 수 있습니다. JAR 파일을 사용하여 커넥터를 수동으로 설치하려면 다음 지침을 참조하세요.

# clone the kafka-connect-insert-uuid repo
https://github.com/confluentinc/kafka-connect-insert-uuid.git
cd kafka-connect-insert-uuid

# package the source code into a JAR file
mvn clean package

# include the following JAR file in Confluent Platform installation
ls target/*.jar

SMT 구성

싱크 커넥터 구성 내에 다음 속성을 추가하여 id를 설정합니다.

"transforms": "insertID",
"transforms.insertID.type": "com.github.cjmatta.kafka.connect.smt.InsertUuid$Value",
"transforms.insertID.uuid.field.name": "id"

이 SMT 사용에 관한 자세한 내용은 InsertUUID 리포지토리를 참조하세요.

SMT를 사용하여 TTL(Time to Live) 구성

InsertFieldCast SMT를 둘 다 사용하면 Azure Cosmos DB에 생성된 각 항목에서 TTL을 구성할 수 있습니다. 항목 수준에서 TTL을 사용하도록 설정하기 전에 컨테이너에서 TTL을 사용하도록 설정합니다. 자세한 내용은 TTL(Time to Live) 문서를 참조하세요.

싱크 커넥터 구성 내에 다음 속성을 추가하여 TTL(초)을 설정합니다. 다음 예제에서는 TTL이 100초로 설정됩니다. 메시지에 이미 필드가 TTL 포함된 경우 이러한 SMT는 값을 덮어씁 TTL 니다.

"transforms": "insertTTL,castTTLInt",
"transforms.insertTTL.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTTL.static.field": "ttl",
"transforms.insertTTL.static.value": "100",
"transforms.castTTLInt.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTTLInt.spec": "ttl:int32"

이 SMT에 관한 자세한 내용은 InsertFieldCast 설명서를 참조하세요.

일반적인 문제 해결

Kafka 싱크 커넥터를 사용할 때 발생할 수 있는 몇 가지 일반적인 문제에 대한 해결 방법은 다음과 같습니다.

JsonConverter를 통해 비 JSON 데이터 읽기

Kafka의 원본 토픽에 JSON이 아닌 데이터가 있고 이 데이터를 사용하여 JsonConverter읽으려고 하면 다음 예외가 표시됩니다.

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x1cfa7e2 (above 0x0010ffff) at char #1, byte #7)

Avro 또는 CSV 문자열과 같은 다른 형식으로 serialize되는 원본 토픽에서 이 오류가 발생할 수 있습니다.

해결 방법: 토픽 데이터가 AVRO 형식인 경우 아래와 같이 Kafka Connect 싱크 커넥터를 AvroConverter 사용하도록 변경합니다.

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

AvroConverter를 사용하여 avro가 아닌 데이터 읽기

이 시나리오는 Avro 변환기를 사용하여 Avro 형식이 아닌 토픽에서 데이터를 읽으려고 할 때 적용됩니다. 여기에는 고유한 통신 형식인 Confluent Schema Registry의 Avro 직렬 변환기가 아닌 Avro 직렬 변환기에서 작성한 데이터가 포함됩니다.

org.apache.kafka.connect.errors.DataException: my-topic-name
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
…
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

해결 방법: 원본 토픽의 serialization 형식을 확인합니다. 그런 다음 커넥터를 전환하여 올바른 변환기를 사용하거나 업스트림 형식을 Avro로 전환합니다.

예상된 스키마/페이로드 구조가 없는 JSON 메시지 읽기

Kafka Connect는 다음과 같이 페이로드와 스키마를 둘 다 포함하는 JSON 메시지의 특수 구조를 지원합니다.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "userid"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      }
    ]
  },
  "payload": {
    "userid": 123,
    "name": "Sam"
  }
}

이 구조의 데이터가 포함되지 않은 JSON 데이터를 읽으려고 하면 다음 오류가 발생합니다.

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

정확히 말하면, schemas.enable=true에 유효한 유일한 JSON 구조에는 위에 표시된 것처럼 스키마 및 페이로드 필드가 최상위 요소로 포함됩니다. 오류 메시지에 설명된 대로 일반 JSON 데이터만 있는 경우 커넥터의 구성을 다음으로 변경해야 합니다.

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

제한 사항

  • Azure Cosmos DB에서 데이터베이스 및 컨테이너 자동 만들기는 지원되지 않습니다. 데이터베이스와 컨테이너는 이미 존재해야 하며 올바르게 구성되어야 합니다.

다음 단계

다음 문서를 사용하여 V4 Java SDK의 대량 작업에 대해 자세히 알아볼 수 있습니다.