Azure Event Hubs

Azure Event Hubs , milyonlarca olayı toplayan, dönüştüren ve depolayan hiper ölçekli bir telemetri alma hizmetidir. Dağıtılan bir akış platformu olarak sunduğu düşük gecikme süresi ve yapılandırılabilir saklama süresi ile Azure Event Hubs, çok yüksek miktarda telemetriyi buluta girmenize ve yayımla-abone ol semantik özelliklerini kullanarak birden çok uygulamadan veri okumanıza olanak tanır.

Bu makalede Azure Event Hubs ve Azure Databricks kümeleriyle Yapılandırılmış Akış'ın nasıl kullanılacağı açıklanmaktadır.

Dekont

Azure Event Hubs, Azure Event Hubs'dan gelen iletileri işlemek için Databricks Runtime'da bulunan Yapılandırılmış Akış Kafka bağlayıcısı ile kullanabileceğiniz Apache Kafka ile uyumlu bir uç nokta sağlar. Databricks, Azure Event Hubs'dan gelen iletileri işlemek için Yapılandırılmış Akış Kafka bağlayıcısının kullanılmasını önerir.

Gereksinimler

Geçerli sürüm desteği için Bkz. Azure Event Hubs Spark Bağlan veya proje benioku dosyasındaki "En Son Sürümler".

  1. Maven koordinatını com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17kullanarak Azure Databricks çalışma alanınızda bir kitaplık oluşturun.

    Dekont

    Bu bağlayıcı düzenli olarak güncelleştirilir ve daha yeni bir sürüm kullanılabilir: Maven deposundan en son bağlayıcıyı çekmenizi öneririz

  2. Oluşturulan kitaplığı kümenize yükleyin.

Şema

Kayıtların şeması:

Sütun Tür
body ikili
partition Dize
offset Dize
sequenceNumber uzun
enqueuedTime timestamp
publisher Dize
partitionKey Dize
properties map[string,json]

body her zaman bayt dizisi olarak sağlanır. Sütunu açıkça seri durumdan body çıkarmak için kullanıncast("string").

Hızlı Başlangıç

Hızlı bir örnekle başlayalım: WordCount. Aşağıdaki not defteri, Azure Event Hubs ile Yapılandırılmış Akış kullanarak WordCount'u çalıştırmak için gereken tek işlemdir.

Yapılandırılmış Akış not defteri ile Azure Event Hubs WordCount

Not defterini alma

Yapılandırma

Bu bölümde Event Hubs ile çalışmak için ihtiyacınız olan yapılandırma ayarları açıklanmıştır.

Azure Event Hubs ile Yapılandırılmış Akış yapılandırma hakkında ayrıntılı yönergeler için bkz . Microsoft tarafından geliştirilen Yapılandırılmış Akış ve Azure Event Hubs Tümleştirme Kılavuzu .

Yapılandırılmış Akış kullanma hakkında ayrıntılı yönergeler için bkz . Azure Databricks'te akış.

Connection string

Event Hubs hizmetine bağlanmak için bir Event Hubs bağlantı dizesi gerekir. Event Hubs örneğiniz için bağlantı dizesi Azure portalından veya kitaplığında kullanarak ConnectionStringBuilder alabilirsiniz.

Azure portalı

Azure portalından bağlantı dizesi aldığınızda anahtara sahip EntityPath olabilir veya olmayabilir. Aşağıdakileri dikkate alın:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

EventHubs'ınıza bağlanmak için mevcut olması EntityPath gerekir. bağlantı dizesi yoksa endişelenmeyin. Bu işlem bununla ilgilenir:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Bağlan ionStringBuilder

Alternatif olarak, öğesini kullanarak ConnectionStringBuilder bağlantı dizesi yapabilirsiniz.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Event Hubs ile ilgili tüm yapılandırmalar sizin EventHubsConfiçinde gerçekleşir. oluşturmak için bir EventHubsConfbağlantı dizesi geçirmeniz gerekir:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Geçerli bir bağlantı dizesi alma hakkında daha fazla bilgi için bkz. Bağlan ion Dizesi.

Yapılandırmaların tam listesi için bkz . EventHubsConf. Başlamanıza yönelik yapılandırmaların bir alt kümesi aşağıdadır:

Seçenek Değer Varsayılan Sorgu türü Açıklama
consumerGroup String "$Default" Akış ve toplu iş Tüketici grubu, olay hub'ının tamamının görünümüdür. Tüketici grupları birden çok tüketen uygulamayı her biri olay akışının ayrı bir görünümüne sahip olacak ve akışı kendi hızlarında ve kendi sapmalarıyla bağımsız bir şekilde okuyacak şekilde etkinleştirir. Microsoft belgelerinde daha fazla bilgi bulabilirsiniz.
startingPosition EventPosition Akışın başlangıcı Akış ve toplu iş Yapılandırılmış Akış işinizin başlangıç konumu. Seçeneklerin okunma sırası hakkında bilgi için bkz . startingPositions .
maxEventsPerTrigger uzun partitionCount

* 1000
Akış sorgusu Tetikleyici aralığı başına işlenen en fazla olay sayısı için hız sınırı. Belirtilen toplam olay sayısı orantılı olarak farklı birim bölümlerine bölünür.

Her seçenek için içinde buna karşılık gelen bir ayar EventHubsConfvardır. Örneğin:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf kullanıcıların sınıfla başlangıç (ve bitiş) konumlarını belirtmesine EventPosition olanak tanır. EventPosition olay Hub'ı bölümündeki bir olayın konumunu tanımlar. Konum sıralanmış bir zaman, uzaklık, sıra numarası, akışın başlangıcı veya akışın sonu olabilir.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Belirli bir konumda başlatmak (veya sonlandırmak) istiyorsanız, doğru EventPosition öğeyi oluşturup içinde ayarlamanız yeterlidir EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Azure Event Hubs ile Üretim Yapılandırılmış Akışı

Üretimde akış sorguları çalıştırdığınızda, büyük olasılıkla bir kümeye not defteri ekleyip akış sorgularınızı etkileşimli olarak çalıştırdığınızda elde edeceğinizden daha fazla sağlamlık ve çalışma süresi garantisi istersiniz. Azure Event Hubs ve Azure Databricks ile üretimde Yapılandırılmış Akış'ı yapılandırma ve çalıştırma hakkında bir tanıtım için aşağıdaki not defterini içeri aktarın ve çalıştırın.

Daha fazla bilgi için bkz . Yapılandırılmış Akış için üretimde dikkat edilmesi gerekenler.

Azure Event Hubs not defteriyle Üretim Yapılandırılmış Akışı

Not defterini alma