Połączenie danych usługi Azure Event Hubs

Azure Event Hubs to platforma przesyłania strumieniowego danych big data i usługa pozyskiwania zdarzeń. Usługa Azure Data Explorer oferuje ciągłe pozyskiwanie z usługi Event Hubs zarządzanej przez klienta.

Potok pozyskiwania usługi Event Hubs przesyła zdarzenia do usługi Azure Data Explorer w kilku krokach. Najpierw utworzysz centrum zdarzeń w Azure Portal. Następnie utworzysz tabelę docelową na platformie Azure Data Explorer, w której dane w określonym formacie zostaną pozyskane przy użyciu podanych właściwości pozyskiwania. Połączenie usługi Event Hubs musi znać routing zdarzeń. Dane mogą być osadzone z wybranymi właściwościami zgodnie z właściwościami systemu zdarzeń. Utwórz połączenie z usługą Event Hubs, aby utworzyć centrum zdarzeń i wysłać zdarzenia. Ten proces można zarządzać za pomocą Azure Portal, programowo za pomocą języka C# lub Języka Python lub szablonu usługi Azure Resource Manager.

Aby uzyskać ogólne informacje na temat pozyskiwania danych w usłudze Azure Data Explorer, zobacz Omówienie pozyskiwania danych w usłudze Azure Data Explorer.

Mechanizmy uwierzytelniania połączeń danych usługi Azure Data Explorer

Przestroga

Jeśli uprawnienia tożsamości zarządzanej zostaną usunięte ze źródła danych, połączenie danych jest wyłączone i nie może pobrać danych ze źródła danych.

  • Połączenie danych oparte na kluczach: jeśli tożsamość zarządzana nie jest określona w połączeniu danych, połączenie automatycznie domyślnie do uwierzytelniania opartego na kluczach. Połączenia oparte na kluczach pobierają dane przy użyciu parametry połączenia zasobów, takich jak Azure Event Hubs parametry połączenia. Usługa Azure Data Explorer generuje parametry połączenia zasobu dla określonego zasobu i bezpiecznie zapisuje go w połączeniu danych. Następnie parametry połączenia służy do pobierania danych ze źródła danych.

Przestroga

Jeśli klucz jest obracany, połączenie danych jest wyłączone i nie może pobrać danych ze źródła danych. Aby rozwiązać ten problem, zaktualizuj lub ponownie utwórz połączenie danych.

Format danych

  • Dane są odczytywane z centrum zdarzeń w postaci obiektów EventData .

  • Zobacz obsługiwane formaty.

    Uwaga

  • Dane można kompresować przy użyciu algorytmu GZip kompresji. Można określić Compression dynamicznie przy użyciu właściwości pozyskiwania lub w ustawieniach statycznego połączenia danych.

    Uwaga

    Kompresja danych nie jest obsługiwana w przypadku skompresowanych formatów (Avro, Parquet, ORC, ApacheAvro i W3CLOGFILE). Niestandardowe kodowanie i osadzone właściwości systemu nie są obsługiwane w przypadku skompresowanych danych.

Właściwości usługi Event Hubs

Usługa Azure Data Explorer obsługuje następujące właściwości usługi Event Hubs:

Uwaga

Pozyskiwanie właściwości niestandardowych usługi Event Hubs używanych do kojarzenia metadanych z zdarzeniami nie jest obsługiwane. Jeśli musisz pozyskać właściwości niestandardowe, wyślij je w treści danych zdarzenia. Aby uzyskać więcej informacji, zobacz Pozyskiwanie właściwości niestandardowych.

Właściwości pozyskiwania

Właściwości pozyskiwania intruują proces pozyskiwania, gdzie należy kierować dane i jak je przetwarzać. Właściwości pozyskiwania zdarzeń można określić przy użyciu właściwości EventData.Properties. Można ustawić następujące właściwości:

Uwaga

Nazwy właściwości są uwzględniane w wielkości liter.

