Azure Event Hubs

Azure Event Hubs 는 수백만 개의 이벤트를 수집, 변환 및 저장하는 하이퍼 스케일 원격 분석 수집 서비스입니다. 분산형 스트리밍 플랫폼인 Azure Event Hubs는 대기 시간을 줄이고 구성 가능한 시간을 제공하므로 엄청난 양의 원격 측정을 클라우드에 입력할 수 있게 합니다. 또한 게시-구독 의미 체계를 사용하여 여러 애플리케이션에서 데이터를 읽을 수 있게 해줍니다.

이 문서에서는 Azure Event Hubs 및 Azure Databricks 클러스터에서 구조적 스트리밍을 사용하는 방법을 설명합니다.

참고 항목

Azure Event Hubs는 Databricks Runtime에서 Azure Event Hubs 메시지를 처리하는 데 사용할 수 있는 구조적 스트리밍 Kafka 커넥터와 함께 Apache Kafka와 호환되는 엔드포인트를 제공합니다. Databricks는 구조적 스트리밍 Kafka 커넥터를 사용하여 Azure Event Hubs에서 메시지를 처리하는 것이 좋습니다.

요구 사항

현재 릴리스 지원은 Azure Event Hubs Spark Connector 프로젝트 추가 정보 파일의 “최신 릴리스”를 참조하세요.

  1. Maven 좌표 com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17을 사용하여 Azure Databricks 작업 영역에 라이브러리를 만듭니다.

    참고 항목

    이 커넥터는 정기적으로 업데이트되며, 최신 버전이 사용 가능할 수도 있습니다. Maven 리포지토리에서 최신 커넥터를 끌어오는 것이 좋습니다.

  2. 만든 라이브러리를 클러스터에 설치합니다.

스키마

레코드의 스키마는 다음과 같습니다.

Column Type
body binary
partition string
offset string
sequenceNumber long
enqueuedTime timestamp
publisher string
partitionKey string
properties map[string,json]

body는 항상 바이트 배열로 제공됩니다. body 열을 명시적으로 역직렬화하려면 cast("string")을 사용합니다.

빠른 시작

빠른 예제인 WordCount를 시작해 봅시다. 다음 Notebook만 있으면 Azure Event Hubs에서 구조적 스트리밍을 사용하여 WordCount를 실행할 수 있습니다.

구조적 스트리밍을 사용하는 Azure Event Hubs WordCount Notebook

전자 필기장 가져오기

구성

이 섹션에서는 Event Hubs를 사용하는 데 필요한 구성 설정에 대해 설명합니다.

Azure Event Hubs를 사용하여 구조적 스트리밍을 구성하는 방법에 대한 자세한 지침은 Microsoft에서 개발한 구조적 스트리밍 및 Azure Event Hubs 통합 가이드를 참조하세요.

구조적 스트리밍 사용에 대한 자세한 지침은 Azure Databricks의 스트리밍을 참조 하세요.

Connection string

Event Hubs 서비스에 연결하려면 Event Hubs 연결 문자열이 필요합니다. Azure Portal에서 또는 라이브러리의 ConnectionStringBuilder를 사용하여 Event Hubs 인스턴스의 연결 문자열을 가져올 수 있습니다.

Azure Portal

Azure Portal에서 연결 문자열을 가져올 때 EntityPath 키가 있을 수도 있고, 없을 수도 있습니다. 고려할 사항은 다음과 같습니다.

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

EventHubs에 연결하려면 EntityPath가 있어야 합니다. 연결 문자열에 EntityPath가 없더라도 걱정하지 마세요. 다음에서 처리합니다.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

또는 ConnectionStringBuilder를 사용하여 연결 문자열을 만들 수 있습니다.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Event Hubs와 관련된 모든 구성은 EventHubsConf에서 수행됩니다. EventHubsConf를 만들려면 연결 문자열을 전달해야 합니다.

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

유효한 연결 문자열을 가져오는 방법에 대한 자세한 내용은 연결 문자열을 참조하세요.

전체 구성 목록은 EventHubsConf를 참조하세요. 시작하는 데 필요한 구성의 하위 집합은 다음과 같습니다.

옵션 기본값 쿼리 유형 설명
consumerGroup 문자열 “$Default” 스트리밍 및 일괄 처리 소비자 그룹은 전체 이벤트 허브 뷰입니다. 소비자 그룹은 여러 소비 애플리케이션을 사용하여 이벤트 스트림의 별도 보기가 있으며 자신의 속도 및 자신의 오프셋으로 독립적으로 스트림을 읽을 수 있습니다. 자세한 내용은 Microsoft 설명서에서 확인할 수 있습니다.
startingPosition EventPosition 스트림 시작 스트리밍 및 일괄 처리 구조적 스트리밍 작업의 시작 위치입니다. 옵션을 읽는 순서에 대한 자세한 내용은 startingPositions를 참조하세요.
maxEventsPerTrigger long partitionCount

* 1000
스트리밍 쿼리 트리거 간격당 처리되는 최대 이벤트 수에 대한 속도 제한입니다. 지정한 총 이벤트 수가 다른 볼륨의 파티션에 비례 분할됩니다.

EventHubsConf에는 각 옵션에 해당하는 설정이 있습니다. 예시:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf를 사용하면 사용자가 EventPosition 클래스로 시작(및 종료) 위치를 지정할 수 있습니다. EventPosition은 이벤트 허브 파티션의 이벤트 위치를 정의합니다. 위치는 큐에 넣은 시간, 오프셋, 시퀀스 번호, 스트림 시작 또는 스트림 끝일 수 있습니다.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

특정 위치에서 시작(또는 종료)하려는 경우 올바른 EventPosition을 만들고 EventHubsConf에서 설정하기만 하면 됩니다.

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Azure Event Hubs를 사용하는 프로덕션 구조적 스트리밍

프로덕션 환경에서 스트리밍 쿼리를 실행하는 경우 단순히 클러스터에 Notebook을 연결하고 스트리밍 쿼리를 대화형으로 실행할 때보다 더 견고하고 가동 시간이 보장되기를 원할 것입니다. Azure Event Hubs 및 Azure Databricks를 사용하여 프로덕션 환경에서 구조적 스트리밍을 구성하고 실행하는 방법에 대한 데모를 보려면 다음 Notebook을 가져와서 실행합니다.

자세한 내용은 구조적 스트리밍에 대한 프로덕션 고려 사항을 참조하세요.

Azure Event Hubs를 사용하는 프로덕션 구조적 스트리밍 Notebook

전자 필기장 가져오기