다음을 통해 공유


Azure Event Hubs 및 Kafka 데이터 흐름 엔드포인트 구성

Important

Azure IoT 작업 미리 보기 - Azure Arc에서 지원되는 Azure IoT 작업은 현재 preview로 제공됩니다. 프로덕션 환경에서는 이 미리 보기 소프트웨어를 사용하면 안 됩니다.

일반적으로 릴리스되는 릴리스가 제공되면 새로운 Azure IoT 작업 설치를 배포해야 합니다. 미리 보기 설치는 업그레이드할 수 없습니다.

베타, 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 약관은 Microsoft Azure 미리 보기에 대한 추가 사용 약관을 참조하세요.

Azure IoT Operations Preview와 Apache Kafka broker 간에 양방향 통신을 설정하려면 데이터 흐름 엔드포인트를 구성할 수 있습니다. 이 구성을 사용하면 엔드포인트, TLS(전송 계층 보안), 인증 및 기타 설정을 지정할 수 있습니다.

필수 조건

Azure Event Hubs

Azure Event Hubs는 Kafka 프로토콜 과 호환되며 몇 가지 제한 사항이 있는 데이터 흐름과 함께 사용할 수 있습니다.

Azure Event Hubs 네임스페이스 및 이벤트 허브 만들기

먼저 Kafka 지원 Azure Event Hubs 네임스페이스를 만듭니다.

다음으로 네 임스페이스에 이벤트 허브를 만듭니다. 각 개별 이벤트 허브는 Kafka 토픽에 해당합니다. 동일한 네임스페이스에 여러 이벤트 허브를 만들어 여러 Kafka 토픽을 나타낼 수 있습니다.

Event Hubs 네임스페이스에 관리 ID 할당

Kafka 엔드포인트에 대한 데이터 흐름 엔드포인트를 구성하려면 Azure Arc 지원 Kubernetes 클러스터의 관리 ID를 사용하는 것이 좋습니다. 이 방법은 안전하며 비밀 관리가 필요하지 않습니다.

  1. Azure Portal에서 Azure IoT Operations 인스턴스로 이동하여 개요를 선택합니다.
  2. Azure IoT Operations Arc 확장 후에 나열된 확장의 이름을 복사합니다. 예를 들어 azure-iot-operations-xxxx7입니다.
  3. 확장 이름을 사용하여 Azure Portal에서 관리 ID를 검색합니다. 예를 들어 azure-iot-operations-xxxx7을 검색합니다.
  4. 또는 Azure Event Hubs Data Receiver 역할이 있는 Event Hubs 네임스페이스에 Azure IoT Operations Arc 확장 관리 ID를 Azure Event Hubs Data Sender 할당합니다.

데이터 흐름 엔드포인트 만들기

마지막으로 DataflowEndpoint 리소스를 만듭니다. 사용자 고유의 값을 사용하여 자리 표시자 값(예: <ENDPOINT_NAME>.)을 대체합니다.

  1. 작업 환경에서 데이터 흐름 엔드포인트 탭을 선택합니다.

  2. 새 데이터 흐름 엔드포인트 만들기에서 Azure Event Hubs 새로 만들기를>선택합니다.

    작업 환경을 사용하여 Azure Event Hubs 데이터 흐름 엔드포인트를 만드는 스크린샷

  3. 엔드포인트에 대해 다음 설정을 입력합니다.

    설정 Description
    이름 데이터 흐름 엔드포인트의 이름입니다.
    호스트 Kafka broker의 호스트 이름입니다 <NAMEPSACE>.servicebus.windows.net:9093. Event Hubs에 대한 호스트 설정에 포트 번호를 9093 포함합니다.
    인증 방법 인증에 사용되는 방법입니다. 시스템 할당 관리 ID 선택
  4. 적용을 선택하여 엔드포인트를 프로비전합니다.

참고 항목

Kafka 토픽 또는 개별 이벤트 허브는 나중에 데이터 흐름을 만들 때 구성됩니다. Kafka 토픽은 데이터 흐름 메시지의 대상입니다.

Event Hubs 인증에 연결 문자열 사용

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 기본 탭을 선택한 다음, 인증 방법>SASL을 선택합니다.

엔드포인트에 대해 다음 설정을 입력합니다.

