다음을 통해 공유


Azure Cosmos DB용 Kafka Connect - 원본 커넥터 v2

적용 대상: NoSQL

Azure Cosmos DB용 Kafka Connect는 Azure Cosmos DB의 데이터를 읽고 쓰는 커넥터입니다. Azure Cosmos DB 원본 커넥터는 Azure Cosmos DB 변경 피드에서 데이터를 읽고 이 데이터를 Kafka 토픽에 게시하는 기능을 제공합니다.

필수 조건

  • 작업할 완전한 환경을 제공하므로 Confluent Platform 설정으로 시작합니다. Confluent Platform을 사용하지 않으려면 Zookeeper, Apache Kafka, Kafka Connect를 직접 설치하고 구성해야 합니다. 또한 Azure Cosmos DB 커넥터를 수동으로 설치하고 구성해야 합니다.
  • Azure Cosmos DB 계정 만들기, 컨테이너 설정 가이드
  • Bash 셸 - 이 셸은 Cloud Shell 또는 WSL1에서 작동하지 않습니다.
  • Java 11 이상 다운로드
  • Maven 다운로드

원본 커넥터 설치

권장되는 Confluent Platform 설정을 사용하면 Azure Cosmos DB 원본 커넥터가 설치에 포함되며 이 단계를 건너뛸 수 있습니다.

그렇지 않으면 최신 릴리스의 JAR 파일을 사용하고 커넥터를 수동으로 설치할 수 있습니다. 자세한 내용은 이 지침을 참조하세요. 소스 코드에서 새 JAR 파일을 패키지할 수도 있습니다.

# clone the azure-sdk-for-java repo if you haven't done so already
git clone https://github.com/Azure/azure-sdk-for-java.git
cd sdk/cosmos

mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos,azure-cosmos-tests -am clean install
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-kafka-connect clean install

# include the following JAR file in Kafka Connect installation
ls target/azure-cosmos-kafka-connect-*.jar

Kafka 토픽 만들기

Confluent Control Center를 사용하여 Kafka 토픽을 만듭니다. 이 시나리오에서는 "apparels"라는 Kafka 토픽을 만들고 비스케마 포함 JSON 데이터를 토픽에 씁니다. 제어 센터 내에서 토픽을 만들려면 Kafka 토픽 문서 만들기를 참조하세요.

원본 커넥터 만들기

Kafka Connect에서 원본 커넥터 만들기

Kafka Connect에서 Azure Cosmos DB 원본 커넥터를 만들려면 다음 JSON 구성을 사용합니다. 필수 구성 요소의 Azure Cosmos DB 설치 가이드에서 저장한 azure.cosmos.account.endpoint, azure.cosmos.account.key 속성에 대한 자리 표시자 값을 바꿔야 합니다.

{
  "name": "cosmosdb-source-connector-v2",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.CosmosSourceConnector",
    "tasks.max": "5",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "azure.cosmos.account.endpoint":"{endpoint}",
    "azure.cosmos.account.key":"{masterKey}",
    "azure.cosmos.application.name": "{applicationName}",
    "azure.cosmos.source.database.name":"{database}",
    "azure.cosmos.source.containers.includedList":"{container}",
    "azure.cosmos.source.changeFeed.maxItemCountHint":"500",
    "azure.cosmos.source.containers.topicMap":"{topic}#{container}",
    "azure.cosmos.source.metadata.storage.type":"Cosmos",
    "azure.cosmos.source.metadata.storage.name":"{metadataContainerName}"
  }
}

위의 각 구성 속성에 대한 자세한 내용은 원본 속성 섹션을 참조하세요. 모든 값을 채운 후 JSON 파일을 로컬 위치에 저장합니다. REST API를 사용하여 커넥터를 만드는 데 이 파일을 사용할 수 있습니다.

제어 센터를 사용하여 커넥터 만들기

Confluent Control Center 포털에서 커넥터를 쉽게 만들 수 있습니다. Confluent 설정 가이드에 따라 제어 센터에서 커넥터를 만듭니다. 설정할 때 DatagenConnector 옵션을 사용하는 대신 CosmosDBSourceConnector 타일을 사용합니다. 원본 커넥터를 구성할 때 JSON 파일에 채워진 값을 입력합니다.

