Azure Event Hubs

Az Azure Event Hubs egy hiperméretű telemetriai betöltési szolgáltatás, amely események millióit gyűjti, alakítja át és tárolja. Elosztott streaming-platformként közel valós idejű adatelérést és konfigurálható idejű adatmegőrzést biztosít, lehetővé téve nagy mennyiségű telemetriai adat feltöltését a felhőbe, és azok beolvasását számos alkalmazásból közzétételi és előfizetési szemantika használatával.

Ez a cikk bemutatja, hogyan használható a strukturált streamelés az Azure Event Hubs és az Azure Databricks-fürtök használatával.

Megjegyzés:

Az Azure Event Hubs az Apache Kafkával kompatibilis végpontot biztosít, amelyet a Databricks Runtime-ban elérhető strukturált streamelési Kafka-összekötővel használhat az Azure Event Hubs üzeneteinek feldolgozásához. A Databricks a strukturált streamelési Kafka-összekötő használatát javasolja az Azure Event Hubsból érkező üzenetek feldolgozásához.

Requirements

Az aktuális kiadási támogatásért tekintse meg az Azure Event Hubs Spark Csatlakozás or projekt olvasási fájljának legújabb kiadásait.

  1. Hozzon létre egy kódtárat az Azure Databricks-munkaterületen a Maven koordináta com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17használatával.

    Megjegyzés:

    Ez az összekötő rendszeresen frissül, és egy újabb verzió is elérhető lehet: javasoljuk, hogy kérje le a legújabb összekötőt a Maven-adattárból

  2. Telepítse a létrehozott kódtárat a fürtbe.

Schema

A rekordok sémája a következő:

Column Type
body Bináris
partition sztring
offset sztring
sequenceNumber hosszú
enqueuedTime timestamp
publisher sztring
partitionKey sztring
properties map[sztring;json]

A body mindig bájttömbként van megadva. Az oszlop explicit deszerializálására body használhatócast("string").

Első lépések

Kezdjük egy gyors példával: WordCount. Az alábbi jegyzetfüzet mindössze annyit igényel, hogy a WordCount strukturált streamelést használjon az Azure Event Hubs használatával.

Azure Event Hubs WordCount strukturált streamelési jegyzetfüzettel

Jegyzetfüzet beszerzése

Konfiguráció

Ez a szakasz az Event Hubs használatához szükséges konfigurációs beállításokat ismerteti.

A strukturált streamelés Azure Event Hubskal való konfigurálásával kapcsolatos részletes útmutatásért tekintse meg a Microsoft által kifejlesztett strukturált streamelési és Azure Event Hubs-integrációs útmutatót .

A strukturált streamelés használatával kapcsolatos részletes útmutatásért lásd : Streamelés az Azure Databricksben.

Kapcsolati sztring

Az Event Hubs szolgáltatáshoz való csatlakozáshoz egy Event Hubs-kapcsolati sztring szükséges. Az Event Hubs-példányhoz tartozó kapcsolati sztring az Azure Portalról vagy a ConnectionStringBuilder tárból szerezheti be.

Azure Portal

Amikor lekéri a kapcsolati sztring az Azure Portalról, lehet, hogy nem rendelkezik a EntityPath kulccsal. Consider:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Az EventHubshoz EntityPath való csatlakozáshoz jelen kell lennie. Ha a kapcsolati sztring nem rendelkezik ilyenel, ne aggódjon. Ez gondoskodik róla:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Csatlakozás ionStringBuilder

Másik lehetőségként használhatja a ConnectionStringBuilder kapcsolati sztring.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Az Event Hubshoz kapcsolódó összes konfiguráció az Ön EventHubsConfszámítógépén történik. Ha létre szeretne hozni egy EventHubsConfkapcsolati sztring:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Az érvényes kapcsolati sztring beszerzéséről további információt a Csatlakozás ion-sztringben talál.

A konfigurációk teljes listáját az EventHubsConfban találja. Íme a konfigurációk egy részhalmaza az első lépésekhez:

Lehetőség Érték Alapértelmezett Lekérdezés típusa Leírás
consumerGroup Sztring "$Default" Streamelés és köteg A fogyasztói csoport egy teljes eseményközpont nézete. A felhasználói csoportok révén több felhasználó alkalmazás rendelkezhet az eseménystream külön nézetével, és a többitől függetlenül saját tempójában és saját eltolásával olvashatja a streamet. További információ a Microsoft dokumentációjában érhető el.
startingPosition EventPosition A stream kezdete Streamelés és köteg A strukturált streamelési feladat kezdőpozíciója. A beállítások olvasási sorrendjéről további információt a kezdőpozíciókban talál.
maxEventsPerTrigger hosszú partitionCount

* 1000
Streamelési lekérdezés Az eseményindító-intervallumonként feldolgozott események maximális számának sebességkorlátja. A megadott események teljes száma arányosan oszlik meg a különböző kötetek partíciói között.

Minden beállításhoz létezik egy megfelelő beállítás a következőben EventHubsConf: . Például:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf lehetővé teszi, hogy a felhasználók kezdő (és záró) pozíciót adjanak meg az EventPosition osztályhoz. EventPosition Egy esemény pozícióját határozza meg egy Event Hub-partícióban. A pozíció lehet egy lekérdezett idő, eltolás, sorszám, a stream kezdete vagy a stream vége.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Ha egy adott pozícióban szeretné elindítani (vagy befejezni), egyszerűen hozza létre a megfelelőt EventPosition , és állítsa be a EventHubsConfkövetkező helyre:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Production Structured Streaming with Azure Event Hubs

Amikor streamelési lekérdezéseket futtat éles környezetben, valószínűleg nagyobb robusztusságot és üzemidő-garanciát szeretne, mint ami akkor lenne, ha egyszerűen egy jegyzetfüzetet csatol egy fürthöz, és interaktívan futtatja a streamelési lekérdezéseket. Importálja és futtassa a következő jegyzetfüzetet a strukturált stream éles környezetben való konfigurálásának és futtatásának bemutatásához az Azure Event Hubs és az Azure Databricks használatával.

További információkért tekintse meg a strukturált streamelés éles környezettel kapcsolatos szempontjait.

Production Structured Streaming with Azure Event Hubs notebook

Jegyzetfüzet beszerzése