설정 설명
SASL 형식 Plain을 선택합니다.
동기화된 비밀 이름 연결 문자열 포함하는 Kubernetes 비밀의 이름입니다.
사용자 이름 참조 또는 토큰 비밀 SASL 인증에 사용되는 사용자 이름 또는 토큰 비밀에 대한 참조입니다.
토큰 비밀의 암호 참조 SASL 인증에 사용되는 암호 또는 토큰 비밀에 대한 참조입니다.

제한 사항

Azure Event Hubs는 Kafka에서 지원하는 모든 압축 유형을 지원하지 않습니다. GZIP 압축만 현재 Azure Event Hubs 프리미엄 및 전용 계층에서 지원됩니다. 다른 압축 형식을 사용하면 오류가 발생할 수 있습니다.

사용자 지정 Kafka broker

비 이벤트 허브 Kafka broker에 대한 데이터 흐름 엔드포인트를 구성하려면 필요에 따라 호스트, TLS, 인증 및 기타 설정을 지정합니다.

  1. 작업 환경에서 데이터 흐름 엔드포인트 탭을 선택합니다.

  2. 새 데이터 흐름 엔드포인트 만들기에서 사용자 지정 Kafka Broker>새로 만들기를 선택합니다.

    작업 환경을 사용하여 Kafka 데이터 흐름 엔드포인트를 만드는 스크린샷

  3. 엔드포인트에 대해 다음 설정을 입력합니다.

    설정 Description
    이름 데이터 흐름 엔드포인트의 이름입니다.
    호스트 Kafka broker의 호스트 이름입니다 <Kafa-broker-host>:xxxx. 호스트 설정에 포트 번호를 포함합니다.
    인증 방법 인증에 사용되는 방법입니다. SASL을 선택합니다.
    SASL 형식 SASL 인증의 유형입니다. 일반, ScramSha256 또는 ScramSha512를 선택합니다. SASL을 사용하는 경우 필요합니다.
    동기화된 비밀 이름 비밀의 이름입니다. SASL을 사용하는 경우 필요합니다.
    토큰 비밀의 사용자 이름 참조 SASL 토큰 비밀의 사용자 이름에 대한 참조입니다. SASL을 사용하는 경우 필요합니다.
  4. 적용을 선택하여 엔드포인트를 프로비전합니다.

참고 항목

현재 작업 환경은 Kafka 데이터 흐름 엔드포인트를 원본으로 사용하는 것을 지원하지 않습니다. Kubernetes 또는 Bicep을 사용하여 원본 Kafka 데이터 흐름 엔드포인트를 사용하여 데이터 흐름을 만들 수 있습니다.

엔드포인트 설정을 사용자 지정하려면 다음 섹션을 사용하여 자세한 내용을 확인하세요.

사용 가능한 인증 방법

Kafka broker 데이터 흐름 엔드포인트에 사용할 수 있는 인증 방법은 다음과 같습니다. Azure Key Vault를 구성하고 워크로드 ID를 사용하도록 설정하여 보안 설정을 사용하도록 설정하는 방법에 대한 자세한 내용은 Azure IoT Operations 미리 보기 배포에서 보안 설정 사용을 참조하세요.

SASL

인증에 SASL을 사용하려면 SASL 인증 방법을 지정하고 SASL 토큰이 포함된 비밀 이름으로 SASL 형식 및 비밀 참조를 구성합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 기본 탭을 선택한 다음, 인증 방법>SASL을 선택합니다.

엔드포인트에 대해 다음 설정을 입력합니다.

설정 설명
SASL 형식 사용할 SASL 인증 유형입니다. 지원되는 형식은 Plain, ScramSha256ScramSha512입니다.
동기화된 비밀 이름 SASL 토큰을 포함하는 Kubernetes 비밀의 이름입니다.
사용자 이름 참조 또는 토큰 비밀 SASL 인증에 사용되는 사용자 이름 또는 토큰 비밀에 대한 참조입니다.
토큰 비밀의 암호 참조 SASL 인증에 사용되는 암호 또는 토큰 비밀에 대한 참조입니다.

지원되는 SASL 형식은 다음과 같습니다.

  • Plain
  • ScramSha256
  • ScramSha512

비밀은 Kafka 데이터 흐름 엔드포인트와 동일한 네임스페이스에 있어야 합니다. 비밀에는 키-값 쌍으로 SASL 토큰이 있어야 합니다. 예시:

시스템 할당 관리 ID