또는 커넥터 페이지에서 커넥터 구성 파일 업로드 옵션을 사용하여 이전 섹션에서 빌드한 JSON 파일을 업로드할 수 있습니다.

커넥터 찾아보기 대화 상자에서 '커넥터 구성 파일 업로드' 옵션의 스크린샷

REST API를 사용하여 커넥터 만들기

Kafka Connect REST API를 사용하여 원본 커넥터 만들기

# Curl to Kafka connect service
curl -H "Content-Type: application/json" -X POST -d @<path-to-JSON-config-file> http://localhost:8083/connectors

Azure Cosmos DB에 문서 삽입

  1. Azure Portal에 로그인하고 Azure Cosmos DB 계정으로 이동합니다.

  2. 데이터 탐색 탭을 열고 데이터베이스를 선택합니다.

  3. 이전에 만든 "kafkaconnect" 데이터베이스와 "kafka" 컨테이너를 엽니다.

  4. 새 JSON 문서를 만들려면 API for NoSQL 창에서 "kafka" 컨테이너를 확장하고 항목을 선택한 다음, 도구 모음에서 새 항목을 선택합니다.

  5. 이제 다음 구조를 사용하여 컨테이너에 문서를 추가합니다. 다음 샘플 JSON 블록을 항목 탭에 붙여넣어 현재 콘텐츠를 덮어씁니다.

    
    {
      "id": "2",
      "productId": "33218897",
      "category": "Women's Outerwear",
      "manufacturer": "Contoso",
      "description": "Black wool pea-coat",
      "price": "49.99",
      "shipping": {
        "weight": 2,
        "dimensions": {
          "width": 8,
          "height": 11,
          "depth": 3
        }
      }
    }
    
    
  6. 저장을 선택합니다.

  7. 왼쪽 메뉴에서 항목을 확인하여 문서가 저장되어 있는지 확인합니다.

Kafka 토픽에 기록된 데이터 확인

  1. http://localhost:9000에서 Kafka 토픽 UI를 엽니다.
  2. 만든 Kafka "apparels" 토픽을 선택합니다.
  3. 이전에 Azure Cosmos DB에 삽입한 문서가 Kafka 토픽에 나타나는지 확인합니다.

청소

Confluent 제어 센터에서 커넥터를 삭제하려면 만든 원본 커넥터로 이동하여 삭제 아이콘을 선택합니다.

원본 커넥터 대화 상자에서 삭제 옵션의 스크린샷

또는 커넥터의 REST API를 사용합니다.

# Curl to Kafka connect service
curl -X DELETE http://localhost:8083/connectors/cosmosdb-source-connector

Azure CLI를 사용하여 생성된 Azure Cosmos DB 서비스와 해당 리소스 그룹을 삭제하려면 이 단계를 참조하세요.

원본 구성 속성

다음 설정은 Kafka 원본 커넥터를 구성하는 데 사용됩니다. 이러한 구성 값은 사용되는 Azure Cosmos DB 컨테이너, Kafka 토픽이 작성된 데이터, 데이터를 직렬화할 형식을 결정합니다. 기본값이 있는 예는 이 구성 파일을 참조하세요.

