Azure Event Hubs
Azure Event Hubs , milyonlarca olayı toplayan, dönüştüren ve depolayan hiper ölçekli bir telemetri alma hizmetidir. Dağıtılan bir akış platformu olarak sunduğu düşük gecikme süresi ve yapılandırılabilir saklama süresi ile Azure Event Hubs, çok yüksek miktarda telemetriyi buluta girmenize ve yayımla-abone ol semantik özelliklerini kullanarak birden çok uygulamadan veri okumanıza olanak tanır.
Bu makalede Azure Event Hubs ve Azure Databricks kümeleriyle Yapılandırılmış Akış'ın nasıl kullanılacağı açıklanmaktadır.
Not
Azure Event Hubs, Azure Event Hubs'dan gelen iletileri işlemek için Databricks Runtime'da bulunan Yapılandırılmış Akış Kafka bağlayıcısı ile kullanabileceğiniz Apache Kafka ile uyumlu bir uç nokta sağlar. Databricks, Azure Event Hubs'dan gelen iletileri işlemek için Yapılandırılmış Akış Kafka bağlayıcısının kullanılmasını önerir.
Gereksinimler
Geçerli sürüm desteği için Bkz. Azure Event Hubs Spark Bağlayıcısı proje benioku dosyasındaki "En Son Sürümler".
-
Not
Bu bağlayıcı düzenli olarak güncelleştirilir ve daha yeni bir sürüm kullanılabilir: Maven deposundan en son bağlayıcıyı çekmenizi öneririz
Oluşturulan kitaplığı kümenize yükleyin.
Şema
Kayıtların şeması:
Sütun | Tür |
---|---|
body |
ikili |
partition |
Dize |
offset |
Dize |
sequenceNumber |
uzun |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
Dize |
properties |
map[string,json] |
body
her zaman bayt dizisi olarak sağlanır. Sütunu açıkça seri durumdan body
çıkarmak için kullanıncast("string")
.
Yapılandırma
Bu bölümde Event Hubs ile çalışmak için ihtiyacınız olan yapılandırma ayarları açıklanmıştır.
Azure Event Hubs ile Yapılandırılmış Akış yapılandırma hakkında ayrıntılı yönergeler için bkz . Microsoft tarafından geliştirilen Yapılandırılmış Akış ve Azure Event Hubs Tümleştirme Kılavuzu .
Yapılandırılmış Akış kullanma hakkında ayrıntılı yönergeler için bkz . Azure Databricks'te akış.
Connection string
Event Hubs hizmetine bağlanmak için bir Event Hubs bağlantı dizesi gerekir. Event Hubs örneğiniz için bağlantı dizesi Azure portalından veya kitaplığında kullanarak ConnectionStringBuilder
alabilirsiniz.
Azure portal
Azure portalından bağlantı dizesi aldığınızda anahtara sahip EntityPath
olabilir veya olmayabilir. Aşağıdakileri dikkate alın:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
EventHubs'ınıza bağlanmak için mevcut olması EntityPath
gerekir. bağlantı dizesi yoksa endişelenmeyin.
Bu işlem bununla ilgilenir:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
Alternatif olarak, öğesini kullanarak ConnectionStringBuilder
bağlantı dizesi yapabilirsiniz.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Event Hubs ile ilgili tüm yapılandırmalar sizin EventHubsConf
içinde gerçekleşir. oluşturmak için bir EventHubsConf
bağlantı dizesi geçirmeniz gerekir:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Geçerli bir bağlantı dizesi alma hakkında daha fazla bilgi için bkz. Bağlantı Dizesi.
Yapılandırmaların tam listesi için bkz . EventHubsConf. Başlamanıza yönelik yapılandırmaların bir alt kümesi aşağıdadır:
Seçenek | Değer | Varsayılan | Sorgu türü | Açıklama |
---|---|---|---|---|
consumerGroup |
String | "$Default" | Akış ve toplu iş | Tüketici grubu, olay hub'ının tamamının görünümüdür. Tüketici grupları birden çok tüketen uygulamayı her biri olay akışının ayrı bir görünümüne sahip olacak ve akışı kendi hızlarında ve kendi sapmalarıyla bağımsız bir şekilde okuyacak şekilde etkinleştirir. Microsoft belgelerinde daha fazla bilgi bulabilirsiniz. |
startingPosition |
EventPosition | Akışın başlangıcı | Akış ve toplu iş | Yapılandırılmış Akış işinizin başlangıç konumu. Seçeneklerin okunma sırası hakkında bilgi için bkz . startingPositions . |
maxEventsPerTrigger |
uzun | partitionCount - 1000 |
Akış sorgusu | Tetikleyici aralığı başına işlenen en fazla olay sayısı için hız sınırı. Belirtilen toplam olay sayısı orantılı olarak farklı birim bölümlerine bölünür. |
Her seçenek için içinde buna karşılık gelen bir ayar EventHubsConf
vardır. Örneğin:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
kullanıcıların sınıfla başlangıç (ve bitiş) konumlarını belirtmesine EventPosition
olanak tanır. EventPosition
olay Hub'ı bölümündeki bir olayın konumunu tanımlar. Konum sıralanmış bir zaman, uzaklık, sıra numarası, akışın başlangıcı veya akışın sonu olabilir.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Belirli bir konumda başlatmak (veya sonlandırmak) istiyorsanız, doğru EventPosition
öğeyi oluşturup içinde ayarlamanız yeterlidir EventHubsConf
:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)