Właściwość Opis
baza danych Uwzględniana wielkość liter nazwa docelowej bazy danych. Domyślnie dane są pozyskiwane do docelowej bazy danych skojarzonej z połączeniem danych. Użyj tej właściwości, aby zastąpić domyślną bazę danych i wysłać dane do innej bazy danych. W tym celu należy najpierw skonfigurować połączenie jako połączenie z wieloma bazami danych.
Tabela Uwzględniana wielkość liter w istniejącej tabeli docelowej. Table Zastępuje zestaw w okienkuData Connection.
Format Format danych. Data format Zastępuje zestaw w okienkuData Connection.
IngestionMappingReference Nazwa istniejącego mapowania pozyskiwania do użycia. Column mapping Zastępuje zestaw w okienkuData Connection.
Kompresja Kompresja danych ( None wartość domyślna) lub GZip kompresja.
Encoding Kodowanie danych, wartość domyślna to UTF8. Może to być dowolne z obsługiwanych kodowań platformy .NET.
Tagi Lista tagów do skojarzenia z pozyskanymi danymi, sformatowana jako ciąg tablicy JSON. W przypadku używania tagów występują implikacje dotyczące wydajności .
RawHeaders Wskazuje, że źródłem zdarzeń jest platforma Kafka i usługa Azure Data Explorer musi używać deserializacji tablic bajtowych do odczytywania innych właściwości routingu. Wartość jest ignorowana.

Uwaga

Pozyskane są tylko zdarzenia w kolejce po utworzeniu połączenia danych.

Routing zdarzeń

Podczas tworzenia połączenia danych z klastrem można określić routing dla miejsca wysyłania pozyskanych danych. Domyślnym routingiem jest tabela docelowa określona w parametry połączenia skojarzonej z docelową bazą danych. Domyślny routing danych jest również nazywany routingiem statycznym. Możesz określić alternatywny routing dla danych, ustawiając właściwości danych zdarzenia wymienionych powyżej.

Kierowanie danych zdarzeń do alternatywnej bazy danych

Routing danych do alternatywnej bazy danych jest domyślnie wyłączony. Aby wysłać dane do innej bazy danych, należy najpierw ustawić połączenie jako połączenie z wieloma bazami danych. Można to zrobić w Azure Portal Azure Portal, C#, Python lub szablon usługi ARM. Użytkownik, grupa, jednostka usługi lub tożsamość zarządzana używana do zezwalania na routing bazy danych musi mieć co najmniej rolę współautora i uprawnienia do zapisu w klastrze.

Aby określić alternatywną bazę danych, ustaw właściwość Pozyskiwaniebazy danych.

Ostrzeżenie

Określenie alternatywnej bazy danych bez ustawienia połączenia jako połączenia danych z wieloma bazami danych spowoduje niepowodzenie pozyskiwania.

Kierowanie danych zdarzeń do tabeli alternatywnej

Aby określić tabelę alternatywną dla każdego zdarzenia, ustaw właściwości Tabela, Format, Kompresja i Mapowanie pozyskiwania. Połączenie dynamicznie kieruje pozyskane dane zgodnie z wartością określoną w właściwościach EventData.Properties, przesłaniając właściwości statyczne dla tego zdarzenia.

W poniższym przykładzie pokazano, jak ustawić szczegóły centrum zdarzeń i wysłać dane metryk pogody do alternatywnej bazy danych (MetricsDB) i tabeli (WeatherMetrics). Dane są w formacie JSON i mapowanie1 jest wstępnie zdefiniowane w tabeli WeatherMetrics.

// This sample uses Azure.Messaging.EventHubs which is a .Net Framework library.
await using var producerClient = new EventHubProducerClient("<eventHubConnectionString>");
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(
    new { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }
)));
eventData.Properties.Add("Database", "MetricsDB");
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['myDataTag']");
var events = new[] { eventData };
// Send events
await producerClient.SendAsync(events);

Mapowanie właściwości systemu zdarzeń