인증에 시스템 할당 관리 ID를 사용하려면 Event Hubs에서 메시지를 보내고 받을 수 있는 권한을 부여하는 Azure IoT Operation 관리 ID에 역할을 할당합니다.

  1. Azure Portal에서 Azure IoT Operations 인스턴스로 이동하여 개요를 선택합니다.
  2. Azure IoT Operations Arc 확장 후에 나열된 확장의 이름을 복사합니다. 예를 들어 azure-iot-operations-xxxx7입니다.
  3. 확장 이름을 사용하여 Azure Portal에서 관리 ID를 검색합니다. 예를 들어 azure-iot-operations-xxxx7을 검색합니다.
  4. Azure Event Hubs 데이터 소유자, Azure Event Hubs 데이터 발신자 또는 Azure Event Hubs 데이터 수신기와 같은 메시지를 보내고 받을 수 있는 권한을 부여하는 Azure IoT Operations Arc 확장 관리 ID에 역할을 할당합니다. 자세한 내용은 Event Hubs 리소스에 액세스하려면 Microsoft Entra ID로 애플리케이션 인증을 참조하세요.
  5. Kafka 설정에서 관리 ID 인증 방법을 지정합니다. 대부분의 경우 다른 설정을 지정할 필요가 없습니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 기본 탭을 선택한 다음 인증 방법>시스템 할당 관리 ID를 선택합니다.

이 구성은 기본 대상 그룹을 사용하여 관리 ID를 만듭니다. 이 ID는 Event Hubs 네임스페이스 호스트 값과 같은 형식입니다 https://<NAMESPACE>.servicebus.windows.net. 그러나 기본 대상 그룹을 재정의해야 하는 경우 필드를 원하는 값으로 설정할 audience 수 있습니다.

작업 환경에서 지원되지 않습니다.

사용자 할당 관리 ID

인증에 사용자 관리 ID를 사용하려면 먼저 보안 설정을 사용하도록 설정된 Azure IoT Operations를 배포해야 합니다. 자세한 내용은 Azure IoT Operations 미리 보기 배포에서 보안 설정 사용을 참조하세요.

그런 다음 Kafka 설정에서 관리 ID의 클라이언트 ID 및 테넌트 ID와 함께 사용자 할당 관리 ID 인증 방법을 지정합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 기본 탭을 선택한 다음, 인증 방법>사용자 할당 관리 ID를 선택합니다.

여기서 범위는 관리 ID의 대상 그룹입니다. 기본값은 Event Hubs 네임스페이스 호스트 값 https://<NAMESPACE>.servicebus.windows.net과 같습니다. 그러나 기본 대상 그룹을 재정의해야 하는 경우 Bicep 또는 Kubernetes를 사용하여 범위 필드를 원하는 값으로 설정할 수 있습니다.

익명

익명 인증을 사용하려면 익명 메서드를 사용하도록 Kafka 설정의 인증 섹션을 업데이트합니다.

작업 환경에서 아직 지원되지 않습니다. 알려진 문제를 참조하세요.

고급 설정

TLS, 신뢰할 수 있는 CA 인증서, Kafka 메시징 설정, 일괄 처리 및 CloudEvents와 같은 Kafka 데이터 흐름 엔드포인트에 대한 고급 설정을 설정할 수 있습니다. 데이터 흐름 엔드포인트 고급 포털 탭 또는 데이터 흐름 엔드포인트 리소스 내에서 이러한 설정을 설정할 수 있습니다.

작업 환경에서 데이터 흐름 엔드포인트에 대한 고급 탭을 선택합니다.

작업 환경을 사용하여 Kafka 데이터 흐름 엔드포인트 고급 설정을 설정하는 스크린샷

TLS 설정

TLS 모드

Kafka 엔드포인트에 대해 TLS를 사용하거나 사용하지 않도록 설정하려면 TLS 설정에서 설정을 업데이트 mode 합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 TLS 모드 사용 옆의 확인란을 사용합니다.

TLS 모드를 설정하거나 Disabled.로 Enabled 설정할 수 있습니다. 모드가 설정된 Enabled경우 데이터 흐름은 Kafka broker에 대한 보안 연결을 사용합니다. 모드가 설정된 Disabled경우 데이터 흐름은 Kafka broker에 대한 안전하지 않은 연결을 사용합니다.

신뢰할 수 있는 CA 인증서

Kafka 엔드포인트에 대해 신뢰할 수 있는 CA 인증서를 구성하여 Kafka broker에 대한 보안 연결을 설정합니다. Kafka broker가 자체 서명된 인증서 또는 기본적으로 신뢰할 수 없는 사용자 지정 CA에서 서명한 인증서를 사용하는 경우 이 설정이 중요합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 신뢰할 수 있는 CA 인증서 구성 맵 필드를 사용하여 신뢰할 수 있는 CA 인증서가 포함된 ConfigMap을 지정합니다.

이 ConfigMap에는 PEM 형식의 CA 인증서가 포함되어야 합니다. ConfigMap은 Kafka 데이터 흐름 리소스와 동일한 네임스페이스에 있어야 합니다. 예시:

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

Azure Event Hubs에 연결할 때 Event Hubs 서비스는 기본적으로 신뢰할 수 있는 공용 CA에서 서명한 인증서를 사용하므로 CA 인증서가 필요하지 않습니다.

소비자 그룹 ID

소비자 그룹 ID는 데이터 흐름이 Kafka 토픽에서 메시지를 읽는 데 사용하는 소비자 그룹을 식별하는 데 사용됩니다. 소비자 그룹 ID는 Kafka broker 내에서 고유해야 합니다.

Important

Kafka 엔드포인트를 원본으로 사용하는 경우 소비자 그룹 ID가 필요합니다. 그렇지 않으면 데이터 흐름이 Kafka 토픽에서 메시지를 읽을 수 없으며 "Kafka 형식 원본 엔드포인트에 consumerGroupId가 정의되어 있어야 합니다." 오류가 발생합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음, 소비자 그룹 ID 필드를 사용하여 소비자 그룹 ID를 지정합니다.

이 설정은 엔드포인트가 원본으로 사용되는 경우에만 적용됩니다(즉, 데이터 흐름이 소비자임).

압축

압축 필드를 사용하면 Kafka 토픽으로 전송된 메시지에 대한 압축을 사용할 수 있습니다. 압축은 데이터 전송에 필요한 네트워크 대역폭 및 스토리지 공간을 줄이는 데 도움이 됩니다. 그러나 압축은 프로세스에 약간의 오버헤드와 대기 시간도 추가합니다. 지원되는 압축 형식은 다음 표에 열거되어 있습니다.

설명
None 압축 또는 일괄 처리가 적용되지 않습니다. 압축을 지정하지 않으면 기본값이 없습니다.
Gzip GZIP 압축 및 일괄 처리가 적용됩니다. GZIP은 압축 비율과 속도 간에 적절한 균형을 제공하는 범용 압축 알고리즘입니다. GZIP 압축만 현재 Azure Event Hubs 프리미엄 및 전용 계층에서 지원됩니다 .
Snappy Snappy 압축 및 일괄 처리가 적용됩니다. Snappy는 적당한 압축 비율과 속도를 제공하는 빠른 압축 알고리즘입니다. 이 압축 모드는 Azure Event Hubs에서 지원되지 않습니다.
Lz4 LZ4 압축 및 일괄 처리가 적용됩니다. LZ4는 낮은 압축 비율과 고속을 제공하는 빠른 압축 알고리즘입니다. 이 압축 모드는 Azure Event Hubs에서 지원되지 않습니다.

압축을 구성하려면 다음을 수행합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 압축 필드를 사용하여 압축 유형을 지정합니다.

이 설정은 엔드포인트가 데이터 흐름이 생산자인 대상으로 사용되는 경우에만 적용됩니다.

일괄 처리

압축 외에도 메시지를 Kafka 토픽으로 보내기 전에 메시지에 대한 일괄 처리를 구성할 수도 있습니다. 일괄 처리를 사용하면 여러 메시지를 함께 그룹화하고 단일 단위로 압축하여 압축 효율성을 높이고 네트워크 오버헤드를 줄일 수 있습니다.

필드 설명 필수
mode Enabled 또는 Disabled일 수 있습니다. 기본값은 Enabled Kafka에 일치하지 않는 메시징에 대한 개념 없기 때문입니다. 로 Disabled설정하면 일괄 처리가 최소화되어 매번 단일 메시지로 일괄 처리를 만듭니다. 아니요
latencyMs 메시지를 보내기 전에 버퍼링할 수 있는 최대 시간 간격(밀리초)입니다. 이 간격에 도달하면 버퍼링된 모든 메시지는 개수나 크기에 상관 없이, 일괄 처리로 전송됩니다. 설정하지 않으면 기본값은 5입니다. 아니요
maxMessages 메시지를 보내기 전에 버퍼링할 수 있는 최대 메시지 수입니다. 이 수에 도달하면 버퍼링된 모든 메시지는 버퍼링된 메시지의 양 또는 기간에 관계없이 일괄 처리로 전송됩니다. 설정하지 않으면 기본값은 100000입니다. 아니요
maxBytes 전송되기 전에 버퍼링할 수 있는 최대 크기(바이트)입니다. 이 크기에 도달하면 버퍼링된 메시지 수 또는 버퍼링 기간에 관계없이 모든 버퍼링된 메시지가 일괄 처리로 전송됩니다. 기본값은 1000000(1MB)입니다. 아니요

