Używanie przetwarzania równoległego zapytań w usłudze Azure Stream Analytics
W tym artykule pokazano, jak korzystać z równoległości w usłudze Azure Stream Analytics. Dowiedz się, jak skalować zadania usługi Stream Analytics, konfigurując partycje wejściowe i dostrajając definicję zapytania analizy.
W ramach wymagań wstępnych warto zapoznać się z pojęciem jednostki przesyłania strumieniowego opisanego w artykule Omówienie i dostosowywanie jednostek przesyłania strumieniowego.
Jakie są części zadania usługi Stream Analytics?
Definicja zadania usługi Stream Analytics zawiera co najmniej jedno dane wejściowe przesyłania strumieniowego, zapytanie i dane wyjściowe. Dane wejściowe to miejsce, z którego zadanie odczytuje strumień danych. Zapytanie służy do przekształcania strumienia wejściowego danych, a dane wyjściowe to miejsce, do którego zadanie wysyła wyniki zadania.
Partycje w danych wejściowych i wyjściowych
Partycjonowanie umożliwia podzielenie danych na podzestawy na podstawie klucza partycji. Jeśli dane wejściowe (na przykład Event Hubs) są partycjonowane przez klucz, zalecamy określenie klucza partycji podczas dodawania danych wejściowych do zadania usługi Stream Analytics. Skalowanie zadania usługi Stream Analytics wykorzystuje partycje w danych wejściowych i wyjściowych. Zadanie usługi Stream Analytics może używać i zapisywać różne partycje równolegle, co zwiększa przepływność.
Dane wejściowe
Wszystkie dane wejściowe przesyłania strumieniowego usługi Azure Stream Analytics mogą korzystać z partycjonowania: Event Hubs, IoT Hub, Blob Storage, Data Lake Storage Gen2.
Uwaga
W przypadku poziomu zgodności 1.2 lub nowszego klucz partycji należy ustawić jako właściwość wejściową, bez konieczności wprowadzania słowa kluczowego PARTITION BY w zapytaniu. Aby uzyskać poziom zgodności 1.1 i poniżej, należy zamiast tego zdefiniować klucz partycji za pomocą słowa kluczowego PARTITION BY w zapytaniu.
Dane wyjściowe
Podczas pracy z usługą Stream Analytics możesz korzystać z partycjonowania w danych wyjściowych:
- Azure Data Lake Storage
- Azure Functions
- Tabela platformy Azure
- Magazyn obiektów blob (może jawnie ustawić klucz partycji)
- Azure Cosmos DB (należy jawnie ustawić klucz partycji)
- Event Hubs (należy jawnie ustawić klucz partycji)
- Usługa IoT Hub (należy jawnie ustawić klucz partycji)
- Service Bus
- Sql i Azure Synapse Analytics z opcjonalnym partycjonowaniem: zobacz więcej informacji na stronie Dane wyjściowe w usłudze Azure SQL Database.
Usługa Power BI nie obsługuje partycjonowania. Można jednak nadal partycjonować dane wejściowe zgodnie z opisem w tej sekcji.
Aby uzyskać więcej informacji na temat partycji, zobacz następujące artykuły:
- Event Hubs features overview (Omówienie funkcji usługi Event Hubs)
- Data partitioning (Partycjonowanie danych)
Query
Aby zadanie było równoległe, klucze partycji muszą być wyrównane między wszystkimi danymi wejściowymi, wszystkimi krokami logiki zapytań i wszystkimi danymi wyjściowymi. Partycjonowanie logiki zapytań jest określane przez klucze używane do sprzężeń i agregacji (GROUP BY). To ostatnie wymaganie można zignorować, jeśli logika zapytania nie jest kluczem (projekcja, filtry, sprzężenia odwołań...).
- Jeśli dane wejściowe i wyjściowe są partycjonowane przez
WarehouseId
element , a grupy zapytań wedługProductId
bezWarehouseId
, zadanie nie jest równoległe. - Jeśli dwa dane wejściowe do sprzężenia są partycjonowane przez różne klucze partycji (
WarehouseId
iProductId
), zadanie nie jest równoległe. - Jeśli co najmniej dwa niezależne przepływy danych znajdują się w jednym zadaniu, każde z własnym kluczem partycji, zadanie nie jest równoległe.
Tylko wtedy, gdy wszystkie dane wejściowe, dane wyjściowe i kroki zapytania używają tego samego klucza, zadanie jest równoległe.
Żenujące zadania równoległe
Żenujące zadanie równoległe to najbardziej skalowalny scenariusz w usłudze Azure Stream Analytics. Łączy jedną partycję danych wejściowych z jednym wystąpieniem zapytania z jedną partycją danych wyjściowych. Ten równoległość ma następujące wymagania:
Jeśli logika zapytania zależy od tego samego klucza przetwarzanego przez to samo wystąpienie zapytania, upewnij się, że zdarzenia przechodzą do tej samej partycji danych wejściowych. W przypadku usługi Event Hubs lub usługi IoT Hub oznacza to, że dane zdarzenia muszą mieć ustawioną wartość PartitionKey . Alternatywnie można użyć podzielonych na partycje nadawców. W przypadku magazynu obiektów blob oznacza to, że zdarzenia są wysyłane do tego samego folderu partycji. Przykładem może być wystąpienie zapytania, które agreguje dane na identyfikator użytkownika, gdzie wejściowe centrum zdarzeń jest partycjonowane przy użyciu identyfikatora userID jako klucza partycji. Jeśli jednak logika zapytania nie wymaga przetworzenia tego samego klucza przez to samo wystąpienie zapytania, możesz zignorować to wymaganie. Przykładem tej logiki jest proste zapytanie select-project-filter.
Następnym krokiem jest podzielenie zapytania na partycje. W przypadku zadań z poziomem zgodności 1.2 lub nowszym (zalecane) można określić kolumnę niestandardową jako Klucz partycji w ustawieniach wejściowych, a zadanie będzie automatycznie równoległe. Zadania z poziomem zgodności 1.0 lub 1.1 wymagają użycia identyfikatora PARTITION BY PartitionId we wszystkich krokach zapytania. Dozwolone są wiele kroków, ale wszystkie muszą być podzielone na partycje przy użyciu tego samego klucza.
Większość danych wyjściowych obsługiwanych w usłudze Stream Analytics może korzystać z partycjonowania. Jeśli używasz typu danych wyjściowych, który nie obsługuje partycjonowania zadania, nie będzie kłopotliwie równoległy. W przypadku danych wyjściowych usługi Event Hubs upewnij się, że kolumna Klucz partycji jest ustawiona na ten sam klucz partycji używany w zapytaniu. Aby uzyskać więcej informacji, zobacz sekcję danych wyjściowych.
Liczba partycji wejściowych musi być równa liczbie partycji wyjściowych. Dane wyjściowe usługi Blob Storage mogą obsługiwać partycje i dziedziczyć schemat partycjonowania zapytania nadrzędnego. Po określeniu klucza partycji dla usługi Blob Storage dane są partycjonowane na partycję wejściową, co powoduje, że wynik jest nadal w pełni równoległy. Oto przykłady wartości partycji, które zezwalają na w pełni równoległe zadanie:
- Osiem partycji wejściowych centrum zdarzeń i osiem partycji wyjściowych centrum zdarzeń
- Osiem partycji wejściowych centrum zdarzeń i danych wyjściowych magazynu obiektów blob
- Osiem partycji wejściowych centrum zdarzeń i danych wyjściowych magazynu obiektów blob podzielonych na partycje według pola niestandardowego z dowolną kardynalnością
- Osiem partycji wejściowych magazynu obiektów blob i danych wyjściowych magazynu obiektów blob
- Osiem partycji wejściowych magazynu obiektów blob i osiem partycji wyjściowych centrum zdarzeń
W poniższych sekcjach omówiono kilka przykładowych scenariuszy, które są żenujące równoległe.
Proste zapytanie
- Dane wejściowe: centrum zdarzeń z ośmioma partycjami
- Dane wyjściowe: Centrum zdarzeń z ośmioma partycjami ("Kolumna klucza partycji" musi być ustawiona na wartość
PartitionId
)
Zapytanie:
--Using compatibility level 1.2 or above
SELECT TollBoothId
FROM Input1
WHERE TollBoothId > 100
--Using compatibility level 1.0 or 1.1
SELECT TollBoothId
FROM Input1 PARTITION BY PartitionId
WHERE TollBoothId > 100
To zapytanie jest prostym filtrem. W związku z tym nie musimy martwić się o partycjonowanie danych wejściowych wysyłanych do centrum zdarzeń. Zwróć uwagę, że zadania z poziomem zgodności przed 1.2 muszą zawierać klauzulę PARTITION BY PartitionId , więc spełnia wymagania nr 2 z wcześniejszej wersji. W przypadku danych wyjściowych musimy skonfigurować dane wyjściowe centrum zdarzeń w zadaniu, aby klucz partycji był ustawiony na PartitionId. Ostatnim sprawdzeniem jest upewnienie się, że liczba partycji wejściowych jest równa liczbie partycji wyjściowych.
Wykonywanie zapytań przy użyciu klucza grupowania
- Dane wejściowe: Centrum zdarzeń z ośmioma partycjami
- Dane wyjściowe: Blob Storage
Zapytanie:
--Using compatibility level 1.2 or above
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId
--Using compatibility level 1.0 or 1.1
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
To zapytanie ma klucz grupowania. W związku z tym zdarzenia zgrupowane razem muszą być wysyłane do tej samej partycji usługi Event Hubs. Ponieważ w tym przykładzie grupujemy według identyfikatora TollBoothID, upewnij się, że TollBoothID
jest on używany jako klucz partycji, gdy zdarzenia są wysyłane do usługi Event Hubs. Następnie w usłudze Azure Stream Analytics można użyć identyfikatora PARTITION BY PartitionId , aby dziedziczyć z tego schematu partycji i włączyć pełną równoległości. Ponieważ dane wyjściowe są magazynem obiektów blob, nie musimy martwić się o skonfigurowanie wartości klucza partycji zgodnie z wymaganiami nr 4.
Przykład scenariuszy, które nie są* żenujące równoległe
W poprzedniej sekcji artykuł obejmował niektóre kłopotliwe scenariusze równoległe. W tej sekcji dowiesz się więcej na temat scenariuszy, które nie spełniają wszystkich wymagań, które mają być kłopotliwie równoległe.
Niezgodna liczba partycji
- Dane wejściowe: centrum zdarzeń z ośmioma partycjami
- Dane wyjściowe: centrum zdarzeń z 32 partycjami
Jeśli liczba partycji wejściowych nie jest zgodna z liczbą partycji wyjściowych, topologia nie jest kłopotliwie równoległa niezależnie od zapytania. Jednak nadal możemy uzyskać pewien poziom równoległości.
Wykonywanie zapytań przy użyciu danych wyjściowych bez partycjonowania
- Dane wejściowe: centrum zdarzeń z ośmioma partycjami
- Dane wyjściowe: Power BI
Dane wyjściowe usługi Power BI nie obsługują obecnie partycjonowania. W związku z tym ten scenariusz nie jest żenujący równoległy.
Zapytanie wieloetapowe z różnymi wartościami PARTITION BY
- Dane wejściowe: Centrum zdarzeń z ośmioma partycjami
- Dane wyjściowe: Centrum zdarzeń z ośmioma partycjami
- Poziom zgodności: 1.0 lub 1.1
Zapytanie:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId, PartitionId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1 Partition By TollBoothId
GROUP BY TumblingWindow(minute, 3), TollBoothId
Jak widać, drugi krok używa identyfikatora TollBoothId jako klucza partycjonowania. Ten krok nie jest taki sam jak pierwszy krok i dlatego wymaga od nas przetasowania.
Zapytanie wieloetapowe z różnymi wartościami PARTITION BY
- Dane wejściowe: Centrum zdarzeń z ośmioma partycjami ("Kolumna klucza partycji" nie jest ustawiona, wartość domyślna to "PartitionId")
- Dane wyjściowe: Centrum zdarzeń z ośmioma partycjami ("Kolumna klucza partycji" musi być ustawiona na wartość "TollBoothId")
- Poziom zgodności — 1.2 lub nowszy
Zapytanie:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
Poziom zgodności 1.2 lub nowszy domyślnie umożliwia równoległe wykonywanie zapytań. Na przykład zapytanie z poprzedniej sekcji zostanie podzielone na partycje, o ile kolumna "TollBoothId" jest ustawiona jako klucz partycji wejściowej. Klauzula PARTITION BY PartitionId nie jest wymagana.
Obliczanie maksymalnej liczby jednostek przesyłania strumieniowego zadania
Łączna liczba jednostek przesyłania strumieniowego, które mogą być używane przez zadanie usługi Stream Analytics, zależy od liczby kroków w zapytaniu zdefiniowanym dla zadania i liczby partycji dla każdego kroku.
Kroki w zapytaniu
Zapytanie może zawierać jeden lub wiele kroków. Każdy krok jest podzapytaniem zdefiniowanym przez słowo kluczowe WITH . Zapytanie spoza słowa kluczowego WITH (tylko jedno zapytanie) jest również liczone jako krok, na przykład instrukcja SELECT w następującym zapytaniu:
Zapytanie:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute,3), TollBoothId
To zapytanie ma dwa kroki.
Uwaga
To zapytanie zostało szczegółowo omówione w dalszej części artykułu.
Partycjonowanie kroku
Partycjonowanie kroku wymaga następujących warunków:
- Źródło danych wejściowych musi być podzielone na partycje.
- Instrukcja SELECT zapytania musi odczytywać dane z partycjonowanego źródła danych wejściowych.
- Zapytanie w kroku musi mieć słowo kluczowe PARTITION BY .
Gdy zapytanie jest partycjonowane, zdarzenia wejściowe są przetwarzane i agregowane w oddzielnych grupach partycji, a zdarzenia wyjściowe są generowane dla każdej z grup. Jeśli chcesz połączyć agregację, musisz utworzyć drugi krok niepartycyjny w celu agregowania.
Obliczanie maksymalnej liczby jednostek przesyłania strumieniowego dla zadania
Wszystkie niepartycyjne kroki razem mogą być skalowane w górę do jednej jednostki przesyłania strumieniowego (SU V2) dla zadania usługi Stream Analytics. Ponadto można dodać jedną jedną jednostki SU V2 dla każdej partycji w kroku podzielonym na partycje. W poniższej tabeli przedstawiono kilka przykładów .
Query | Maksymalna liczba jednostek jednostki SU dla zadania |
---|---|
|
1 SU V2 |
|
16 SU V2 (1 * 16 partycji) |
|
1 SU V2 |
|
4 SU V2s (3 dla kroków partycjonowanych + 1 dla niepartycjonowanych kroków |
Przykłady skalowania
Poniższe zapytanie oblicza liczbę samochodów w trzyminutowym oknie przechodzącym przez stację opłat, która ma trzy opłaty. To zapytanie można skalować w górę do jednego SU V2.
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
Aby użyć większej liczby jednostek przesyłania strumieniowego dla zapytania, zarówno strumień danych wejściowych, jak i zapytanie musi być partycjonowane. Ponieważ partycja strumienia danych jest ustawiona na 3, następujące zmodyfikowane zapytanie można skalować w górę do 3 jednostek SU V2:
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
Gdy zapytanie jest partycjonowane, zdarzenia wejściowe są przetwarzane i agregowane w oddzielnych grupach partycji. Zdarzenia wyjściowe są również generowane dla każdej z grup. Partycjonowanie może spowodować nieoczekiwane wyniki, gdy pole GROUP BY nie jest kluczem partycji w strumieniu danych wejściowych. Na przykład pole TollBoothId w poprzednim zapytaniu nie jest kluczem partycji input1. Wynikiem jest to, że dane z TollBooth #1 mogą być rozłożone w wielu partycjach.
Każda z partycji Input1 będzie przetwarzana oddzielnie przez usługę Stream Analytics. W rezultacie zostanie utworzonych wiele rekordów liczby samochodów dla tego samego tollbooth w tym samym oknie wirowania. Jeśli nie można zmienić klucza partycji wejściowej, ten problem można rozwiązać, dodając krok niepartycyjny do agregacji wartości między partycjami, jak w poniższym przykładzie:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
To zapytanie można skalować do 4 jednostek SU V2.
Uwaga
Jeśli łączysz dwa strumienie, upewnij się, że strumienie są partycjonowane przy użyciu klucza partycji kolumny używanej do tworzenia sprzężeń. Upewnij się również, że masz taką samą liczbę partycji w obu strumieniach.
Osiąganie większej przepływności na dużą skalę
Żenujące zadanie równoległe jest konieczne, ale nie wystarczające do utrzymania wyższej przepływności na dużą skalę. Każdy system magazynowania i odpowiadające mu dane wyjściowe usługi Stream Analytics mają różnice w sposobie osiągnięcia najlepszej możliwej przepływności zapisu. Podobnie jak w przypadku dowolnego scenariusza na dużą skalę, istnieją pewne wyzwania, które można rozwiązać przy użyciu odpowiednich konfiguracji. W tej sekcji omówiono konfiguracje dla kilku typowych danych wyjściowych i przedstawiono przykłady umożliwiające utrzymanie współczynników pozyskiwania 1 K, 5 K i 10 K na sekundę.
Poniższe obserwacje używają zadania usługi Stream Analytics z zapytaniem bezstanowym (przekazywaniem), podstawową funkcją zdefiniowaną przez użytkownika języka JavaScript, która zapisuje dane w usłudze Event Hubs, Azure SQL lub Azure Cosmos DB.
Event Hubs
Szybkość pozyskiwania (zdarzenia na sekundę) | Jednostki przesyłania strumieniowego | Zasoby wyjściowe |
---|---|---|
1 K | 1/3 | 2 TU |
5 K | 1 | 6 TU |
10 tys. | 2 | 10 TU |
Rozwiązanie Event Hubs skaluje liniowo pod względem jednostek przesyłania strumieniowego (SU) i przepływności, dzięki czemu jest to najbardziej wydajny i wydajny sposób analizowania i przesyłania strumieniowego danych z usługi Stream Analytics. Zadania można skalować w górę do 66 SU V2s, co w przybliżeniu przekłada się na przetwarzanie do 400 MB/s lub 38 bilionów zdarzeń dziennie.
Azure SQL
Szybkość pozyskiwania (zdarzenia na sekundę) | Jednostki przesyłania strumieniowego | Zasoby wyjściowe |
---|---|---|
1 K | 2/3 | S3 |
5 K | 3 | P4 |
10 tys. | 6 | P6 |
Usługa Azure SQL obsługuje równoległe pisanie o nazwie Dziedzicz partycjonowanie, ale nie jest domyślnie włączona. Jednak włączenie funkcji Dziedziczenie partycjonowania wraz z w pełni równoległym zapytaniem może nie być wystarczające do osiągnięcia większej przepływności. Przepływność zapisu SQL zależy znacznie od konfiguracji bazy danych i schematu tabeli. Artykuł Wydajność danych wyjściowych SQL zawiera więcej szczegółów na temat parametrów, które mogą zmaksymalizować przepływność zapisu. Jak wspomniano w artykule Dotyczącym danych wyjściowych usługi Azure Stream Analytics w usłudze Azure SQL Database , to rozwiązanie nie jest skalowane liniowo jako w pełni równoległy potok poza 8 partycjami i może wymagać ponownej partycjonowania przed danymi wyjściowymi SQL (zobacz INTO). Jednostki SKU w warstwie Premium są potrzebne do utrzymania wysokich stawek operacji we/wy wraz z obciążeniem podczas wykonywania kopii zapasowych dzienników co kilka minut.
Azure Cosmos DB
Szybkość pozyskiwania (zdarzenia na sekundę) | Jednostki przesyłania strumieniowego | Zasoby wyjściowe |
---|---|---|
1 K | 2/3 | 20 K RU |
5 K | 100 | 60 K RU |
10 tys. | 8 | 120 K RU |
Dane wyjściowe usługi Azure Cosmos DB z usługi Stream Analytics zostały zaktualizowane w celu korzystania z natywnej integracji na poziomie zgodności 1.2. Poziom zgodności 1.2 zapewnia znacznie większą przepływność i zmniejsza zużycie jednostek RU w porównaniu z 1,1, co jest domyślnym poziomem zgodności dla nowych zadań. Rozwiązanie korzysta z kontenerów usługi Azure Cosmos DB podzielonych na partycje na /deviceId, a reszta rozwiązania jest identycznie skonfigurowana.
Wszystkie przykłady przesyłania strumieniowego na dużą skalę platformy Azure używają usługi Event Hubs jako danych wejściowych, które są przekazywane przez obciążenie symulujące klientów testowych. Każde zdarzenie wejściowe to dokument JSON o rozmiarze 1 KB, który łatwo tłumaczy skonfigurowane współczynniki pozyskiwania na współczynniki przepływności (1 MB/s, 5 MB/s i 10 MB/s). Zdarzenia symulują urządzenie IoT wysyłające następujące dane JSON (w skróconej postaci) dla maksymalnie 1000 urządzeń:
{
"eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
"complexData": {
"moreData0": 51.3068118685458,
"moreData22": 45.34076957651598
},
"value": 49.02278128887753,
"deviceId": "contoso://device-id-1554",
"type": "CO2",
"createdAt": "2019-05-16T17:16:40.000003Z"
}
Uwaga
Konfiguracje mogą ulec zmianie ze względu na różne składniki używane w rozwiązaniu. Aby uzyskać dokładniejsze oszacowanie, dostosuj przykłady, aby pasowały do danego scenariusza.
Identyfikowanie wąskich gardeł
Użyj okienka Metryki w zadaniu usługi Azure Stream Analytics, aby zidentyfikować wąskie gardła w potoku. Przejrzyj zdarzenia wejściowe/wyjściowe pod kątem przepływności i "Opóźnienie znaku wodnego" lub Zdarzenia z zaległymi, aby sprawdzić, czy zadanie utrzymuje szybkość wprowadzania. W przypadku metryk usługi Event Hubs poszukaj żądań ograniczonych i odpowiednio dostosuj jednostki progowe. W przypadku metryk usługi Azure Cosmos DB zapoznaj się z artykułem Maksymalna liczba jednostek RU/s na zakres kluczy partycji w obszarze Przepływność, aby upewnić się, że zakresy kluczy partycji są równomiernie używane. W przypadku usługi Azure SQL DB monitoruj we/ wy dzienników i procesor CPU.
Uzyskaj pomoc
Aby uzyskać dalszą pomoc, wypróbuj stronę pytań i odpowiedzi firmy Microsoft dotyczącą usługi Azure Stream Analytics.
Następne kroki
- Wprowadzenie do usługi Azure Stream Analytics
- Get started using Azure Stream Analytics (Rozpoczynanie pracy z usługą Azure Stream Analytics)
- Azure Stream Analytics Query Language Reference (Dokumentacja dotycząca języka zapytań usługi Azure Stream Analytics)
- Azure Stream Analytics Management REST API Reference (Dokumentacja interfejsu API REST zarządzania usługą Azure Stream Analytics)