Właściwości systemu przechowują właściwości ustawione przez usługę Event Hubs w momencie w kolejce zdarzenia. Połączenie danych z centrum zdarzeń może osadzić wybrany zestaw właściwości systemowych do danych pozyskanych do tabeli na podstawie danego mapowania.

Uwaga

  • Właściwości systemu osadzania są obsługiwane w formatach json i tabelarycznych (np. JSON, , PSVMultiJSONTSVSCsvCSV, ). TSVESOHsv
  • W przypadku korzystania z nieobsługiwanego formatu (i.e. TXT lub skompresowanych formatów, takich jak Parquet, Avro itp.), dane będą nadal pozyskiwane, ale właściwości zostaną zignorowane.
  • Osadzanie właściwości systemu nie jest obsługiwane, gdy jest ustawiona kompresja komunikatów centrum zdarzeń. W takich scenariuszach zostanie wyemitowany odpowiedni błąd, a dane nie zostaną pozyskane.
  • W przypadku danych tabelarycznych właściwości systemu są obsługiwane tylko w przypadku komunikatów o zdarzeniach z jednym rekordem.
  • W przypadku danych json właściwości systemu są również obsługiwane w przypadku komunikatów o zdarzeniach z wieloma rekordami. W takich przypadkach właściwości systemu są dodawane tylko do pierwszego rekordu komunikatu zdarzenia.
  • W przypadku CSV mapowania właściwości są dodawane na początku rekordu w kolejności wymienionej w tworzeniu połączenia danych. Nie polegaj na kolejności tych właściwości, ponieważ może ulec zmianie w przyszłości.
  • W przypadku JSON mapowania właściwości są dodawane zgodnie z nazwami właściwości w tabeli Właściwości systemu .

Usługa Event Hubs uwidacznia następujące właściwości systemu:

Właściwość Typ danych Opis
x-opt-enqueued-time datetime Godzina UTC, kiedy zdarzenie zostało w kolejce
x-opt-sequence-number long Numer sekwencji logicznej zdarzenia w strumieniu partycji centrum zdarzeń
przesunięcie x-opt-offset string Przesunięcie zdarzenia ze strumienia partycji centrum zdarzeń. Identyfikator przesunięcia jest unikatowy w partycji strumienia centrum zdarzeń
x-opt-publisher string Nazwa wydawcy, jeśli wiadomość została wysłana do punktu końcowego wydawcy
x-opt-partition-key string Klucz partycji odpowiadającej partycji, która przechowywała zdarzenie

Podczas pracy z centrami zdarzeń usługi IoT Central można również osadzać właściwości systemu IoT Hub w ładunku. Aby uzyskać pełną listę, zobacz IoT Hub właściwości systemu.

Jeśli w sekcji Źródło danych tabeli wybrano właściwości systemu zdarzeń, należy uwzględnić właściwości w schemacie tabeli i mapowaniu.

Przykłady mapowania schematów

Przykład mapowania schematu tabeli

Jeśli dane zawierają trzy kolumny (Timespan, Metric, i Value) oraz właściwości, które uwzględnisz, to x-opt-enqueued-time i x-opt-offset, utwórz lub zmień schemat tabeli przy użyciu następującego polecenia:

    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

Przykład mapowania CSV

Uruchom następujące polecenia, aby dodać dane na początku rekordu. Zanotuj wartości porządkowe.

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

Przykład mapowania JSON

Dane są dodawane przy użyciu mapowania właściwości systemowych. Uruchom następujące polecenia:

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Mapowanie schematu dla plików Avro przechwytywania centrum zdarzeń

Jednym ze sposobów korzystania z danych centrum zdarzeń jest przechwytywanie zdarzeń za pośrednictwem Azure Event Hubs w Azure Blob Storage lub Azure Data Lake Storage. Następnie można pozyskać pliki przechwytywania podczas ich zapisywania przy użyciu połączenia danych usługi Event Grid w usłudze Azure Data Explorer.