구성 속성 이름 기본값 설명
connector.class 없음 Azure Cosmos DB 원본의 클래스 이름입니다. com.azure.cosmos.kafka.connect.CosmosSourceConnector로 설정되어야 합니다.
azure.cosmos.account.endpoint 없음 Cosmos DB 계정 엔드포인트 URI
azure.cosmos.account.environment Azure Cosmos DB 계정의 Azure 환경: Azure, AzureChina, AzureUsGovernmentAzureGermany.
azure.cosmos.account.tenantId "" Cosmos DB 계정의 tenantId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.auth.type MasterKey 현재 MasterKey지원되는 인증 유형은 두 가지입니다. (PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), ServicePrincipal
azure.cosmos.account.key "" Cosmos DB 계정 키(auth.typeMasterKey인 경우에만 필요)
azure.cosmos.auth.aad.clientId "" 서비스 주체의 clientId/ApplicationId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.auth.aad.clientSecret "" 서비스 주체의 클라이언트 암호입니다.
azure.cosmos.mode.gateway false 게이트웨이 모드를 사용할지 여부를 나타내는 플래그입니다. 기본적으로 false입니다. 즉, SDK는 직접 모드를 사용합니다.
azure.cosmos.preferredRegionList [] 다중 지역 Cosmos DB 계정에 사용할 기본 설정 지역 목록입니다. 힌트로 사용할 쉼표로 구분된 값(예: [East US, West US] 또는 East US, West US)으로 제공된 선호 지역입니다. Cosmos DB 계정과 함께 배치된 kafka 클러스터를 사용하고 kafka 클러스터 지역을 기본 지역으로 전달해야 합니다.
azure.cosmos.application.name "" 애플리케이션 이름. 이는 userAgent 접미사로 추가됩니다.
azure.cosmos.throughputControl.enabled false 처리량 제어를 사용할 수 있는지 여부를 나타내는 플래그입니다.
azure.cosmos.throughputControl.account.endpoint "" Cosmos DB 처리량 제어 계정 엔드포인트 URI입니다.
azure.cosmos.throughputControl.account.environment Azure Cosmos DB 계정의 Azure 환경: Azure, AzureChina, AzureUsGovernmentAzureGermany.
azure.cosmos.throughputControl.account.tenantId "" Cosmos DB 계정의 tenantId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.throughputControl.auth.type MasterKey 현재 MasterKey지원되는 인증 유형은 두 가지입니다. (PrimaryReadWriteKeys, SecondReadWriteKeys, PrimaryReadOnlyKeys, SecondReadWriteKeys), ServicePrincipal
azure.cosmos.throughputControl.account.key "" Cosmos DB 처리량 제어 계정 키(throughputControl.auth.typeMasterKey인 경우에만 필요).
azure.cosmos.throughputControl.auth.aad.clientId "" 서비스 주체의 clientId/ApplicationId입니다. 인증에 ServicePrincipal 필요합니다.
azure.cosmos.throughputControl.auth.aad.clientSecret "" 서비스 주체의 클라이언트 암호입니다.
azure.cosmos.throughputControl.preferredRegionList [] 다중 지역 Cosmos DB 계정에 사용할 기본 설정 지역 목록입니다. 힌트로 사용할 쉼표로 구분된 값(예: [East US, West US] 또는 East US, West US)으로 제공된 선호 지역입니다. Cosmos DB 계정과 함께 배치된 kafka 클러스터를 사용하고 kafka 클러스터 지역을 기본 지역으로 전달해야 합니다.
azure.cosmos.throughputControl.mode.gateway false 게이트웨이 모드를 사용할지 여부를 나타내는 플래그입니다. 기본적으로 false입니다. 즉, SDK는 직접 모드를 사용합니다.
azure.cosmos.throughputControl.group.name "" 처리량 제어 그룹 이름입니다. 고객이 컨테이너에 대해 많은 그룹을 만들 수 있으므로 이름은 고유해야 합니다.
azure.cosmos.throughputControl.targetThroughput -1 처리량 컨트롤 그룹 목표 처리량 값은 0보다 커야 합니다.
azure.cosmos.throughputControl.targetThroughputThreshold -1 처리량 제어 그룹 대상 처리량 임계값입니다. 값은 (0,1] 사이여야 합니다.
azure.cosmos.throughputControl.priorityLevel None 처리량 제어 그룹 우선 순위 수준입니다. 값은 None, High 또는 Low일 수 있습니다.
azure.cosmos.throughputControl.globalControl.database.name "" 처리량 전역 제어에 사용되는 데이터베이스입니다.
azure.cosmos.throughputControl.globalControl.container.name "" 처리량 전역 제어에 사용되는 컨테이너입니다.
azure.cosmos.throughputControl.globalControl.renewIntervalInMS -1 이렇게 하면 클라이언트가 자체의 처리량 사용량을 업데이트하고 다른 클라이언트의 처리량 사용량에 따라 자체 처리량 공유를 조정하는 빈도를 제어합니다. 기본값은 5이고, 허용되는 최소값은 5s입니다.
azure.cosmos.throughputControl.globalControl.expireIntervalInMS -1 이렇게 하면 클라이언트가 오프라인 상태인 것을 빠르게 감지하여 다른 클라이언트에서 처리량 공유를 수행할 수 있습니다. 기본값은 11 s이고, 허용되는 최소값은 2 * renewIntervalInMS + 1입니다.
azure.cosmos.source.database.name 없음 Cosmos DB 데이터베이스 이름입니다.
azure.cosmos.source.containers.includeAll false 모든 컨테이너에서 읽는지 여부를 나타내는 플래그입니다.
azure.cosmos.source.containers.includedList [] 컨테이너가 포함되어 있습니다. azure.cosmos.source.containers.includeAll이 true이면 이 구성은 무시됩니다.
azure.cosmos.source.containers.topicMap [] Cosmos 컨테이너에 매핑된 Kafka 토픽의 쉼표로 구분된 목록입니다. 예: topic1#con1, topic2#con2. 기본적으로 컨테이너 이름은 데이터를 게시할 kafka 토픽의 이름으로 사용되며, 이 속성을 사용하여 기본 구성을 재정의할 수 있습니다.
azure.cosmos.source.changeFeed.startFrom Beginning 설정에서 ChangeFeed 시작 시점 선택 (지금, 시작 또는 특정 시간(UTC), 예를 들어 2020-02-10T14:15:03) 기본값은 '시작'입니다.
azure.cosmos.source.changeFeed.mode LatestVersion ChangeFeed 모드(LatestVersion 또는 AllVersionsAndDeletes)입니다.
azure.cosmos.source.changeFeed.maxItemCountHint 1000 단일 변경 피드 요청에서 반환되는 최대 문서 수입니다. 그러나 여러 항목이 동일한 트랜잭션에 의해 변경되는 경우 받은 항목 수가 지정된 값보다 높을 수 있습니다.
azure.cosmos.source.metadata.poll.delay.ms 300000 메타데이터 변경 내용(컨테이너 분할/병합, 컨테이너 추가/제거/다시 만들기 포함)을 확인하는 빈도를 나타냅니다. 변경 내용이 검색되면 작업을 다시 구성합니다. 기본값은 5분입니다.
azure.cosmos.source.metadata.storage.type Kafka 메타데이터의 스토리지 유형입니다. Cosmos, Kafka의 두 가지 유형이 지원됩니다.
azure.cosmos.source.metadata.storage.name _cosmos.metadata.topic 메타데이터 스토리지의 리소스 이름입니다. 메타데이터 스토리지 유형이 Kafka 토픽인 경우 이 구성은 kafka 토픽 이름을 참조하고, 메타데이터 토픽이 아직 없는 경우 만들어지고, 그렇지 않으면 미리 만든 토픽을 사용합니다. 메타데이터 스토리지 유형이 Cosmos인 경우, 이 구성은 컨테이너 이름을 참조합니다. MasterKey 인증의 경우, 이 컨테이너는 아직 존재하지 않을 때 4,000RU로 AutoScale 생성됩니다. ServicePrincipal 인증을 위해서는 컨테이너가 사전에 만들어져 있어야 합니다.
azure.cosmos.source.messageKey.enabled true Kafka 레코드 메시지 키를 설정할지 여부.
azure.cosmos.source.messageKey.field id 메시지 키로 사용할 필드입니다.

지원되는 데이터 형식

Azure Cosmos DB 원본 커넥터는 JSON 문서를 스키마로 변환하고 다음 JSON 데이터 형식을 지원합니다.

JSON 데이터 형식 스키마 유형
배열 배열
불리언 (Boolean) 불리언 (Boolean)
숫자 Float32
Float64
Int8
Int16
Int32
Int64
없음 문자열
개체(JSON) 구조체
문자열 문자열

다음 단계