Azure-eseményközpontok
Az Azure Event Hubs egy hiperméretű telemetriai betöltési szolgáltatás, amely események millióit gyűjti, alakítja át és tárolja. Elosztott streaming-platformként közel valós idejű adatelérést és konfigurálható idejű adatmegőrzést biztosít, lehetővé téve nagy mennyiségű telemetriai adat feltöltését a felhőbe, és azok beolvasását számos alkalmazásból közzétételi és előfizetési szemantika használatával.
Ez a cikk bemutatja, hogyan használható a strukturált streamelés az Azure Event Hubs és az Azure Databricks-fürtök használatával.
Feljegyzés
Az Azure Event Hubs az Apache Kafkával kompatibilis végpontot biztosít, amelyet a Databricks Runtime-ban elérhető strukturált streamelési Kafka-összekötővel használhat az Azure Event Hubs üzeneteinek feldolgozásához. A Databricks a strukturált streamelési Kafka-összekötő használatát javasolja az Azure Event Hubsból érkező üzenetek feldolgozásához.
Követelmények
Az aktuális kiadási támogatásért tekintse meg az Azure Event Hubs Spark Csatlakozás or projekt olvasási fájljának legújabb kiadásait.
Hozzon létre egy kódtárat az Azure Databricks-munkaterületen a Maven koordináta
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
használatával.Feljegyzés
Ez az összekötő rendszeresen frissül, és egy újabb verzió is elérhető lehet: javasoljuk, hogy kérje le a legújabb összekötőt a Maven-adattárból
Telepítse a létrehozott kódtárat a fürtbe.
Séma
A rekordok sémája a következő:
Oszlop | Típus |
---|---|
body |
Bináris |
partition |
húr |
offset |
húr |
sequenceNumber |
hosszú |
enqueuedTime |
időbélyeg |
publisher |
húr |
partitionKey |
húr |
properties |
map[sztring;json] |
A body
mindig bájttömbként van megadva. Az oszlop explicit deszerializálására body
használhatócast("string")
.
Konfiguráció
Ez a szakasz az Event Hubs használatához szükséges konfigurációs beállításokat ismerteti.
A strukturált streamelés Azure Event Hubskal való konfigurálásával kapcsolatos részletes útmutatásért tekintse meg a Microsoft által kifejlesztett strukturált streamelési és Azure Event Hubs-integrációs útmutatót .
A strukturált streamelés használatával kapcsolatos részletes útmutatásért lásd : Streamelés az Azure Databricksben.
Kapcsolati sztring
Az Event Hubs szolgáltatáshoz való csatlakozáshoz egy Event Hubs-kapcsolati sztring szükséges. Az Event Hubs-példányhoz tartozó kapcsolati sztring az Azure Portalról vagy a ConnectionStringBuilder
tárból szerezheti be.
Azure Portal
Amikor lekéri a kapcsolati sztring az Azure Portalról, lehet, hogy nem rendelkezik a EntityPath
kulccsal. Megfontolandó szempontok:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
Az EventHubshoz EntityPath
való csatlakozáshoz jelen kell lennie. Ha a kapcsolati sztring nem rendelkezik ilyenel, ne aggódjon.
Ez gondoskodik róla:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
Csatlakozás ionStringBuilder
Másik lehetőségként használhatja a ConnectionStringBuilder
kapcsolati sztring.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
Az Event Hubshoz kapcsolódó összes konfiguráció az Ön EventHubsConf
számítógépén történik. Ha létre szeretne hozni egy EventHubsConf
kapcsolati sztring:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
Az érvényes kapcsolati sztring beszerzéséről további információt a Csatlakozás ion-sztringben talál.
A konfigurációk teljes listáját az EventHubsConfban találja. Íme a konfigurációk egy részhalmaza az első lépésekhez:
Lehetőség | Érték | Alapértelmezett | Lekérdezés típusa | Leírás |
---|---|---|---|---|
consumerGroup |
Sztring | "$Default" | Streamelés és köteg | A fogyasztói csoport egy teljes eseményközpont nézete. A felhasználói csoportok révén több felhasználó alkalmazás rendelkezhet az eseménystream külön nézetével, és a többitől függetlenül saját tempójában és saját eltolásával olvashatja a streamet. További információ a Microsoft dokumentációjában érhető el. |
startingPosition |
EventPosition | A stream kezdete | Streamelés és köteg | A strukturált streamelési feladat kezdőpozíciója. A beállítások olvasási sorrendjéről további információt a kezdőpozíciókban talál. |
maxEventsPerTrigger |
hosszú | partitionCount * 1000 |
Streamelési lekérdezés | Az eseményindító-intervallumonként feldolgozott események maximális számának sebességkorlátja. A megadott események teljes száma arányosan oszlik meg a különböző kötetek partíciói között. |
Minden beállításhoz létezik egy megfelelő beállítás a következőben EventHubsConf
: . Példa:
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
lehetővé teszi, hogy a felhasználók kezdő (és záró) pozíciót adjanak meg az EventPosition
osztályhoz. EventPosition
Egy esemény pozícióját határozza meg egy Event Hub-partícióban. A pozíció lehet egy lekérdezett idő, eltolás, sorszám, a stream kezdete vagy a stream vége.
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
Ha egy adott pozícióban szeretné elindítani (vagy befejezni), egyszerűen hozza létre a megfelelőt EventPosition
, és állítsa be a EventHubsConf
következő helyre:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: