Udostępnij za pośrednictwem


Przesyłanie danych do usługi Stream Analytics jako danych wejściowych

Usługa Stream Analytics oferuje pierwszą klasę integracji ze strumieniami danych Azure jako danymi wejściowymi z czterech rodzajów zasobów:

Te zasoby wejściowe mogą znajdować się w tej samej subskrypcji Azure co zadanie usługi Stream Analytics albo w innej subskrypcji.

Kompresja

Usługa Stream Analytics obsługuje kompresję dla wszystkich źródeł wejściowych. Obsługiwane typy kompresji to: None, Gzip i Deflate. Obsługa kompresji nie jest dostępna dla danych referencyjnych. Jeśli dane wejściowe to skompresowane dane Avro, usługa Stream Analytics obsługuje je przezroczysto. Nie trzeba określać typu kompresji podczas używania serializacji Avro.

Tworzenie, edytowanie lub testowanie danych wejściowych

Możesz użyć portalu Azure, Visual Studio i Visual Studio Code, aby dodać i wyświetlić i edytować istniejące dane wejściowe w zadaniu przesyłania strumieniowego. Możesz również przetestować połączenia wejściowe i przetestować zapytania z przykładowych danych z portalu Azure, Visual Studio i Visual Studio Code. Podczas pisania zapytania należy wyświetlić dane wejściowe w klauzuli FROM. Listę dostępnych danych wejściowych można uzyskać na stronie Zapytanie w portalu. Jeśli chcesz użyć wielu danych wejściowych, połącz je lub napisz wiele zapytań .

Uwaga

Zdecydowanie zalecamy używanie narzędzi Stream Analytics w Visual Studio Code w celu uzyskania najlepszego lokalnego środowiska programistycznego. Istnieją znane luki w funkcjach w narzędziach usługi Stream Analytics dla Visual Studio 2019 (wersja 2.6.3000.0) i nie zostaną one ulepszone w przyszłości.

Przesyłanie strumieniowe danych z usługi Event Hubs

Azure Event Hubs jest wysoce skalowalnym modułem ingestor zdarzeń publikowania i subskrybowania. Centrum zdarzeń może zbierać miliony zdarzeń na sekundę, aby umożliwić przetwarzanie i analizowanie ogromnych ilości danych generowanych przez połączone urządzenia i aplikacje. Razem usługi Event Hubs i Stream Analytics mogą zapewnić kompleksowe rozwiązanie do analizy w czasie rzeczywistym. Usługa Event Hubs umożliwia przesyłanie zdarzeń do Azure w czasie rzeczywistym, a zadania usługi Stream Analytics mogą przetwarzać te zdarzenia w czasie rzeczywistym. Możesz na przykład wysyłać kliknięcia internetowe, odczyty czujników lub zdarzenia dziennika online do usługi Event Hubs. Następnie można utworzyć zadania usługi Stream Analytics, aby używać usługi Event Hubs dla danych wejściowych na potrzeby filtrowania, agregowania i korelacji w czasie rzeczywistym.

to znacznik czasu przybycia zdarzenia do centrum zdarzeniowego i stanowi domyślny znacznik czasu dla zdarzeń przychodzących z usługi Event Hubs do usługi Stream Analytics. Aby przetworzyć dane jako strumień przy użyciu znacznika czasu w ładunku zdarzenia, należy użyć słowa kluczowego TIMESTAMP BY .

Grupy odbiorców usługi Event Hubs

Każde wejście centrum zdarzeń należy skonfigurować tak, aby miało własną grupę konsumentów. Jeśli zadanie zawiera połączenie z samym sobą lub ma wiele danych wejściowych, niektóre dane wejściowe mogą być odczytywane przez więcej niż jednego czytnika na dalszym etapie. Ta sytuacja ma wpływ na liczbę czytelników w jednej grupie odbiorców. Aby uniknąć przekroczenia limitu pięciu czytelników w Event Hubs na jedną grupę odbiorców na jedną partycję, dobrą praktyką jest wyznaczenie oddzielnej grupy odbiorców dla każdego zadania usługi Stream Analytics. Istnieje również limit 20 grup odbiorców dla centrum zdarzeń w warstwie Standard. Aby uzyskać więcej informacji, zobacz Rozwiązywanie problemów z danymi wejściowymi Azure Stream Analytics.

Tworzenie danych wejściowych z usługi Event Hubs

W poniższej tabeli opisano każdą właściwość na stronie Nowe dane wejściowe w portalu Azure w celu strumieniowego przesyłania danych wejściowych z centrum zdarzeń:

Właściwości opis
Alias danych wejściowych Przyjazna nazwa, którą używasz w zapytaniu zadania, aby odwołać się do tego wejścia.
Subskrypcja Wybierz subskrypcję Azure, w której istnieje zasób centrum zdarzeń.
Przestrzeń nazw centrum zdarzeń Przestrzeń nazw usługi Event Hubs jest kontenerem dla centrów zdarzeń. Podczas tworzenia centrum zdarzeń, tworzysz również przestrzeń nazw.
Nazwa centrum zdarzeń Nazwa centrum zdarzeń do użycia jako dane wejściowe.
Grupa konsumentów Event Huba (zalecane) Zalecamy używanie odrębnej grupy odbiorców dla każdego zadania usługi Stream Analytics. Ten ciąg identyfikuje grupę odbiorców do pobierania danych z centrum wydarzeń. Jeśli grupa odbiorców nie zostanie określona, zadanie usługi Stream Analytics używa grupy odbiorców.
Tryb uwierzytelniania Określ typ uwierzytelniania, którego chcesz użyć do nawiązania połączenia z centrum zdarzeń. Do uwierzytelniania w centrum zdarzeń można użyć parametry połączenia lub tożsamości zarządzanej. W przypadku opcji tożsamości zarządzanej można utworzyć tożsamość zarządzaną przypisaną przez system dla zadania usługi Stream Analytics lub tożsamość zarządzaną przypisaną przez użytkownika do uwierzytelniania w centrum zdarzeń. Kiedy używasz tożsamości zarządzanej, tożsamość zarządzana musi należeć do ról Azure Event Hubs Data Receiver lub Azure Event Hubs Data Owner.
Nazwa polityki centrum zdarzeń Zasady dostępu współdzielonego, które zapewniają dostęp do usługi Event Hubs. Każda zasada dostępu współdzielonego ma nazwę, uprawnienia ustawione i klucze dostępu. Ta opcja jest wypełniana automatycznie, chyba że zostanie wybrana opcja ręcznego udostępnienia ustawień usługi Event Hubs.
Klucz partycji Jest to pole opcjonalne, które jest dostępne tylko wtedy, gdy zadanie jest skonfigurowane do używania poziomu zgodności 1.2 lub nowszego. Jeśli dane wejściowe są partycjonowane przez właściwość, możesz dodać tutaj nazwę tej właściwości. Jest on używany do poprawy wydajności zapytania, jeśli zawiera klauzulę lub dla tej właściwości. Jeśli to zadanie używa poziomu zgodności 1.2 lub nowszego, to pole domyślnie ma wartość domyślną
Format serializacji zdarzeń Format serializacji (JSON, CSV, Avro) przychodzącego strumienia danych. Upewnij się, że format JSON jest zgodny ze specyfikacją i nie zawiera wiodących wartości 0 dla liczb dziesiętnych.
Kodowanie UTF-8 jest obecnie jedynym obsługiwanym formatem kodowania.
Typ kompresji zdarzeń Typ kompresji używany do odczytywania przychodzącego strumienia danych, takiego jak Brak (wartość domyślna), Gzip lub Deflate.
Rejestr schematów Możesz wybrać rejestr schematów ze schematami dla danych zdarzeń otrzymanych z centrum zdarzeń.

Gdy dane pochodzą z danych wejściowych strumienia usługi Event Hubs, masz dostęp do następujących pól metadanych w zapytaniu usługi Stream Analytics:

Właściwości opis
EventProcessedUtcTime Data i godzina przetwarzania zdarzenia przez usługę Stream Analytics.
EventEnqueuedUtcTime Data i godzina odebrania zdarzeń przez usługę Event Hubs.
Identyfikator partycji Identyfikator partycji opartej na zera dla karty wejściowej.

Na przykład przy użyciu tych pól można napisać zapytanie podobne do następującego przykładu:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Uwaga

W przypadku używania usługi Event Hubs jako punktu końcowego dla tras IoT Hub można uzyskać dostęp do metadanych IoT Hub przy użyciu funkcji GetMetadataPropertyValue.

Przesyłanie strumieniowe danych z IoT Hub

Azure IoT Hub to mechanizm przetwarzania zdarzeń typu publish-subscribe, wysoce skalowalny i zoptymalizowany pod kątem scenariuszy IoT.

Domyślny znacznik czasu zdarzeń pochodzących z IoT Hub w usłudze Stream Analytics to sygnatura czasowa, którą zdarzenie dotarło do IoT Hub, czyli EventEnqueuedUtcTime. Aby przetworzyć dane jako strumień przy użyciu znacznika czasu w ładunku zdarzenia, należy użyć słowa kluczowego TIMESTAMP BY .

IoT Hub grupy konsumentów

Należy skonfigurować każde wejście IoT Hub w usłudze Stream Analytics, aby miało własną grupę konsumentów. Gdy zadanie zawiera połączenie ze sobą lub ma wiele danych wejściowych, niektóre dane wejściowe mogą być odczytywane przez więcej niż jednego czytnika w dalszej części procesu. Ta sytuacja ma wpływ na liczbę czytelników w jednej grupie odbiorców. Aby uniknąć przekroczenia limitu Azure IoT Hub pięciu czytelników na grupę odbiorców na partycję, najlepszym rozwiązaniem jest wyznaczenie grupy odbiorców dla każdego zadania usługi Stream Analytics.

Skonfiguruj IoT Hub jako strumień danych wejściowych

W poniższej tabeli opisano każdą właściwość na stronie Nowe dane wejściowe w portalu Azure przy konfigurowaniu IoT Hub jako strumieniowe dane wejściowe.

Właściwości opis
Alias danych wejściowych Przyjazna nazwa, którą używasz w zapytaniu zadania, aby odwołać się do tego wejścia.
Subskrypcja Wybierz subskrypcję, w której istnieje zasób IoT Hub.
IoT Hub Nazwa IoT Hub, która ma być używana jako dane wejściowe.
Grupa konsumentów Zalecamy użycie innej grupy odbiorców dla każdego zadania usługi Stream Analytics. Grupa konsumentów jest używana do pobierania danych z IoT Hub. Usługa Stream Analytics używa grupy odbiorców $Default, chyba że określono inaczej.
Nazwa zasad dostępu współdzielonego Zasady dostępu współdzielonego, które zapewniają dostęp do IoT Hub. Każda zasada dostępu współdzielonego ma nazwę, uprawnienia ustawione i klucze dostępu.
Klucz zasad dostępu współdzielonego Klucz dostępu współużytkowanego używany do autoryzowania dostępu do IoT Hub. Ta opcja jest wypełniana automatycznie, chyba że wybierzesz opcję ręcznego podania ustawień IoT Hub.
Punkt końcowy Punkt końcowy dla IoT Hub.
Klucz partycji Jest to pole opcjonalne, które jest dostępne tylko wtedy, gdy zadanie jest skonfigurowane do używania poziomu zgodności 1.2 lub nowszego. Jeśli dane wejściowe są partycjonowane przez właściwość, możesz dodać tutaj nazwę tej właściwości. Służy do poprawy wydajności zapytania, jeśli zawiera klauzulę PARTITION BY lub GROUP BY dla tej właściwości. Jeśli to zadanie używa poziomu zgodności 1.2 lub nowszego, to pole domyślnie ma wartość "PartitionId".
Format serializacji zdarzeń Format serializacji (JSON, CSV, Avro) przychodzącego strumienia danych. Upewnij się, że format JSON jest zgodny ze specyfikacją i nie zawiera wiodących wartości 0 dla liczb dziesiętnych.
Kodowanie UTF-8 jest obecnie jedynym obsługiwanym formatem kodowania.
Typ kompresji zdarzeń Typ kompresji używany do odczytywania przychodzącego strumienia danych, takiego jak Brak (wartość domyślna), Gzip lub Deflate.