예를 들어 대기 시간M을 1000으로 설정하고 maxMessages를 100으로 설정하고 maxBytes를 1024로 설정하면 버퍼에 메시지가 100개 있거나 버퍼에 1,024바이트가 있는 경우 또는 마지막 전송 이후 1,000밀리초가 경과하면 메시지가 전송됩니다.

일괄 처리를 구성하려면 다음을 수행합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 Batching 사용 필드를 사용하여 일괄 처리를 사용하도록 설정합니다. 일괄 처리 대기 시간, 최대 바이트메시지 수 필드를 사용하여 일괄 처리 설정을 지정합니다.

이 설정은 엔드포인트가 데이터 흐름이 생산자인 대상으로 사용되는 경우에만 적용됩니다.

파티션 처리 전략

파티션 처리 전략은 Kafka 토픽으로 메시지를 보낼 때 Kafka 파티션에 메시지를 할당하는 방법을 제어합니다. Kafka 파티션은 병렬 처리 및 내결함성을 가능하게 하는 Kafka 토픽의 논리적 세그먼트입니다. Kafka 토픽의 각 메시지에는 메시지를 식별하고 정렬하는 데 사용되는 파티션과 오프셋이 있습니다.

이 설정은 엔드포인트가 데이터 흐름이 생산자인 대상으로 사용되는 경우에만 적용됩니다.

기본적으로 데이터 흐름은 라운드 로빈 알고리즘을 사용하여 임의 파티션에 메시지를 할당합니다. 그러나 MQTT 토픽 이름 또는 MQTT 메시지 속성과 같은 몇 가지 기준에 따라 다른 전략을 사용하여 파티션에 메시지를 할당할 수 있습니다. 이렇게 하면 부하 분산, 데이터 위치 또는 메시지 순서 지정을 향상할 수 있습니다.

설명
Default 라운드 로빈 알고리즘을 사용하여 임의 파티션에 메시지를 할당합니다. 전략이 지정되지 않은 경우 기본값입니다.
Static 데이터 흐름의 인스턴스 ID에서 파생된 고정 파티션 번호에 메시지를 할당합니다. 즉, 각 데이터 흐름 인스턴스는 메시지를 다른 파티션으로 보냅니다. 이것은 부하 분산 및 데이터 위치 향상에 도움이 될 수 있습니다.
Topic 데이터 흐름 원본의 MQTT 토픽 이름을 분할 키로 사용합니다. 즉, 동일한 MQTT 토픽 이름을 가진 메시지가 동일한 파티션으로 전송됩니다. 이것은 메시지 정렬 및 데이터 위치 향상에 도움이 될 수 있습니다.
Property 데이터 흐름 원본의 MQTT 메시지 속성을 분할 키로 사용합니다. partitionKeyProperty 필드에 속성의 이름을 지정합니다. 즉, 속성 값이 같은 메시지가 동일한 파티션으로 전송됩니다. 이것은 사용자 지정 기준에 따라 메시지 정렬 및 데이터 위치 향상에 도움이 될 수 있습니다.

예를 들어 파티션 처리 전략을 Property 설정하고 파티션 키 속성을 설정 device-id하면 동일한 device-id 속성이 있는 메시지가 동일한 파티션으로 전송됩니다.

파티션 처리 전략을 구성하려면 다음을 수행합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 파티션 처리 전략 필드를 사용하여 파티션 처리 전략을 지정합니다. 파티션 키 속성 필드를 사용하여 전략이 설정된 경우 분할에 사용되는 속성을 지정합니다Property.

Kafka 승인

Kafka 승인(acks)은 Kafka 토픽으로 전송되는 메시지의 내구성 및 일관성을 제어하는 데 사용됩니다. 생산자가 Kafka 토픽에 메시지를 보내면 Kafka broker에서 다양한 수준의 승인을 요청하여 메시지가 토픽에 성공적으로 기록되고 Kafka 클러스터에 복제되도록 할 수 있습니다.

이 설정은 엔드포인트가 대상으로 사용되는 경우에만 적용됩니다(즉, 데이터 흐름이 생산자임).

설명
None 데이터 흐름은 Kafka broker의 승인을 기다리지 않습니다. 이 설정은 가장 빠르지만 내구성이 가장 낮은 옵션입니다.
All 데이터 흐름은 메시지가 리더 파티션 및 모든 팔로워 파티션에 기록될 때까지 기다립니다. 이 설정은 가장 느리지만 가장 내구성이 뛰어난 옵션입니다. 이 설정도 기본 옵션입니다.
One 데이터 흐름은 메시지가 리더 파티션 및 하나 이상의 팔로워 파티션에 기록될 때까지 기다립니다.
Zero 데이터 흐름은 메시지가 리더 파티션에 기록될 때까지 기다리지만 팔로워의 승인을 기다리지 않습니다. 이는 내구성보다 One 빠르지만 내구성이 떨어집니다.

예를 들어 Kafka 승인을 All설정하는 경우 데이터 흐름은 다음 메시지를 보내기 전에 메시지가 리더 파티션 및 모든 팔로워 파티션에 기록될 때까지 기다립니다.

Kafka 승인을 구성하려면 다음을 수행합니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 Kafka 승인 필드를 사용하여 Kafka 승인 수준을 지정합니다.

이 설정은 엔드포인트가 데이터 흐름이 생산자인 대상으로 사용되는 경우에만 적용됩니다.

MQTT 속성 복사

기본적으로 복사 MQTT 속성 설정이 사용됩니다. 이러한 사용자 속성에는 메시지를 보내는 자산의 이름을 저장하는 subject 같은 값이 포함됩니다.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 MQTT 속성 복사 필드 옆의 확인란을 사용하여 MQTT 속성 복사를 사용하거나 사용하지 않도록 설정합니다.

다음 섹션에서는 설정을 사용할 때 MQTT 속성을 Kafka 사용자 헤더로 변환하는 방법과 그 반대로 변환하는 방법을 설명합니다.

Kafka 엔드포인트는 대상입니다.

Kafka 엔드포인트가 데이터 흐름 대상인 경우 모든 MQTT v5 사양 정의 속성은 Kafka 사용자 헤더로 변환됩니다. 예를 들어 Kafka에 전달되는 "콘텐츠 형식"이 있는 MQTT v5 메시지는 Kafka 사용자 헤더"Content Type":{specifiedValue}로 변환됩니다. 다음 표에 정의된 다른 기본 제공 MQTT 속성에도 유사한 규칙이 적용됩니다.

MQTT 속성 번역된 동작
페이로드 형식 표시기 키: "페이로드 형식 표시기"
값: "0"(페이로드는 바이트) 또는 "1"(페이로드는 UTF-8)
응답 항목 키: "응답 항목"
값: 원본 메시지에서 응답 토픽의 복사본입니다.
메시지 만료 간격 키: "메시지 만료 간격"
값: 메시지가 만료되기 전의 시간(초)의 UTF-8 표현입니다. 자세한 내용은 메시지 만료 간격 속성을 참조하세요.
상관 관계 데이터: 키: "상관 관계 데이터"
값: 원본 메시지의 상관 관계 데이터 복사입니다. UTF-8로 인코딩된 많은 MQTT v5 속성과 달리 상관 관계 데이터는 임의의 데이터일 수 있습니다.
콘텐츠 형식: 키: "콘텐츠 형식"
값: 원본 메시지에서 콘텐츠 형식의 복사본입니다.

MQTT v5 사용자 속성 키 값 쌍은 Kafka 사용자 헤더로 직접 변환됩니다. 메시지의 사용자 헤더 이름이 기본 제공 MQTT 속성과 동일한 경우(예: "상관 관계 데이터"라는 사용자 헤더) MQTT v5 사양 속성 값 전달 또는 사용자 속성이 정의되지 않았는지 여부입니다.

데이터 흐름은 MQTT Broker에서 이러한 속성을 수신하지 않습니다. 따라서 데이터 흐름은 전달되지 않습니다.

  • 토픽 별칭
  • 구독 식별자
메시지 만료 간격 속성

메시지 만료 간격메시지가 삭제되기 전에 MQTT Broker에 남아 있을 수 있는 기간을 지정합니다.

