Przesyłanie strumieniowe danych jako danych wejściowych do usługi Stream Analytics

Usługa Stream Analytics oferuje pierwszą klasę integracji ze strumieniami danych platformy Azure jako dane wejściowe z trzech rodzajów zasobów:

Te zasoby wejściowe mogą znajdować się w tej samej subskrypcji platformy Azure co zadanie usługi Stream Analytics lub inna subskrypcja.

Kompresja

Usługa Stream Analytics obsługuje kompresję dla wszystkich źródeł wejściowych. Obsługiwane typy kompresji to: None, Gzip i Deflate. Obsługa kompresji nie jest dostępna dla danych referencyjnych. Jeśli dane wejściowe są kompresowane w usłudze Avro, usługa Stream Analytics obsługuje je w sposób niewidoczny. Nie trzeba określać typu kompresji z serializacji Avro.

Tworzenie, edytowanie lub testowanie danych wejściowych

Możesz użyć witryny Azure Portal, programu Visual Studio i programu Visual Studio Code, aby dodać i wyświetlić i edytować istniejące dane wejściowe w zadaniu przesyłania strumieniowego. Możesz również przetestować połączenia wejściowe i przetestować zapytania z przykładowych danych z witryny Azure Portal, programu Visual Studio i programu Visual Studio Code. Podczas pisania zapytania należy wyświetlić dane wejściowe w klauzuli FROM. Listę dostępnych danych wejściowych można uzyskać na stronie Zapytanie w portalu. Jeśli chcesz użyć wielu danych wejściowych, JOIN lub napiszesz wiele SELECT zapytań.

Uwaga

Zdecydowanie zalecamy używanie narzędzi usługi Stream Analytics dla programu Visual Studio Code w celu uzyskania najlepszego środowiska programowania lokalnego. Istnieją znane luki w funkcjach w narzędziach usługi Stream Analytics dla programu Visual Studio 2019 (wersja 2.6.3000.0) i nie będzie ona ulepszana w przyszłości.

Przesyłanie strumieniowe danych z usługi Event Hubs

Usługa Azure Event Hubs to wysoce skalowalny moduł ingestor zdarzeń publikowania i subskrybowania. Centrum zdarzeń może zbierać miliony zdarzeń na sekundę, aby umożliwić przetwarzanie i analizowanie ogromnych ilości danych generowanych przez połączone urządzenia i aplikacje. Razem usługi Event Hubs i Stream Analytics mogą zapewnić kompleksowe rozwiązanie do analizy w czasie rzeczywistym. Usługa Event Hubs umożliwia przesyłanie zdarzeń na platformę Azure w czasie rzeczywistym, a zadania usługi Stream Analytics mogą przetwarzać te zdarzenia w czasie rzeczywistym. Możesz na przykład wysyłać kliknięcia internetowe, odczyty czujników lub zdarzenia dziennika online do usługi Event Hubs. Następnie można utworzyć zadania usługi Stream Analytics, aby używać usługi Event Hubs dla danych wejściowych na potrzeby filtrowania, agregowania i korelacji w czasie rzeczywistym.

EventEnqueuedUtcTime to sygnatura czasowa przybycia zdarzenia w centrum zdarzeń i jest domyślnym znacznikiem czasu zdarzeń pochodzących z usługi Event Hubs do usługi Stream Analytics. Aby przetworzyć dane jako strumień przy użyciu znacznika czasu w ładunku zdarzenia, należy użyć słowa kluczowego TIMESTAMP BY .

Grupy odbiorców usługi Event Hubs

Każde dane wejściowe centrum zdarzeń należy skonfigurować tak, aby miało własną grupę odbiorców. Jeśli zadanie zawiera samosprzężenia lub ma wiele danych wejściowych, niektóre dane wejściowe mogą być odczytywane przez więcej niż jednego podrzędnego czytnika. Ta sytuacja ma wpływ na liczbę czytelników w jednej grupie odbiorców. Aby uniknąć przekroczenia limitu pięciu czytelników na grupę odbiorców na partycję, najlepszym rozwiązaniem jest wyznaczenie grupy odbiorców dla każdego zadania usługi Stream Analytics. Istnieje również limit 20 grup odbiorców dla centrum zdarzeń w warstwie Standardowa. Aby uzyskać więcej informacji, zobacz Rozwiązywanie problemów z danymi wejściowymi usługi Azure Stream Analytics.

