Sdílet prostřednictvím


Streamování dat jako vstupu do Stream Analytics

Stream Analytics se integruje s Azure datovými proudy jako vstupy od pěti typů zdrojů.

Tyto vstupní prostředky mohou existovat ve stejném předplatném Azure jako vaše úloha Stream Analytics nebo v jiném předplatném.

Komprese

Stream Analytics podporuje kompresi pro všechny vstupní zdroje. Podporované typy komprese jsou: Žádné, Gzip a Deflate. Stream Analytics nepodporuje kompresi referenčních dat. Pokud jsou vstupní data komprimovaná Avro, Stream Analytics je zpracuje automaticky. U serializace Avro nemusíte zadávat typ komprese.

Vytváření, úpravy nebo testování vstupů

Pomocí portálu Azure, Visual Studio a Visual Studio Code můžete přidat a zobrazit nebo upravit existující vstupy úlohy streamování. Vstupní připojení a testovací dotazy můžete také testovat z ukázkových dat z portálu Azure, Visual Studio a Visual Studio Code. Při psaní dotazu vypíšete vstup v klauzuli FROM. Seznam dostupných vstupů můžete získat na stránce Dotaz na portálu. Pokud chcete použít více vstupů, JOIN můžete je zadat nebo napsat více SELECT dotazů.

Poznámka:

Pro nejlepší místní prostředí pro vývoj použijte nástroje Stream Analytics pro Visual Studio Code. V nástrojích Stream Analytics pro Visual Studio 2019 (verze 2.6.3000.0) existují známé mezery ve funkcích a nebude se v budoucnu zlepšovat.

Streamování dat z Event Hubs

Azure Event Hubs je vysoce škálovatelný ingestor událostí publikování a odběru. Centrum událostí může shromažďovat miliony událostí za sekundu, abyste mohli zpracovávat a analyzovat obrovské objemy dat vytvořených připojenými zařízeními a aplikacemi. Služba Event Hubs a Stream Analytics společně poskytují ucelené řešení pro analýzy v reálném čase. Event Hubs odesílá události do Azure v reálném čase a úlohy Stream Analytics tyto události zpracovávají v reálném čase. Do služby Event Hubs můžete například posílat webové kliknutí, odečty snímačů nebo online záznamy protokolu. Pak můžete vytvořit úlohy Stream Analytics, které budou používat službu Event Hubs pro vstupní data pro filtrování, agregaci a korelaci v reálném čase.

EventEnqueuedUtcTime je časové razítko příchodu události do centra událostí a je výchozím časovým razítkem událostí přicházejících ze služby Event Hubs do Stream Analytics. Pokud chcete data zpracovat jako datový proud pomocí časového razítka v datové části události, musíte použít klíčové slovo TIMESTAMP BY .

Skupiny příjemců služby Event Hubs

Nakonfigurujte každý vstup centra událostí s vlastní skupinou příjemců. Pokud úloha obsahuje samoobslužné spojení nebo má více vstupů, může několik vstupů přečíst více než jeden čtenář podřízený. Tato situace ovlivňuje počet čtenářů v jedné skupině příjemců. Aby nedošlo k překročení limitu pěti čtenářů na skupinu příjemců a oddíl v rámci služby Event Hubs, určete skupinu příjemců pro každou úlohu Stream Analytics. Pro centrum událostí úrovně Standard je také limit 20 skupin příjemců. Další informace najdete v kapitole Odstraňování potíží s vstupy Azure Stream Analytics.

Vytvoření vstupu ze služby Event Hubs

Následující tabulka vysvětluje jednotlivé vlastnosti na stránce Nový vstup na portálu Azure pro streamování vstupu dat z centra událostí:

Vlastnost Popis
Vstupní alias Popisný název, který použijete v dotazu úlohy k odkazování na tento vstup.
Subscription Zvolte předplatné Azure, ve kterém se nachází prostředek Event Hub.
Obor názvů Event Hub Obor názvů služby Event Hubs je kontejner pro centra událostí. Když vytvoříte centrum událostí, vytvoříte také obor názvů.
Název centra událostí Název centra událostí, který se má použít jako vstup.
Skupina příjemců centra událostí (doporučeno) Pro každou úlohu Stream Analytics použijte odlišnou skupinu příjemců. Tento řetězec identifikuje skupinu příjemců, která se má použít k načtení dat z událostního centra. Pokud nezadáte skupinu příjemců, použije $Default úloha Stream Analytics skupinu příjemců.
Režim ověřování Zadejte typ ověřování, které chcete použít pro připojení k centru událostí. K ověření v centru událostí použijte connection string nebo spravovanou identitu. U možnosti spravované identity můžete vytvořit spravovanou identitu přiřazenou systémem pro úlohu Stream Analytics nebo spravovanou identitu přiřazenou uživatelem pro ověření v centru událostí. Pokud používáte spravovanou identitu, musí být spravovaná identita členem role správce dat Azure Event Hubs nebo role vlastníka dat Azure Event Hubs.
Název zásady Event Hub Zásady sdíleného přístupu, které poskytují přístup ke službě Event Hubs. Každá zásada sdíleného přístupu má název, oprávnění, která jste nastavili, a přístupové klíče. Tato možnost se vyplní automaticky, pokud nevyberete možnost ručního zadání nastavení služby Event Hubs.
Klíč oddílu Toto volitelné pole je k dispozici pouze v případě, že úlohu nakonfigurujete tak, aby používala úroveň kompatibility 1.2 nebo vyšší. Pokud je vstup rozdělený podle vlastnosti, přidejte sem název této vlastnosti. Použijte ho ke zlepšení výkonu dotazu, pokud obsahuje PARTITION BY klauzuli nebo GROUP BY klauzuli pro tuto vlastnost. Pokud tato úloha používá úroveň kompatibility 1.2 nebo vyšší, je toto pole ve výchozím nastavení nastaveno na PartitionId.
Formát serializace událostí Formát serializace (JSON, CSV, Avro) příchozího datového proudu. Ujistěte se, že formát JSON odpovídá specifikaci a neobsahuje úvodní číslo 0 pro desetinná čísla.
Kódování UTF-8 je v současné době jediným podporovaným formátem kódování.
Typ komprese událostí Typ komprese používaný ke čtení příchozího datového proudu, například None (výchozí), Gzip nebo Deflate.
Registr schématu Vyberte registr schématu se schématy pro data událostí přijatá z centra událostí.

Když data pocházejí ze vstupu streamu služby Event Hubs, máte v dotazu Stream Analytics přístup k následujícím polím metadat:

Vlastnost Popis
EventProcessedUtcTime Datum a čas, kdy Stream Analytics událost zpracuje.
EventEnqueuedUtcTime Datum a čas, kdy event Hubs přijímá události.
ID oddílu ID oddílu s indexem začínajícím od nuly pro vstupní adaptér.

Pomocí těchto polí můžete napsat dotaz jako v následujícím příkladu:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Poznámka:

Pokud jako koncový bod pro trasy IoT Hub používáte službu Event Hubs, můžete k metadatům IoT Hub přistupovat pomocí funkce GetMetadataPropertyValue.

Streamování dat z IoT Hub

Azure IoT Hub je vysoce škálovatelný ingestor událostí publikování a odběru optimalizovaný pro scénáře IoT.

Výchozí časové razítko událostí přicházejících z IoT Hub ve Stream Analytics je časové razítko, které událost dorazila do IoT Hub, což je EventEnqueuedUtcTime. Pokud chcete data zpracovat jako datový proud pomocí časového razítka v datové části události, použijte klíčové slovo TIMESTAMP BY .

skupiny uživatelů IoT Hub

Nakonfigurujte každý vstup Stream Analytics IoT Hub tak, aby měl vlastní skupinu příjemců. Pokud úloha obsahuje vlastní spojení nebo pokud má více vstupů, může některé vstupy přečíst více než jeden čtenář. Tato situace ovlivňuje počet čtenářů v jedné skupině příjemců. Pokud se chcete vyhnout překročení Azure IoT Hub limitu pěti čtenářů na skupinu příjemců na oddíl, určete skupinu příjemců pro každou úlohu Stream Analytics.

Nakonfigurujte IoT Hub jako vstup pro datový stream

Následující tabulka vysvětluje jednotlivé vlastnosti na stránce Nový vstup na portálu Azure při konfiguraci IoT Hub jako vstupu datového proudu.

Vlastnost Popis
Vstupní alias Popisný název, který použijete v dotazu úlohy k odkazování na tento vstup.
Subscription Zvolte předplatné, ve kterém IoT Hub prostředek existuje.
IoT Hub Název IoT Hub, který se má použít jako vstup.
Skupina příjemců Pro každou úlohu Stream Analytics použijte jinou skupinu příjemců. Skupina spotřebitelů přijímá data z IoT Hub. Stream Analytics používá skupinu uživatelů $Default, pokud nezadáte jinak.
Název zásady sdíleného přístupu Zásady sdíleného přístupu, které poskytují přístup k IoT Hub. Každá zásada sdíleného přístupu má název, oprávnění, která jste nastavili, a přístupové klíče.
Klíč zásad sdíleného přístupu Sdílený přístupový klíč použitý k autorizaci přístupu k IoT Hub. Tato možnost se vyplní automaticky, pokud nevyberete možnost ručního zadání nastavení IoT Hub.
Koncový bod Koncový bod pro IoT Hub
Klíč oddílu Je to volitelné pole, které je k dispozici jenom v případě, že úlohu nakonfigurujete tak, aby používala úroveň kompatibility 1.2 nebo vyšší. Pokud rozdělíte vstup podle vlastnosti, můžete sem přidat název této vlastnosti. Používá se ke zlepšení výkonu dotazu, pokud obsahuje klauzuli PARTITION BY nebo GROUP BY pro tuto vlastnost. Pokud tato úloha používá úroveň kompatibility 1.2 nebo vyšší, toto pole bude ve výchozím nastavení nastaveno na "PartitionId."
Formát serializace událostí Formát serializace (JSON, CSV, Avro) příchozího datového proudu. Ujistěte se, že formát JSON odpovídá specifikaci a neobsahuje úvodní číslo 0 pro desetinná čísla.
Kódování UTF-8 je v současné době jediným podporovaným formátem kódování.
Typ komprese událostí Typ komprese používaný ke čtení příchozího datového proudu, například None (výchozí), Gzip nebo Deflate.

Při použití streamu dat z IoT Hub máte v dotazu Stream Analytics přístup k následujícím polím metadat:

Vlastnost Popis
EventProcessedUtcTime Datum a čas zpracování události.
EventEnqueuedUtcTime Datum a čas, kdy IoT Hub obdrží událost.
ID oddílu ID oddílu s indexem začínajícím od nuly pro vstupní adaptér.
IoTHub.MessageId ID, které se používá ke korelaci obousměrné komunikace v IoT Hub.
IoTHub.CorrelationId ID, které se používá v odpovědích na zprávu a zpětné vazbě v IoT Hub
IoTHub.ConnectionDeviceId OVĚŘOVACÍ ID použité k odeslání této zprávy. IoT Hub tuto hodnotu označí u zpráv vázaných službou.
IoTHub.ConnectionDeviceGenerationId Identifikační číslo generace ověřeného zařízení použitého k odeslání této zprávy. IoT Hub tuto hodnotu potvrzuje na zprávách určených pro službu.
IoTHub.EnqueuedTime Čas, kdy IoT Hub obdrží zprávu.

Streamování dat z úložiště blob nebo Data Lake Storage Gen2

Pro scénáře, které zahrnují ukládání velkých objemů nestrukturovaných dat v cloudu, nabízí Azure úložiště objektů blob nebo Azure Data Lake Storage Gen2 nákladově efektivní a škálovatelné řešení. Data v úložišti objektů blob nebo Azure Data Lake Storage Gen2 se považují za data v klidu. Stream Analytics ale může tato data zpracovávat jako datový proud.

Obvyklý scénář použití vstupů se Stream Analytics je zpracování logů. V tomto scénáři zachytáváte datové soubory telemetrie ze systému a potřebujete je analyzovat a zpracovávat za účelem extrakce smysluplných dat.

Výchozím časovým razítkem úložiště objektů blob nebo události Azure Data Lake Storage Gen2 ve Stream Analytics je časové razítko, které bylo naposledy změněno, což je BlobLastModifiedUtcTime. Pokud nahrajete objekt blob do účtu úložiště v 13:00 a spustíte úlohu Azure Stream Analytics pomocí možnosti Now v 13:01, úloha nezahrne objekt blob, protože jeho čas změny spadá mimo období běhu úlohy.

Pokud nahrajete objekt blob do kontejneru účtu úložiště v 13:00 a spustíte úlohu Azure Stream Analytics pomocí Custom Time v 13:00 nebo starší, úloha převezme objekt blob, protože jeho upravený čas spadá do období spuštění úlohy.

Pokud spustíte úlohu Azure Stream Analytics pomocí Now v 13:00 a nahrajete objekt blob do kontejneru účtu úložiště v 13:01, Azure Stream Analytics objekt blob převezme. Časové razítko přiřazené každému objektu blob je založeno pouze na BlobLastModifiedTime. Složka, ve které je objekt blob, nemá žádný vztah k přiřazenému časovému razítku. Například, pokud existuje objekt blob s vlastností 2019/10-01/00/b1.txtBlobLastModifiedTime hodnotou 2019-11-11, pak je k tomuto objektu přiřazeno časové razítko 2019-11-11.

Pokud chcete data zpracovat jako datový proud pomocí časového razítka v datové části události, musíte použít klíčové slovo TIMESTAMP BY . Úloha Stream Analytics načítá data z úložiště objektů blob Azure nebo z Azure Data Lake Storage Gen2 jako vstup každou sekundu, pokud je soubor blob k dispozici. Pokud soubor objektu blob není dostupný, úloha použije exponenciální zpomalování s maximálním časovým zpožděním 90 sekund.

Poznámka:

Stream Analytics nepodporuje přidávání obsahu do existujícího souboru objektů blob. Stream Analytics zobrazí každý soubor jenom jednou a nezpracuje žádné změny, ke kterým dojde v souboru po načtení dat úlohy. Osvědčeným postupem je nahrát všechna data pro soubor objektu blob najednou a pak do jiného nového souboru objektu blob přidat další novější události.

Ve scénářích, kdy průběžně přidáváte mnoho objektů blob a Stream Analytics zpracovává objekty blob v okamžiku jejich přidávání, můžete některé objekty blob ve výjimečných případech přeskočit kvůli granulaci BlobLastModifiedTime. Tento problém můžete zmírnit tím, že objekty blob budete nahrávat s minimálním intervalem dvou sekund. Pokud tato možnost není proveditelná, můžete pomocí služby Event Hubs streamovat velké objemy událostí.

Konfigurace úložiště objektů blob jako vstupu datového proudu

Následující tabulka vysvětluje jednotlivé vlastnosti na stránce Nový vstup na portálu Azure při konfiguraci úložiště objektů blob jako vstupu datového proudu.

Vlastnost Popis
Vstupní alias Popisný název, který použijete v dotazu úlohy k odkazování na tento vstup.
Subscription Vyberte předplatné, ve kterém se nachází úložišťový prostředek.
Účet úložiště Název účtu úložiště, kde jsou uložené blobové objekty.
Klíč účtu úložiště Tajný klíč přidružený k účtu úložiště. Tato možnost se vyplní automaticky, pokud nevyberete možnost ručního zadání nastavení.
Kontejner Kontejnery poskytují logické seskupení objektů blob. Pokud chcete vytvořit nový kontejner, můžete zvolit možnost Použít existující kontejner nebo Vytvořit nový .
Režim ověřování Zadejte typ ověřování, které chcete použít pro připojení k účtu úložiště. K ověření pomocí účtu úložiště můžete použít connection string nebo spravovanou identitu. U možnosti spravované identity můžete buď vytvořit spravovanou identitu přiřazenou systémem k úloze Stream Analytics, nebo spravovanou identitu přiřazenou uživatelem pro ověření pomocí účtu úložiště. Při použití spravované identity musí být spravovaná identita členem příslušné role v účtu úložiště.
Model cesty (volitelné) Cesta k souboru použitá pro vyhledávání blobů v zadaném kontejneru. Pokud chcete číst bloby z kořene kontejneru, nenastavujte cestu. V rámci cesty můžete zadat jednu nebo více instancí následujících tří proměnných: {date}, {time}nebo {partition}

Příklad 1: cluster1/logs/{date}/{time}/{partition}

Příklad 2: cluster1/logs/{date}

Znak * není povolenou hodnotou pro předponu cesty. Jsou povoleny pouze platné znaky Azure blob. Nezahrnujte názvy kontejnerů ani názvy souborů.
Formát data (volitelné) Pokud v cestě použijete proměnnou data, formát data, podle kterého jsou soubory uspořádány. Příklad: YYYY/MM/DD

Pokud je ve vstupu objektu blob cesta obsahující {date} nebo {time}, Stream Analytics prozkoumá složky ve vzestupném pořadí podle času.
Formát času (volitelné) Pokud v cestě použijete časovou proměnnou, zvolte formát času, ve kterém budou soubory uspořádány. V současné době je HH jediná podporovaná hodnota určená pro hodiny.
Klíč oddílu Je to volitelné pole, které je k dispozici jenom v případě, že úlohu nakonfigurujete tak, aby používala úroveň kompatibility 1.2 nebo vyšší. Pokud rozdělíte vstup podle vlastnosti, můžete sem přidat název této vlastnosti. Používá se ke zlepšení výkonu dotazu, pokud obsahuje klauzuli PARTITION BY nebo GROUP BY pro tuto vlastnost. Pokud tato úloha používá úroveň kompatibility 1.2 nebo vyšší, toto pole bude ve výchozím nastavení nastaveno na "PartitionId."
Počet vstupních segmentů Toto pole je k dispozici pouze v případě, že je {partition} ve vzoru cesty. Hodnota této vlastnosti je celé číslo >=1. Všude, kde se {partition} zobrazí v pathPattern, se použije číslo od 0 do hodnoty tohoto pole minus jedna.
Formát serializace událostí Formát serializace (JSON, CSV, Avro) příchozího datového proudu. Ujistěte se, že formát JSON odpovídá specifikaci a neobsahuje úvodní číslo 0 pro desetinná čísla.
Kódování U CSV a JSON je UTF-8 v současné době jediným podporovaným formátem kódování.
Compression Typ komprese používaný ke čtení příchozího datového proudu, například None (výchozí), Gzip nebo Deflate.

Když data pocházejí ze zdroje úložiště objektů blob, můžete v dotazu Stream Analytics přistupovat k následujícím polím metadat:

Vlastnost Popis
Název blobu Název vstupního blobu, odkud událost pochází.
EventProcessedUtcTime Datum a čas, kdy Stream Analytics událost zpracuje.
BlobLastModifiedUtcTime Datum a čas poslední změny objektu blob.
ID oddílu ID oddílu s indexem začínajícím od nuly pro vstupní adaptér.

Pomocí těchto polí můžete napsat dotaz jako v následujícím příkladu:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Streamování dat z Apache Kafka

Azure Stream Analytics umožňuje přímé připojení ke clusterům Apache Kafka k ingestování dat. Řešení je málo kódu a je plně spravované týmem Azure Stream Analytics v Microsoftu, takže splňuje standardy dodržování předpisů pro firmy. Vstup Kafka je zpětně kompatibilní a podporuje všechny verze od verze 0.10 s nejnovější verzí klienta. V závislosti na konfiguracích se můžete připojit ke clusterům Kafka ve virtuální síti a clusterech Kafka s veřejným koncovým bodem. Konfigurace spoléhá na existující konvence konfigurace Kafka. Podporované typy komprese jsou None, Gzip, Snappy, LZ4 a Zstd.

Další informace najdete v tématu Streamová data ze systému Kafka do Azure Stream Analytics (Preview).

Další kroky