Kiedy używasz danych strumieniowych z IoT Hub, masz dostęp do następujących pól metadanych w zapytaniu Stream Analytics.

Właściwości opis
EventProcessedUtcTime Data i czas, w którym zdarzenie zostało przetworzone.
EventEnqueuedUtcTime Data i godzina odebrania zdarzenia przez IoT Hub.
Identyfikator partycji Identyfikator partycji opartej na zera dla karty wejściowej.
IoTHub.MessageId Identyfikator, który jest używany do korelowania dwukierunkowej komunikacji w IoT Hub.
IoTHub.CorrelationId Identyfikator używany w odpowiedziach na wiadomości i informacjach zwrotnych w IoT Hub.
IoTHub.ConnectionDeviceId Identyfikator uwierzytelniania używany do wysyłania tej wiadomości. Ta wartość jest umieszczana na komunikatach powiązanych z usługą przez IoT Hub.
IoTHub.ConnectionDeviceGenerationId Identyfikator generacji uwierzytelnionego urządzenia, który został użyty do wysłania tej wiadomości. Ta wartość jest oznaczana na komunikatach związanych z usługą przez Centrum IoT.
IoTHub.EnqueuedTime Godzina odebrania komunikatu przez IoT Hub.

Strumieniowanie danych z Blob Storage lub Data Lake Storage Gen2

W przypadku scenariuszy z dużymi ilościami danych bez struktury do przechowywania w chmurze Azure Blob Storage lub Azure Data Lake Storage Gen2 oferuje ekonomiczne i skalowalne rozwiązanie. Dane w Azure Blob Storage lub Azure Data Lake Storage Gen2 są uważane za dane spoczynkowe. Jednak te dane mogą być przetwarzane jako strumień danych przez usługę Stream Analytics.

Przetwarzanie dzienników jest często używanym scenariuszem używania takich danych wejściowych z usługą Stream Analytics. W tym scenariuszu pliki danych telemetrycznych są przechwytywane z systemu i muszą zostać przeanalizowane i przetworzone w celu wyodrębnienia znaczących danych.

Domyślny znacznik czasu zdarzenia usługi Blob Storage lub Azure Data Lake Storage Gen2 w usłudze Stream Analytics to znacznik czasu ostatniej modyfikacji, który jest BlobLastModifiedUtcTime. Jeśli obiekt blob zostanie przekazany do konta magazynu o godzinie 13:00, a zadanie Azure Stream Analytics zostanie uruchomione przy użyciu opcji Now o godzinie 13:01, nie zostanie ono odebrane, ponieważ jego zmodyfikowany czas spadnie poza okresem wykonywania zadania.

Jeśli obiekt blob zostanie przekazany do kontenera konta magazynu o godzinie 13:00, a zadanie Azure Stream Analytics zostanie uruchomione z użyciem Czasu niestandardowego o godzinie 13:00 lub wcześniej, obiekt blob zostanie odebrany, ponieważ jego zmodyfikowany czas mieści się w okresie wykonywania zadania.

Jeśli zadanie Azure Stream Analytics zostanie uruchomione przy użyciu Now o godzinie 13:00, a blob zostanie umieszczony w kontenerze konta magazynowego o godzinie 13:01, Azure Stream Analytics pobiera blob. Sygnatura czasowa przypisana do każdego obiektu blob jest oparta tylko na parametrze . Folder, w którym znajduje się obiekt blob, nie ma relacji z przypisanym znacznikem czasu. Jeśli na przykład istnieje obiekt blob z wartością , znacznik czasu przypisany do tego obiektu blob to .