Tworzenie danych wejściowych z usługi Event Hubs

W poniższej tabeli opisano każdą właściwość na stronie Nowe dane wejściowe w witrynie Azure Portal w celu przesyłania strumieniowego danych wejściowych z centrum zdarzeń:

Właściwości opis
Alias danych wejściowych Przyjazna nazwa używana w zapytaniu zadania do odwoływanie się do tych danych wejściowych.
Subskrypcja Wybierz subskrypcję platformy Azure, w której istnieje zasób centrum zdarzeń.
Przestrzeń nazw centrum zdarzeń Przestrzeń nazw usługi Event Hubs jest kontenerem dla centrów zdarzeń. Podczas tworzenia centrum zdarzeń przestrzeń nazw jest również tworzona.
Nazwa centrum zdarzeń Nazwa centrum zdarzeń do użycia jako dane wejściowe.
Grupa odbiorców centrum zdarzeń (zalecane) Zalecamy używanie odrębnej grupy odbiorców dla każdego zadania usługi Stream Analytics. Ten ciąg identyfikuje grupę odbiorców do pozyskiwania danych z centrum zdarzeń. Jeśli grupa odbiorców nie zostanie określona, zadanie usługi Stream Analytics używa $Default grupy odbiorców.
Tryb uwierzytelniania Określ typ uwierzytelniania, którego chcesz użyć do nawiązania połączenia z centrum zdarzeń. Do uwierzytelniania w centrum zdarzeń można użyć parametry połączenia lub tożsamości zarządzanej. W przypadku opcji tożsamości zarządzanej można utworzyć tożsamość zarządzaną przypisaną przez system do zadania usługi Stream Analytics lub tożsamość zarządzaną przypisaną przez użytkownika do uwierzytelniania w centrum zdarzeń. W przypadku korzystania z tożsamości zarządzanej tożsamość zarządzana musi być członkiem ról odbiornika danych usługi Azure Event Hubs lub właściciela danych usługi Azure Event Hubs.
Nazwa zasad centrum zdarzeń Zasady dostępu współdzielonego, które zapewniają dostęp do usługi Event Hubs. Każda zasada dostępu współdzielonego ma nazwę, uprawnienia ustawione i klucze dostępu. Ta opcja jest wypełniana automatycznie, chyba że zostanie wybrana opcja ręcznego udostępnienia ustawień usługi Event Hubs.
Klucz partycji Jest to pole opcjonalne, które jest dostępne tylko wtedy, gdy zadanie jest skonfigurowane do używania poziomu zgodności 1.2 lub nowszego. Jeśli dane wejściowe są partycjonowane przez właściwość, możesz dodać tutaj nazwę tej właściwości. Jest on używany do poprawy wydajności zapytania, jeśli zawiera klauzulę PARTITION BY lub GROUP BY dla tej właściwości. Jeśli to zadanie używa poziomu zgodności 1.2 lub nowszego, to pole domyślnie ma wartość domyślną PartitionId.
Format serializacji zdarzeń Format serializacji (JSON, CSV, Avro, Parquet lub Other (Protobuf, XML, zastrzeżony...)) przychodzącego strumienia danych. Upewnij się, że format JSON jest zgodny ze specyfikacją i nie zawiera wiodących wartości 0 dla liczb dziesiętnych.
Kodowanie UTF-8 jest obecnie jedynym obsługiwanym formatem kodowania.
Typ kompresji zdarzeń Typ kompresji używany do odczytywania przychodzącego strumienia danych, takiego jak Brak (wartość domyślna), Gzip lub Deflate.
Rejestr schematów (wersja zapoznawcza) Możesz wybrać rejestr schematów ze schematami dla danych zdarzeń otrzymanych z centrum zdarzeń.

Gdy dane pochodzą z danych wejściowych strumienia usługi Event Hubs, masz dostęp do następujących pól metadanych w zapytaniu usługi Stream Analytics:

