Megosztás a következőn keresztül:


Azure-eseményközpontok

Az Azure Event Hubs egy hiperméretű telemetriai betöltési szolgáltatás, amely események millióit gyűjti, alakítja át és tárolja. Elosztott streaming-platformként közel valós idejű adatelérést és konfigurálható idejű adatmegőrzést biztosít, lehetővé téve nagy mennyiségű telemetriai adat feltöltését a felhőbe, és azok beolvasását számos alkalmazásból közzétételi és előfizetési szemantika használatával.

Ez a cikk bemutatja, hogyan használható a strukturált streamelés az Azure Event Hubs és az Azure Databricks-fürtök használatával.

Feljegyzés

Az Azure Event Hubs az Apache Kafkával kompatibilis végpontot biztosít, amelyet a Databricks Runtime-ban elérhető strukturált streamelési Kafka-összekötővel használhat az Azure Event Hubs üzeneteinek feldolgozásához. A Databricks a strukturált streamelési Kafka-összekötő használatát javasolja az Azure Event Hubsból érkező üzenetek feldolgozásához.

Követelmények

Az aktuális kiadási támogatásért tekintse meg az Azure Event Hubs Spark Csatlakozás or projekt olvasási fájljának legújabb kiadásait.

  1. Hozzon létre egy kódtárat az Azure Databricks-munkaterületen a Maven koordináta com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17használatával.

    Feljegyzés

    Ez az összekötő rendszeresen frissül, és egy újabb verzió is elérhető lehet: javasoljuk, hogy kérje le a legújabb összekötőt a Maven-adattárból

  2. Telepítse a létrehozott kódtárat a fürtbe.

Séma

A rekordok sémája a következő:

Oszlop Típus
body Bináris
partition húr
offset húr
sequenceNumber hosszú
enqueuedTime időbélyeg
publisher húr
partitionKey húr
properties map[sztring;json]

A body mindig bájttömbként van megadva. Az oszlop explicit deszerializálására body használhatócast("string").

Konfiguráció

Ez a szakasz az Event Hubs használatához szükséges konfigurációs beállításokat ismerteti.

A strukturált streamelés Azure Event Hubskal való konfigurálásával kapcsolatos részletes útmutatásért tekintse meg a Microsoft által kifejlesztett strukturált streamelési és Azure Event Hubs-integrációs útmutatót .

A strukturált streamelés használatával kapcsolatos részletes útmutatásért lásd : Streamelés az Azure Databricksben.

Kapcsolati sztring

Az Event Hubs szolgáltatáshoz való csatlakozáshoz egy Event Hubs-kapcsolati sztring szükséges. Az Event Hubs-példányhoz tartozó kapcsolati sztring az Azure Portalról vagy a ConnectionStringBuilder tárból szerezheti be.

Azure Portal

Amikor lekéri a kapcsolati sztring az Azure Portalról, lehet, hogy nem rendelkezik a EntityPath kulccsal. Megfontolandó szempontok:

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Az EventHubshoz EntityPath való csatlakozáshoz jelen kell lennie. Ha a kapcsolati sztring nem rendelkezik ilyenel, ne aggódjon. Ez gondoskodik róla:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

Csatlakozás ionStringBuilder

Másik lehetőségként használhatja a ConnectionStringBuilder kapcsolati sztring.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Az Event Hubshoz kapcsolódó összes konfiguráció az Ön EventHubsConfszámítógépén történik. Ha létre szeretne hozni egy EventHubsConfkapcsolati sztring:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Az érvényes kapcsolati sztring beszerzéséről további információt a Csatlakozás ion-sztringben talál.

A konfigurációk teljes listáját az EventHubsConfban találja. Íme a konfigurációk egy részhalmaza az első lépésekhez:

Lehetőség Érték Alapértelmezett Lekérdezés típusa Leírás
consumerGroup Sztring "$Default" Streamelés és köteg A fogyasztói csoport egy teljes eseményközpont nézete. A felhasználói csoportok révén több felhasználó alkalmazás rendelkezhet az eseménystream külön nézetével, és a többitől függetlenül saját tempójában és saját eltolásával olvashatja a streamet. További információ a Microsoft dokumentációjában érhető el.
startingPosition EventPosition A stream kezdete Streamelés és köteg A strukturált streamelési feladat kezdőpozíciója. A beállítások olvasási sorrendjéről további információt a kezdőpozíciókban talál.
maxEventsPerTrigger hosszú partitionCount

* 1000
Streamelési lekérdezés Az eseményindító-intervallumonként feldolgozott események maximális számának sebességkorlátja. A megadott események teljes száma arányosan oszlik meg a különböző kötetek partíciói között.

Minden beállításhoz létezik egy megfelelő beállítás a következőben EventHubsConf: . Példa:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf lehetővé teszi, hogy a felhasználók kezdő (és záró) pozíciót adjanak meg az EventPosition osztályhoz. EventPosition Egy esemény pozícióját határozza meg egy Event Hub-partícióban. A pozíció lehet egy lekérdezett idő, eltolás, sorszám, a stream kezdete vagy a stream vége.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Ha egy adott pozícióban szeretné elindítani (vagy befejezni), egyszerűen hozza létre a megfelelőt EventPosition , és állítsa be a EventHubsConfkövetkező helyre:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)