데이터 흐름이 메시지 만료 간격이 지정된 MQTT 메시지를 받으면 다음을 수행합니다.

  • 메시지를 받은 시간을 기록합니다.
  • 메시지를 대상으로 내보내기 전에 메시지에서 시간을 빼고 원래 만료 간격 시간에서 큐에 대기했습니다.
  • 메시지가 만료되지 않은 경우(위의 > 작업은 0) 메시지가 대상으로 내보내지고 업데이트된 메시지 만료 시간이 포함됩니다.
  • 메시지가 만료된 경우(위의 <작업은 = 0) 대상이 메시지를 내보내지 않습니다.

예:

  • 데이터 흐름은 메시지 만료 간격 = 3600초의 MQTT 메시지를 받습니다. 해당 대상의 연결이 일시적으로 끊어졌지만 다시 연결할 수 있습니다. 이 MQTT 메시지가 대상으로 전송되기까지 1,000초가 지났습니다. 이 경우 대상의 메시지에 메시지 만료 간격이 2600초(3600-1000)로 설정됩니다.
  • 데이터 흐름은 메시지 만료 간격 = 3600초가 포함된 MQTT 메시지를 받습니다. 해당 대상의 연결이 일시적으로 끊어졌지만 다시 연결할 수 있습니다. 그러나 이 경우 다시 연결하는 데 4,000초가 걸립니다. 메시지가 만료되고 데이터 흐름이 이 메시지를 대상으로 전달하지 않습니다.

Kafka 엔드포인트는 데이터 흐름 원본입니다.

참고 항목

Kafka 헤더가 MQTT로 변환될 때 손상되는 데이터 흐름 원본으로 Event Hubs 엔드포인트를 사용하는 경우 알려진 문제가 있습니다. 이는 이벤트 허브를 사용하는 경우에만 발생하지만, 이벤트 허브 클라이언트는 아래의 AMQP를 사용합니다. 예를 들어 "foo"="bar"의 경우 "foo"가 변환되지만 값은 "\xa1\x03bar"가 됩니다.

Kafka 엔드포인트가 데이터 흐름 원본인 경우 Kafka 사용자 헤더는 MQTT v5 속성으로 변환됩니다. 다음 표에서는 Kafka 사용자 헤더를 MQTT v5 속성으로 변환하는 방법을 설명합니다.

Kafka 헤더 번역된 동작
키: "키"
값: 원본 메시지에서 키의 복사본입니다.
Timestamp 키: "타임스탬프"
값: Kafka 타임스탬프의 UTF-8 인코딩(Unix epoch 이후 밀리초)입니다.

Kafka 사용자 헤더 키/값 쌍은 모두 UTF-8로 인코딩된 경우 MQTT 사용자 키/값 속성으로 직접 변환됩니다.

UTF-8/ 이진 불일치

MQTT v5는 UTF-8 기반 속성만 지원할 수 있습니다. 데이터 흐름이 하나 이상의 UTF-8이 아닌 헤더를 포함하는 Kafka 메시지를 수신하는 경우 데이터 흐름은 다음과 같습니다.

  • 잘못된 속성 또는 속성을 제거합니다.
  • 이전 규칙에 따라 메시지의 나머지 부분을 전달합니다.

Kafka 원본 헤더에서 이진 전송이 필요한 애플리케이션 => MQTT 대상 속성은 먼저 UTF-8로 인코딩해야 합니다(예: Base64를 통해).

>=64KB 속성 불일치

MQTT v5 속성은 64KB보다 작아야 합니다. 데이터 흐름이 = 64KB인 하나 이상의 헤더 >를 포함하는 Kafka 메시지를 수신하는 경우 데이터 흐름은 다음과 같습니다.

  • 잘못된 속성 또는 속성을 제거합니다.
  • 이전 규칙에 따라 메시지의 나머지 부분을 전달합니다.
AMQP를 사용하는 Event Hubs 및 생산자를 사용하는 경우 속성 변환

클라이언트 전달 메시지가 있는 경우 Kafka 데이터 흐름 원본 엔드포인트에서 다음 작업을 수행합니다.

  • Azure.Messaging.EventHubs와 같은 클라이언트 라이브러리를 사용하여 Event Hubs에 메시지 보내기
  • AMQP 직접 사용

알아야 할 속성 번역 뉘앙스가 있습니다.

다음 중 하나를 수행해야 합니다.

  • 속성 전송 방지
  • 속성을 보내야 하는 경우 UTF-8로 인코딩된 값을 보냅니다.