Właściwości opis
EventProcessedUtcTime Data i godzina przetwarzania zdarzenia przez usługę Stream Analytics.
EventEnqueuedUtcTime Data i godzina odebrania zdarzeń przez usługę Event Hubs.
Partitionid Identyfikator partycji opartej na zera dla karty wejściowej.

Na przykład przy użyciu tych pól można napisać zapytanie podobne do następującego przykładu:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Uwaga

W przypadku korzystania z usługi Event Hubs jako punktu końcowego dla tras usługi IoT Hub można uzyskać dostęp do metadanych usługi IoT Hub przy użyciu funkcji GetMetadataPropertyValue.

Przesyłanie strumieniowe danych z usługi IoT Hub

Usługa Azure IoT Hub to wysoce skalowalne zdarzenie publikowania-subskrybowania zoptymalizowane pod kątem scenariuszy IoT.

Domyślny znacznik czasu zdarzeń pochodzących z usługi IoT Hub w usłudze Stream Analytics to sygnatura czasowa, którą zdarzenie dotarło do usługi IoT Hub, czyli EventEnqueuedUtcTime. Aby przetworzyć dane jako strumień przy użyciu znacznika czasu w ładunku zdarzenia, należy użyć słowa kluczowego TIMESTAMP BY .

Grupy konsumentów usługi Iot Hub

Każde dane wejściowe usługi Stream Analytics IoT Hub należy skonfigurować tak, aby miało własną grupę odbiorców. Gdy zadanie zawiera samosprzężenia lub zawiera wiele danych wejściowych, niektóre dane wejściowe mogą być odczytywane przez więcej niż jednego podrzędnego czytnika. Ta sytuacja ma wpływ na liczbę czytelników w jednej grupie odbiorców. Aby uniknąć przekroczenia limitu usługi Azure IoT Hub wynoszącego pięć czytelników na grupę odbiorców na partycję, najlepszym rozwiązaniem jest wyznaczenie grupy odbiorców dla każdego zadania usługi Stream Analytics.

Konfigurowanie usługi IoT Hub jako danych wejściowych strumienia danych

W poniższej tabeli opisano każdą właściwość na stronie Nowe dane wejściowe w witrynie Azure Portal podczas konfigurowania usługi IoT Hub jako danych wejściowych strumienia.

Właściwości opis
Alias danych wejściowych Przyjazna nazwa używana w zapytaniu zadania do odwoływanie się do tych danych wejściowych.
Subskrypcja Wybierz subskrypcję, w której istnieje zasób usługi IoT Hub.
IoT Hub Nazwa usługi IoT Hub do użycia jako dane wejściowe.
Grupa konsumentów Zalecamy użycie innej grupy odbiorców dla każdego zadania usługi Stream Analytics. Grupa odbiorców służy do pozyskiwania danych z usługi IoT Hub. Usługa Stream Analytics używa grupy odbiorców $Default, chyba że określono inaczej.
Nazwa zasad dostępu współdzielonego Zasady dostępu współdzielonego, które zapewniają dostęp do usługi IoT Hub. Każda zasada dostępu współdzielonego ma nazwę, uprawnienia ustawione i klucze dostępu.
Klucz zasad dostępu współdzielonego Klucz dostępu współdzielonego używany do autoryzowania dostępu do usługi IoT Hub. Ta opcja jest wypełniana automatycznie, chyba że wybierzesz opcję ręcznego udostępnienia ustawień usługi Iot Hub.
Punkt końcowy Punkt końcowy usługi IoT Hub.
Klucz partycji Jest to pole opcjonalne, które jest dostępne tylko wtedy, gdy zadanie jest skonfigurowane do używania poziomu zgodności 1.2 lub nowszego. Jeśli dane wejściowe są partycjonowane przez właściwość, możesz dodać tutaj nazwę tej właściwości. Służy do poprawy wydajności zapytania, jeśli zawiera klauzulę PARTITION BY lub GROUP BY dla tej właściwości. Jeśli to zadanie używa poziomu zgodności 1.2 lub nowszego, to pole domyślnie ma wartość "PartitionId".
Format serializacji zdarzeń Format serializacji (JSON, CSV, Avro, Parquet lub Other (Protobuf, XML, zastrzeżony...)) przychodzącego strumienia danych. Upewnij się, że format JSON jest zgodny ze specyfikacją i nie zawiera wiodących wartości 0 dla liczb dziesiętnych.
Kodowanie UTF-8 jest obecnie jedynym obsługiwanym formatem kodowania.
Typ kompresji zdarzeń Typ kompresji używany do odczytywania przychodzącego strumienia danych, takiego jak Brak (wartość domyślna), Gzip lub Deflate.