Schemat plików przechwytywania różni się od schematu oryginalnego zdarzenia wysyłanego do centrum zdarzeń. Należy zaprojektować schemat tabeli docelowej z tą różnicą. W szczególności ładunek zdarzenia jest reprezentowany w pliku przechwytywania jako tablica bajtów, a ta tablica nie jest automatycznie dekodowana przez połączenie danych usługi Azure Data Explorer usługi Event Grid. Aby uzyskać bardziej szczegółowe informacje na temat schematu pliku dla danych przechwytywania avro centrum zdarzeń, zobacz Eksplorowanie przechwyconych plików Avro w Azure Event Hubs.

Aby poprawnie zdekodować ładunek zdarzenia:

  1. Zamapuj Body pole przechwyconego zdarzenia na kolumnę typu dynamic w tabeli docelowej.
  2. Zastosuj zasady aktualizacji, które konwertują tablicę bajtów na ciąg czytelny przy użyciu funkcji unicode_codepoints_to_string().

Pozyskiwanie właściwości niestandardowych

Podczas pozyskiwania zdarzeń z usługi Event Hubs dane są pobierane z body sekcji obiektu danych zdarzenia. Jednak właściwości niestandardowe usługi Event Hubs są definiowane w properties sekcji obiektu i nie są pozyskiwane. Aby pozyskać właściwości klienta, należy je osadzić w danych w body sekcji obiektu.

Poniższy przykład porównuje obiekt danych zdarzeń zawierający właściwość customProperty niestandardową zdefiniowaną przez usługę Event Hubs (po lewej) z właściwością osadzoną wymaganą do pozyskiwania (po prawej).

{
"body":{
"value": 42
},
"properties":{
"customProperty": "123456789"
}
}
{
"body":{
"value": 42,
"customProperty": "123456789"
}
}

Można użyć jednej z następujących metod, aby osadzić właściwości niestandardowe w danych w body sekcji obiektu danych zdarzenia:

Połączenie danych centrum zdarzeń między regionami

Aby uzyskać najlepszą wydajność, utwórz wszystkie następujące zasoby w tym samym regionie co klaster. Jeśli nie ma innego rozwiązania, rozważ użycie warstw Premium lub Dedykowanego centrum zdarzeń. Porównanie warstw usługi Event Hub można znaleźć tutaj.

Tworzenie centrum zdarzeń

Jeśli jeszcze go nie masz, utwórz centrum zdarzeń. Nawiązywanie połączenia z centrum zdarzeń można zarządzać za pomocą Azure Portal, programowo za pomocą języka C# lub Python albo szablonu usługi Azure Resource Manager.

Uwaga

  • Możliwość dynamicznego dodawania partycji po utworzeniu centrum zdarzeń jest dostępna tylko w warstwach Premium i Dedykowanych usługi Event Hubs. Podczas ustawiania liczby partycji należy wziąć pod uwagę długoterminową skalę.
  • Grupa odbiorców musi być unikatowa dla użytkownika. Utwórz grupę odbiorców dedykowaną do połączenia usługi Azure Data Explorer.

Wysyłanie zdarzeń

Zobacz przykładową aplikację , która generuje dane i wysyła ją do centrum zdarzeń.

Przykład generowania przykładowych danych można znaleźć w temacie Pozyskiwanie danych z centrum zdarzeń do usługi Azure Data Explorer

Konfigurowanie rozwiązania do odzyskiwania po awarii geograficznej

Centrum zdarzeń oferuje rozwiązanie odzyskiwania po awarii geograficznej . Usługa Azure Data Explorer nie obsługuje Alias przestrzeni nazw centrum zdarzeń. Aby zaimplementować odzyskiwanie po awarii geograficznej w rozwiązaniu, utwórz dwa połączenia danych centrum zdarzeń: jedno dla podstawowej przestrzeni nazw i jedną dla pomocniczej przestrzeni nazw. Usługa Azure Data Explorer będzie nasłuchiwać obu połączeń centrum zdarzeń.

Uwaga

Jest to odpowiedzialność użytkownika za zaimplementowanie trybu failover z podstawowej przestrzeni nazw do pomocniczej przestrzeni nazw.