Event Hubs가 AMQP에서 Kafka로 속성을 변환하는 경우 메시지에 기본 AMQP 인코딩 형식이 포함됩니다. 동작에 대한 자세한 내용은 다른 프로토콜을 사용하여 소비자와 생산자 간에 이벤트 교환을 참조 하세요.

다음 코드 예제에서는 데이터 흐름 엔드포인트가 값을 "foo":"bar"받으면 속성을 다음과 같이 <0xA1 0x03 "bar">받습니다.

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

데이터가 UTF-8이 아니므로 데이터 흐름 엔드포인트는 MQTT 메시지에 페이로드 속성을 <0xA1 0x03 "bar"> 전달할 수 없습니다. 그러나 UTF-8 문자열을 지정하는 경우 데이터 흐름 엔드포인트는 MQTT로 보내기 전에 문자열을 변환합니다. UTF-8 문자열을 사용하는 경우 MQTT 메시지에는 사용자 속성이 있습니다 "foo":"bar" .

UTF-8 헤더만 변환됩니다. 예를 들어 속성이 float로 설정된 다음 시나리오를 고려하면 다음과 같습니다.

Properties = 
{
  {"float-value", 11.9 },
}

데이터 흐름 엔드포인트는 필드가 포함된 "float-value" 패킷을 삭제합니다.

propertyEventData.correlationId를 포함한 모든 이벤트 데이터 속성이 전달되지는 않습니다. 자세한 내용은 이벤트 사용자 속성을 참조하세요.

CloudEvents

CloudEvents 는 이벤트 데이터를 일반적인 방식으로 설명하는 방법입니다. CloudEvents 설정은 CloudEvents 형식으로 메시지를 보내거나 받는 데 사용됩니다. 서로 다른 서비스가 동일하거나 다른 클라우드 공급자에서 서로 통신해야 하는 이벤트 기반 아키텍처에 CloudEvents를 사용할 수 있습니다.

CloudEventAttributes 옵션은 다음과CreateOrRemap 같습니다Propagate.

작업 환경 데이터 흐름 엔드포인트 설정 페이지에서 고급 탭을 선택한 다음 클라우드 이벤트 특성 필드를 사용하여 CloudEvents 설정을 지정합니다.

다음 섹션에서는 CloudEvent 속성이 전파되거나 생성되고 다시 매핑되는 방법을 설명합니다.

전파 설정

CloudEvent 속성은 필수 속성을 포함하는 메시지에 대해 전달됩니다. 메시지에 필수 속성이 포함되어 있지 않으면 메시지가 있는 그대로 전달됩니다. 필수 속성이 있으면 ce_ CloudEvent 속성 이름에 접두사를 추가합니다.

속성 Required 샘플 값 출력 이름 출력 값
specversion 1.0 ce-specversion 있는 그대로 전달됨
type ms.aio.telemetry ce-type 있는 그대로 전달됨
source aio://mycluster/myoven ce-source 있는 그대로 전달됨
id A234-1234-1234 ce-id 있는 그대로 전달됨
subject 아니요 aio/myoven/telemetry/temperature ce-subject 있는 그대로 전달됨
time 아니요 2018-04-05T17:31:00Z ce-time 있는 그대로 전달됩니다. 그것은 휴식되지 않습니다.
datacontenttype 아니요 application/json ce-datacontenttype 선택적 변환 단계 이후 출력 데이터 콘텐츠 형식으로 변경되었습니다.
dataschema 아니요 sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema 변환 구성 dataschema 에서 출력 데이터 변환 스키마가 지정된 경우 출력 스키마로 변경됩니다.

CreateOrRemap 설정

CloudEvent 속성은 필수 속성을 포함하는 메시지에 대해 전달됩니다. 메시지에 필수 속성이 포함되어 있지 않으면 속성이 생성됩니다.

속성 Required 출력 이름 누락된 경우 생성된 값
specversion ce-specversion 1.0
type ce-type ms.aio-dataflow.telemetry
source ce-source aio://<target-name>
id ce-id 대상 클라이언트에서 생성된 UUID
subject 아니요 ce-subject 메시지가 전송되는 출력 항목
time 아니요 ce-time 대상 클라이언트에서 RFC 3339로 생성됨
datacontenttype 아니요 ce-datacontenttype 선택적 변환 단계 이후 출력 데이터 콘텐츠 형식으로 변경됨
dataschema 아니요 ce-dataschema 스키마 레지스트리에 정의된 스키마

다음 단계

데이터 흐름에 대한 자세한 내용은 데이터 흐름 만들기를 참조 하세요.