Gdy używasz danych strumienia z usługi IoT Hub, masz dostęp do następujących pól metadanych w zapytaniu usługi Stream Analytics:

Właściwości opis
EventProcessedUtcTime Data i godzina przetworzenia zdarzenia.
EventEnqueuedUtcTime Data i godzina odebrania zdarzenia przez usługę IoT Hub.
Partitionid Identyfikator partycji opartej na zera dla karty wejściowej.
IoTHub.MessageId Identyfikator używany do skorelowania dwukierunkowej komunikacji w usłudze IoT Hub.
IoTHub.CorrelationId Identyfikator używany w odpowiedziach komunikatów i opiniach w usłudze IoT Hub.
IoTHub. Połączenie ionDeviceId Identyfikator uwierzytelniania używany do wysyłania tej wiadomości. Ta wartość jest oznaczana komunikatami powiązanymi z usługą przez usługę IoT Hub.
IoTHub. Połączenie ionDeviceGenerationId Identyfikator generacji uwierzytelnionego urządzenia, który został użyty do wysłania tej wiadomości. Ta wartość jest oznaczana komunikatami dla ruchu przychodzącego usługi przez usługę IoT Hub.
IoTHub.EnqueuedTime Czas odebrania komunikatu przez usługę IoT Hub.

Przesyłanie strumieniowe danych z usługi Blob Storage lub Data Lake Storage Gen2

W scenariuszach z dużymi ilościami danych bez struktury do przechowywania w chmurze usługa Azure Blob Storage lub Usługa Azure Data Lake Storage Gen2 oferuje ekonomiczne i skalowalne rozwiązanie. Dane w usłudze Blob Storage lub Azure Data Lake Storage Gen2 są traktowane jako dane magazynowane. Jednak te dane mogą być przetwarzane jako strumień danych przez usługę Stream Analytics.

Przetwarzanie dzienników jest często używanym scenariuszem używania takich danych wejściowych z usługą Stream Analytics. W tym scenariuszu pliki danych telemetrycznych są przechwytywane z systemu i muszą zostać przeanalizowane i przetworzone w celu wyodrębnienia znaczących danych.

Domyślny znacznik czasu zdarzenia usługi Blob Storage lub Azure Data Lake Storage Gen2 w usłudze Stream Analytics to znacznik czasu ostatniej modyfikacji, czyli BlobLastModifiedUtcTime. Jeśli obiekt blob zostanie przekazany do konta magazynu o godzinie 13:00, a zadanie usługi Azure Stream Analytics zostanie uruchomione przy użyciu opcji Teraz o godzinie 13:01, nie zostanie odebrane, ponieważ jego zmodyfikowany czas spadnie poza okres wykonywania zadania.

Jeśli obiekt blob zostanie przekazany do kontenera konta magazynu o godzinie 13:00, a zadanie usługi Azure Stream Analytics zostanie uruchomione przy użyciu czasu niestandardowego o godzinie 13:00 lub starszej, obiekt blob zostanie odebrany, gdy jego zmodyfikowany czas spadnie w okresie wykonywania zadania.

Jeśli zadanie usługi Azure Stream Analytics zostanie uruchomione przy użyciu usługi Now o godzinie 13:00, a obiekt blob zostanie przekazany do kontenera konta magazynu o godzinie 13:01, usługa Azure Stream Analytics pobiera obiekt blob. Sygnatura czasowa przypisana do każdego obiektu blob jest oparta tylko na .BlobLastModifiedTime Folder, w którym znajduje się obiekt blob, nie ma relacji z przypisanym znacznikem czasu. Jeśli na przykład istnieje obiekt blob 2019/10-01/00/b1.txt z wartością BlobLastModifiedTime2019-11-11, znacznik czasu przypisany do tego obiektu blob to 2019-11-11.