Aby przetworzyć dane jako strumień przy użyciu znacznika czasu w ładunku zdarzenia, należy użyć słowa kluczowego TIMESTAMP BY . Zadanie usługi Stream Analytics ściąga dane z usługi Azure Blob Storage lub Azure Data Lake Storage Gen2 jako dane wejściowe co sekundę, pod warunkiem że plik obiektu blob jest dostępny. Jeśli plik obiektu blob jest niedostępny, istnieje wycofywanie wykładnicze z maksymalnym opóźnieniem czasu wynoszącym 90 sekund.

Uwaga

Usługa Stream Analytics nie obsługuje dodawania treści do istniejącego pliku blob. Usługa Stream Analytics będzie wyświetlać każdy plik tylko raz, a wszelkie zmiany występujące w pliku po odczytaniu danych przez zadanie nie są przetwarzane. Najlepszym rozwiązaniem jest przekazanie wszystkich danych dla pliku obiektu blob jednocześnie, a następnie dodanie kolejnych nowszych zdarzeń do innego, nowego pliku obiektów blob.

W scenariuszach, w których wiele obiektów blob jest stale dodawanych, a usługa Stream Analytics przetwarza je w miarę dodawania, istnieje możliwość pominięcia niektórych obiektów w rzadkich przypadkach ze względu na stopień szczegółowości . Ten przypadek można rozwiązać, przesyłając bloby w odstępach co najmniej dwóch sekund. Jeśli ta opcja nie jest możliwa, możesz użyć usługi Event Hubs do przesyłania strumieniowego dużych ilości zdarzeń.

Konfigurowanie usługi Blob Storage jako danych wejściowych strumienia

W poniższej tabeli opisano każdą właściwość w Nowe dane wejściowe stronie w portalu Azure podczas konfigurowania magazynu obiektów blob jako danych wejściowych strumienia.

Właściwości opis
Alias danych wejściowych Przyjazna nazwa, którą używasz w zapytaniu zadania, aby odwołać się do tego wejścia.
Subskrypcja Wybierz subskrypcję, w której istnieje zasób pamięci masowej.
Konto magazynu Nazwa konta magazynu, na którym znajdują się pliki obiektów blob.
Klucz konta magazynu Klucz tajny skojarzony z kontem magazynowym. Ta opcja jest wypełniana automatycznie, chyba że wybierzesz opcję ręcznego podania ustawień.
Kontener Kontenery stosuje się do logicznego grupowania obiektów blob. Możesz wybrać opcję Użyj istniejącego kontenera lub Utwórz nowy , aby utworzyć nowy kontener.
Tryb uwierzytelniania Określ typ uwierzytelniania, którego chcesz użyć do nawiązania połączenia z kontem magazynowania. Aby się uwierzytelnić do konta przechowywania, można użyć ciągu połączenia lub tożsamości zarządzanej. W przypadku opcji tożsamości zarządzanej można utworzyć tożsamość zarządzaną przypisaną przez system do zadania Stream Analytics lub przypisaną przez użytkownika tożsamość zarządzaną do uwierzytelniania przy użyciu konta magazynu. W przypadku korzystania z tożsamości zarządzanej tożsamość zarządzana musi być członkiem odpowiedniej roli na koncie magazynu.
Wzorzec ścieżki (opcjonalnie) Ścieżka pliku używana do lokalizowania blobów w określonym kontenerze. Jeśli chcesz odczytywać obiekty blob z głównego katalogu kontenera, nie ustawiaj wzorca ścieżki. W ścieżce można określić jedno lub więcej wystąpień następujących trzech zmiennych: , , lub

Przykład 1:

Przykład 2:

Znak nie jest dozwoloną wartością prefiksu ścieżki. Dozwolone są tylko prawidłowe znaki dla obiektów blob Azure. Nie dołączaj nazw kontenerów ani nazw plików.
Format daty (opcjonalnie) Jeśli używasz zmiennej daty w ścieżce, pliki będą zorganizowane według formatu daty. Przykład:

Gdy dane wejściowe obiektu blob mają lub w ścieżce, foldery są przeglądane w porządku chronologicznym.
Format czasu (opcjonalnie) Jeśli używasz zmiennej czasowej w ścieżce, format czasu, w którym są zorganizowane pliki. Obecnie jedyną obsługiwaną wartością jest liczba godzin.
Klucz partycji Jest to pole opcjonalne, które jest dostępne tylko wtedy, gdy zadanie jest skonfigurowane do używania poziomu zgodności 1.2 lub nowszego. Jeśli dane wejściowe są partycjonowane przez właściwość, możesz dodać tutaj nazwę tej właściwości. Służy do poprawy wydajności zapytania, jeśli zawiera klauzulę PARTITION BY lub GROUP BY dla tej właściwości. Jeśli to zadanie używa poziomu zgodności 1.2 lub nowszego, to pole domyślnie ma wartość "PartitionId".
Liczba partycji wejściowych To pole jest obecne tylko wtedy, gdy element {partition} jest obecny we wzorcu ścieżki. Wartość tej właściwości jest liczbą całkowitą =1. Gdziekolwiek {partition} pojawia się w pathPattern, zostanie użyta liczba z zakresu od 0 do wartości tego pola -1.
Format serializacji zdarzeń Format serializacji (JSON, CSV, Avro) przychodzącego strumienia danych. Upewnij się, że format JSON jest zgodny ze specyfikacją i nie zawiera wiodących wartości 0 dla liczb dziesiętnych.
Kodowanie W przypadku plików CSV i JSON format UTF-8 jest obecnie jedynym obsługiwanym formatem kodowania.
Kompresja Typ kompresji używany do odczytywania przychodzącego strumienia danych, takiego jak Brak (wartość domyślna), Gzip lub Deflate.

Gdy dane pochodzą ze źródła usługi Blob Storage, masz dostęp do następujących pól metadanych w zapytaniu usługi Stream Analytics:

Właściwości opis
Nazwa obiektu blob Nazwa wejściowego obiektu blob, z którego pochodzi zdarzenie.
EventProcessedUtcTime Data i godzina przetwarzania zdarzenia przez usługę Stream Analytics.
BlobLastModifiedUtcTime Data i godzina ostatniej modyfikacji blobu.
Identyfikator partycji Identyfikator partycji opartej na zera dla karty wejściowej.

Na przykład przy użyciu tych pól można napisać zapytanie podobne do następującego przykładu:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Przesyłanie strumieniowe danych z platformy Apache Kafka

Azure Stream Analytics umożliwia bezpośrednie łączenie się z klastrami platformy Apache Kafka w celu pozyskiwania danych. Rozwiązanie bazuje na podejściu z małą ilością kodu i jest całkowicie zarządzane przez zespół Azure Stream Analytics firmy Microsoft, co pozwala mu spełniać standardy zgodności z wymaganiami biznesowymi. Dane wejściowe platformy Kafka są zgodne z poprzednimi wersjami i obsługują wszystkie wersje z najnowszą wersją klienta, począwszy od wersji 0.10. Użytkownicy mogą łączyć się z klastrami platformy Kafka w sieci wirtualnej i klastrami Kafka z publicznym punktem końcowym w zależności od konfiguracji. Konfiguracja opiera się na istniejących konwencjach konfiguracji platformy Kafka. Obsługiwane typy kompresji to None, Gzip, Snappy, LZ4 i Zstd.

Aby uzyskać więcej informacji, zobacz strumieniowe przesyłanie danych z platformy Kafka do Azure Stream Analytics (wersja zapoznawcza).

Następne kroki