Aracılığıyla paylaş


Azure Event Hubs

Azure Event Hubs , milyonlarca olayı toplayan, dönüştüren ve depolayan hiper ölçekli bir telemetri alma hizmetidir. Dağıtılan bir akış platformu olarak sunduğu düşük gecikme süresi ve yapılandırılabilir saklama süresi ile Azure Event Hubs, çok yüksek miktarda telemetriyi buluta girmenize ve yayımla-abone ol semantik özelliklerini kullanarak birden çok uygulamadan veri okumanıza olanak tanır.

Bu makalede Azure Event Hubs ve Azure Databricks kümeleriyle Yapılandırılmış Akış'ın nasıl kullanılacağı açıklanmaktadır.

Not

Azure Event Hubs, Azure Event Hubs'dan gelen iletileri işlemek için Databricks Runtime'da bulunan Yapılandırılmış Akış Kafka bağlayıcısı ile kullanabileceğiniz Apache Kafka ile uyumlu bir uç nokta sağlar. Databricks, Azure Event Hubs'dan gelen iletileri işlemek için Yapılandırılmış Akış Kafka bağlayıcısının kullanılmasını önerir.

Gereksinimler

Geçerli sürüm desteği için Bkz. Azure Event Hubs Spark Bağlayıcısı proje benioku dosyasındaki "En Son Sürümler".

  1. Maven koordinatını com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17kullanarak Azure Databricks çalışma alanınızda bir kitaplık oluşturun.

    Not

    Bu bağlayıcı düzenli olarak güncelleştirilir ve daha yeni bir sürüm kullanılabilir: Maven deposundan en son bağlayıcıyı çekmenizi öneririz

  2. Oluşturulan kitaplığı kümenize yükleyin.

Şema

Kayıtların şeması:

Sütun Tür
body ikili
partition Dize
offset Dize
sequenceNumber uzun
enqueuedTime timestamp
publisher string
partitionKey Dize
properties map[string,json]

body her zaman bayt dizisi olarak sağlanır. Sütunu açıkça seri durumdan body çıkarmak için kullanıncast("string").

Yapılandırma

Bu bölümde Event Hubs ile çalışmak için ihtiyacınız olan yapılandırma ayarları açıklanmıştır.

Azure Event Hubs ile Yapılandırılmış Akış yapılandırma hakkında ayrıntılı yönergeler için bkz . Microsoft tarafından geliştirilen Yapılandırılmış Akış ve Azure Event Hubs Tümleştirme Kılavuzu .

Yapılandırılmış Akış kullanma hakkında ayrıntılı yönergeler için bkz . Azure Databricks'te akış.

Connection string

Event Hubs hizmetine bağlanmak için bir Event Hubs bağlantı dizesi gerekir. Event Hubs örneğiniz için bağlantı dizesi Azure portalından veya kitaplığında kullanarak ConnectionStringBuilder alabilirsiniz.

Azure portal

Azure portalından bağlantı dizesi aldığınızda anahtara sahip EntityPath olabilir veya olmayabilir. Aşağıdakileri dikkate alın:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

EventHubs'ınıza bağlanmak için mevcut olması EntityPath gerekir. bağlantı dizesi yoksa endişelenmeyin. Bu işlem bununla ilgilenir:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Alternatif olarak, öğesini kullanarak ConnectionStringBuilder bağlantı dizesi yapabilirsiniz.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Event Hubs ile ilgili tüm yapılandırmalar sizin EventHubsConfiçinde gerçekleşir. oluşturmak için bir EventHubsConfbağlantı dizesi geçirmeniz gerekir:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Geçerli bir bağlantı dizesi alma hakkında daha fazla bilgi için bkz. Bağlantı Dizesi.

Yapılandırmaların tam listesi için bkz . EventHubsConf. Başlamanıza yönelik yapılandırmaların bir alt kümesi aşağıdadır:

Seçenek Değer Varsayılan Sorgu türü Açıklama
consumerGroup String "$Default" Akış ve toplu iş Tüketici grubu, olay hub'ının tamamının görünümüdür. Tüketici grupları birden çok tüketen uygulamayı her biri olay akışının ayrı bir görünümüne sahip olacak ve akışı kendi hızlarında ve kendi sapmalarıyla bağımsız bir şekilde okuyacak şekilde etkinleştirir. Microsoft belgelerinde daha fazla bilgi bulabilirsiniz.
startingPosition EventPosition Akışın başlangıcı Akış ve toplu iş Yapılandırılmış Akış işinizin başlangıç konumu. Seçeneklerin okunma sırası hakkında bilgi için bkz . startingPositions .
maxEventsPerTrigger uzun partitionCount

- 1000
Akış sorgusu Tetikleyici aralığı başına işlenen en fazla olay sayısı için hız sınırı. Belirtilen toplam olay sayısı orantılı olarak farklı birim bölümlerine bölünür.

Her seçenek için içinde buna karşılık gelen bir ayar EventHubsConfvardır. Örneğin:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf kullanıcıların sınıfla başlangıç (ve bitiş) konumlarını belirtmesine EventPosition olanak tanır. EventPosition olay Hub'ı bölümündeki bir olayın konumunu tanımlar. Konum sıralanmış bir zaman, uzaklık, sıra numarası, akışın başlangıcı veya akışın sonu olabilir.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Belirli bir konumda başlatmak (veya sonlandırmak) istiyorsanız, doğru EventPosition öğeyi oluşturup içinde ayarlamanız yeterlidir EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)