Aby przetworzyć dane jako strumień przy użyciu znacznika czasu w ładunku zdarzenia, należy użyć słowa kluczowego TIMESTAMP BY . Zadanie usługi Stream Analytics ściąga dane z usługi Azure Blob Storage lub Azure Data Lake Storage Gen2 co sekundę, jeśli plik obiektu blob jest dostępny. Jeśli plik obiektu blob jest niedostępny, istnieje wycofywanie wykładnicze z maksymalnym opóźnieniem czasu wynoszącym 90 sekund.

Uwaga

Usługa Stream Analytics nie obsługuje dodawania zawartości do istniejącego pliku obiektów blob. Usługa Stream Analytics będzie wyświetlać każdy plik tylko raz, a wszelkie zmiany występujące w pliku po odczytaniu danych przez zadanie nie są przetwarzane. Najlepszym rozwiązaniem jest przekazanie wszystkich danych dla pliku obiektu blob jednocześnie, a następnie dodanie kolejnych nowszych zdarzeń do innego, nowego pliku obiektów blob.

W scenariuszach, w których wiele obiektów blob jest stale dodawanych, a usługa Stream Analytics przetwarza obiekty blob w miarę ich dodawania, istnieje możliwość pomijania niektórych obiektów blob w rzadkich przypadkach ze względu na stopień szczegółowości .BlobLastModifiedTime Ten przypadek można rozwiązać, przekazując obiekty blob co najmniej dwie sekundy. Jeśli ta opcja nie jest możliwa, możesz użyć usługi Event Hubs do przesyłania strumieniowego dużych ilości zdarzeń.

Konfigurowanie usługi Blob Storage jako danych wejściowych strumienia

W poniższej tabeli opisano każdą właściwość na stronie Nowe dane wejściowe w witrynie Azure Portal podczas konfigurowania usługi Blob Storage jako danych wejściowych strumienia.

Właściwości opis
Alias danych wejściowych Przyjazna nazwa używana w zapytaniu zadania do odwoływanie się do tych danych wejściowych.
Subskrypcja Wybierz subskrypcję, w której istnieje zasób magazynu.
Konto magazynu Nazwa konta magazynu, na którym znajdują się pliki obiektów blob.
Klucz konta magazynu Klucz tajny skojarzony z kontem magazynu. Ta opcja jest wypełniana automatycznie, chyba że wybierzesz opcję ręcznego podania ustawień.
Kontener Kontenery zapewniają logiczne grupowanie obiektów blob. Możesz wybrać opcję Użyj istniejącego kontenera lub Utwórz nowy , aby utworzyć nowy kontener.
Tryb uwierzytelniania Określ typ uwierzytelniania, którego chcesz użyć do nawiązania połączenia z kontem magazynu. Do uwierzytelniania przy użyciu konta magazynu można użyć parametry połączenia lub tożsamości zarządzanej. W przypadku opcji tożsamości zarządzanej można utworzyć tożsamość zarządzaną przypisaną przez system do zadania usługi Stream Analytics lub tożsamość zarządzaną przypisaną przez użytkownika w celu uwierzytelnienia przy użyciu konta magazynu. W przypadku korzystania z tożsamości zarządzanej tożsamość zarządzana musi być członkiem odpowiedniej roli na koncie magazynu.
Wzorzec ścieżki (opcjonalnie) Ścieżka pliku używana do lokalizowania obiektów blob w określonym kontenerze. Jeśli chcesz odczytywać obiekty blob z katalogu głównego kontenera, nie ustawiaj wzorca ścieżki. W ścieżce można określić co najmniej jedno wystąpienie następujących trzech zmiennych: {date}, lub {time}{partition}

Przykład 1: cluster1/logs/{date}/{time}/{partition}

Przykład 2: cluster1/logs/{date}

Znak * nie jest dozwoloną wartością prefiksu ścieżki. Dozwolone są tylko prawidłowe znaki obiektu blob platformy Azure. Nie dołączaj nazw kontenerów ani nazw plików.
Format daty (opcjonalnie) Jeśli używasz zmiennej daty w ścieżce, format daty, w którym są zorganizowane pliki. Przykład: YYYY/MM/DD

Gdy dane wejściowe obiektu blob znajdują {date} się w ścieżce lub {time} w jej ścieżce, foldery są przeglądane w kolejności rosnącej czasu.
Format czasu (opcjonalnie) Jeśli używasz zmiennej czasowej w ścieżce, format czasu, w którym są zorganizowane pliki. Obecnie jedyną obsługiwaną wartością jest HH liczba godzin.
Klucz partycji Jest to pole opcjonalne, które jest dostępne tylko wtedy, gdy zadanie jest skonfigurowane do używania poziomu zgodności 1.2 lub nowszego. Jeśli dane wejściowe są partycjonowane przez właściwość, możesz dodać tutaj nazwę tej właściwości. Służy do poprawy wydajności zapytania, jeśli zawiera klauzulę PARTITION BY lub GROUP BY dla tej właściwości. Jeśli to zadanie używa poziomu zgodności 1.2 lub nowszego, to pole domyślnie ma wartość "PartitionId".
Liczba partycji wejściowych To pole jest obecne tylko wtedy, gdy element {partition} jest obecny we wzorcu ścieżki. Wartość tej właściwości jest liczbą całkowitą >=1. Gdziekolwiek {partition} pojawia się w pathPattern, zostanie użyta liczba z zakresu od 0 do wartości tego pola -1.
Format serializacji zdarzeń Format serializacji (JSON, CSV, Avro, Parquet lub Other (Protobuf, XML, zastrzeżony...)) przychodzącego strumienia danych. Upewnij się, że format JSON jest zgodny ze specyfikacją i nie zawiera wiodących wartości 0 dla liczb dziesiętnych.
Kodowanie W przypadku plików CSV i JSON format UTF-8 jest obecnie jedynym obsługiwanym formatem kodowania.
Kompresja Typ kompresji używany do odczytywania przychodzącego strumienia danych, takiego jak Brak (wartość domyślna), Gzip lub Deflate.

Gdy dane pochodzą ze źródła usługi Blob Storage, masz dostęp do następujących pól metadanych w zapytaniu usługi Stream Analytics:

Właściwości opis
Nazwa obiektu blob Nazwa wejściowego obiektu blob, z którego pochodzi zdarzenie.
EventProcessedUtcTime Data i godzina przetwarzania zdarzenia przez usługę Stream Analytics.
BlobLastModifiedUtcTime Data i godzina ostatniej modyfikacji obiektu blob.
Partitionid Identyfikator partycji opartej na zera dla karty wejściowej.

Na przykład przy użyciu tych pól można napisać zapytanie podobne do następującego przykładu:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Przesyłanie strumieniowe danych z platformy Apache Kafka

Usługa Azure Stream Analytics umożliwia bezpośrednie łączenie się z klastrami platformy Apache Kafka w celu pozyskiwania danych. Rozwiązanie to niski kod i całkowicie zarządzane przez zespół usługi Azure Stream Analytics w firmie Microsoft, co pozwala na spełnianie standardów zgodności biznesowej. Dane wejściowe platformy Kafka są zgodne z poprzednimi wersjami i obsługują wszystkie wersje z najnowszą wersją klienta, począwszy od wersji 0.10. Użytkownicy mogą łączyć się z klastrami platformy Kafka w sieci wirtualnej i klastrami Kafka z publicznym punktem końcowym w zależności od konfiguracji. Konfiguracja opiera się na istniejących konwencjach konfiguracji platformy Kafka. Obsługiwane typy kompresji to None, Gzip, Snappy, LZ4 i Zstd.

Aby uzyskać więcej informacji, zobacz Przesyłanie strumieniowe danych z platformy Kafka do usługi Azure Stream Analytics (wersja zapoznawcza).

Następne kroki