Streamování dat jako vstupu do Stream Analytics

Stream Analytics má prvotřídní integraci s datovými proudy Azure jako vstupy ze tří druhů prostředků:

Tyto vstupní prostředky můžou být ve stejném předplatném Azure jako vaše úloha Stream Analytics nebo jiné předplatné.

Komprese

Stream Analytics podporuje kompresi pro všechny vstupní zdroje. Podporované typy komprese jsou: Žádné, Gzip a Deflate. Podpora komprese není dostupná pro referenční data. Pokud jsou vstupní data komprimovaná avro, Stream Analytics je zpracuje transparentně. U serializace Avro nemusíte zadávat typ komprese.

Vytváření, úpravy nebo testování vstupů

Pomocí webu Azure Portal, sady Visual Studio a editoru Visual Studio Code můžete přidat a zobrazit nebo upravit existující vstupy v úloze streamování. Vstupní připojení a testovací dotazy můžete testovat také z ukázkových dat z webu Azure Portal, sady Visual Studio a editoru Visual Studio Code. Při psaní dotazu vypíšete vstup v klauzuli FROM. Seznam dostupných vstupů můžete získat na stránce Dotaz na portálu. Pokud chcete použít více vstupů, JOIN zapište je nebo napíšete více SELECT dotazů.

Poznámka:

Důrazně doporučujeme používat nástroje Stream Analytics pro Visual Studio Code pro nejlepší místní vývojové prostředí. V nástrojích Stream Analytics pro Visual Studio 2019 (verze 2.6.3000.0) existují známé mezery ve funkcích a nebude se dále zlepšovat.

Streamování dat z Event Hubs

Azure Event Hubs je vysoce škálovatelný ingestor událostí publikování a odběru. Centrum událostí může shromažďovat miliony událostí za sekundu, abyste mohli zpracovávat a analyzovat obrovské objemy dat vytvořených připojenými zařízeními a aplikacemi. Služba Event Hubs a Stream Analytics společně poskytují ucelené řešení pro analýzu v reálném čase. Služba Event Hubs umožňuje kanálovat události do Azure v reálném čase a úlohy Stream Analytics můžou tyto události zpracovávat v reálném čase. Do služby Event Hubs můžete například odesílat události webového kliknutí, snímačů nebo událostí protokolu online. Pak můžete vytvořit úlohy Stream Analytics, které budou používat službu Event Hubs pro vstupní data pro filtrování, agregaci a korelaci v reálném čase.

EventEnqueuedUtcTime je časové razítko příchodu události do centra událostí a je výchozím časovým razítkem událostí přicházejících ze služby Event Hubs do Stream Analytics. Pokud chcete data zpracovat jako datový proud pomocí časového razítka v datové části události, musíte použít klíčové slovo TIMESTAMP BY .

Skupiny příjemců služby Event Hubs

Každý vstup centra událostí byste měli nakonfigurovat tak, aby měl vlastní skupinu příjemců. Pokud úloha obsahuje samoobslužné spojení nebo má více vstupů, mohou být některé vstupy přečtené více než jedním čtenářem v podřízené části. Tato situace ovlivňuje počet čtenářů v jedné skupině příjemců. Aby se zabránilo překročení limitu služby Event Hubs pěti čtenářů na skupinu příjemců na oddíl, je osvědčeným postupem určit skupinu příjemců pro každou úlohu Stream Analytics. Pro centrum událostí úrovně Standard je také limit 20 skupin příjemců. Další informace najdete v tématu Řešení potíží se vstupy Azure Stream Analytics.

Vytvoření vstupu ze služby Event Hubs

Následující tabulka vysvětluje jednotlivé vlastnosti na nové vstupní stránce na webu Azure Portal pro streamování vstupu dat z centra událostí:

Vlastnost Popis
Vstupní alias Popisný název, který použijete v dotazu úlohy k odkazování na tento vstup.
Předplatné Zvolte předplatné Azure, ve kterém existuje prostředek centra událostí.
Obor názvů centra událostí Obor názvů služby Event Hubs je kontejner pro centra událostí. Při vytváření centra událostí vytvoříte také obor názvů.
Název centra událostí Název centra událostí, který se má použít jako vstup.
Skupina příjemců centra událostí (doporučeno) Pro každou úlohu Stream Analytics doporučujeme použít samostatnou skupinu příjemců. Tento řetězec identifikuje skupinu příjemců, která se má použít k ingestování dat z centra událostí. Pokud není zadána žádná skupina příjemců, použije $Default úloha Stream Analytics skupinu příjemců.
Režim ověřování Zadejte typ ověřování, které chcete použít pro připojení k centru událostí. K ověření v centru událostí můžete použít připojovací řetězec nebo spravovanou identitu. U možnosti spravované identity můžete buď vytvořit spravovanou identitu přiřazenou systémem k úloze Stream Analytics, nebo spravovanou identitu přiřazenou uživatelem pro ověření v centru událostí. Pokud používáte spravovanou identitu, musí být spravovaná identita členem role Vlastník dat služby Azure Event Hubs nebo Vlastník dat služby Azure Event Hubs.
Název zásady centra událostí Zásady sdíleného přístupu, které poskytují přístup ke službě Event Hubs. Každá zásada sdíleného přístupu má název, oprávnění, která jste nastavili, a přístupové klíče. Tato možnost se vyplní automaticky, pokud nevyberete možnost ručního zadání nastavení služby Event Hubs.
Klíč oddílu Jedná se o volitelné pole, které je k dispozici pouze v případě, že je vaše úloha nakonfigurovaná tak, aby používala úroveň kompatibility 1.2 nebo vyšší. Pokud je vstup rozdělený podle vlastnosti, můžete sem přidat název této vlastnosti. Používá se ke zlepšení výkonu dotazu, pokud obsahuje PARTITION BY klauzuli nebo GROUP BY klauzuli pro tuto vlastnost. Pokud tato úloha používá úroveň kompatibility 1.2 nebo vyšší, je toto pole ve výchozím nastavení nastaveno na PartitionId.
Formát serializace události Formát serializace (JSON, CSV, Avro, Parquet nebo Other (Protobuf, XML, proprietární...) příchozího datového proudu. Ujistěte se, že formát JSON odpovídá specifikaci a neobsahuje úvodní číslo 0 pro desetinná čísla.
Kódování UTF-8 je v současné době jediným podporovaným formátem kódování.
Typ komprese událostí Typ komprese používaný ke čtení příchozího datového proudu, například None (výchozí), Gzip nebo Deflate.
Registr schématu (Preview) Můžete vybrat registr schématu se schématy pro data událostí přijatá z centra událostí.

Když data pocházejí ze vstupu streamu služby Event Hubs, máte v dotazu Stream Analytics přístup k následujícím polím metadat:

Vlastnost Popis
EventProcessedUtcTime Datum a čas, kdy Stream Analytics událost zpracuje.
EventEnqueuedUtcTime Datum a čas, kdy služba Event Hubs přijímá události.
Partitionid ID oddílu založeného na nule pro vstupní adaptér.

Pomocí těchto polí můžete například napsat dotaz jako v následujícím příkladu:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Poznámka:

Při použití služby Event Hubs jako koncového bodu pro trasy služby IoT Hub můžete přistupovat k metadatům služby IoT Hub pomocí funkce GetMetadataPropertyValue.

Streamování dat ze služby IoT Hub

Azure IoT Hub je vysoce škálovatelný ingestor událostí publikování a odběru optimalizovaný pro scénáře IoT.

Výchozí časové razítko událostí přicházejících ze služby IoT Hub ve Stream Analytics je časové razítko, které událost přišla do IoT Hubu, což je EventEnqueuedUtcTime. Pokud chcete data zpracovat jako datový proud pomocí časového razítka v datové části události, musíte použít klíčové slovo TIMESTAMP BY .

Skupiny uživatelů iot Hubu

Každý vstup služby Stream Analytics IoT Hub byste měli nakonfigurovat tak, aby měl vlastní skupinu příjemců. Pokud úloha obsahuje vlastní spojení nebo pokud má více vstupů, může být některé vstupy přečteno více než jedním čtenářem v podřízené oblasti. Tato situace ovlivňuje počet čtenářů v jedné skupině příjemců. Abyste se vyhnuli překročení limitu služby Azure IoT Hub pěti čtenářů na skupinu příjemců na oddíl, je osvědčeným postupem určit skupinu příjemců pro každou úlohu Stream Analytics.

Konfigurace ioT Hubu jako vstupu datového streamu

Následující tabulka vysvětluje jednotlivé vlastnosti na stránce Nový vstup na webu Azure Portal při konfiguraci ioT Hubu jako vstupu datového proudu.

Vlastnost Popis
Vstupní alias Popisný název, který použijete v dotazu úlohy k odkazování na tento vstup.
Předplatné Zvolte předplatné, ve kterém existuje prostředek IoT Hubu.
IoT Hub Název IoT Hubu, který se má použít jako vstup.
Skupina příjemců Pro každou úlohu Stream Analytics doporučujeme použít jinou skupinu příjemců. Skupina příjemců se používá k ingestování dat ze služby IoT Hub. Stream Analytics používá skupinu uživatelů $Default, pokud nezadáte jinak.
Název zásady sdíleného přístupu Zásady sdíleného přístupu, které poskytují přístup ke službě IoT Hub. Každá zásada sdíleného přístupu má název, oprávnění, která jste nastavili, a přístupové klíče.
Klíč zásad sdíleného přístupu Sdílený přístupový klíč použitý k autorizaci přístupu ke službě IoT Hub. Tato možnost se automaticky vyplní, pokud nevyberete možnost ručního zadání nastavení služby Iot Hub.
Endpoint Koncový bod pro IoT Hub.
Klíč oddílu Jedná se o volitelné pole, které je k dispozici pouze v případě, že je vaše úloha nakonfigurovaná tak, aby používala úroveň kompatibility 1.2 nebo vyšší. Pokud je vstup rozdělený podle vlastnosti, můžete sem přidat název této vlastnosti. Používá se ke zlepšení výkonu dotazu, pokud obsahuje klauzuli PARTITION BY nebo GROUP BY pro tuto vlastnost. Pokud tato úloha používá úroveň kompatibility 1.2 nebo vyšší, bude toto pole ve výchozím nastavení nastaveno na Id oddílu.
Formát serializace události Formát serializace (JSON, CSV, Avro, Parquet nebo Other (Protobuf, XML, proprietární...) příchozího datového proudu. Ujistěte se, že formát JSON odpovídá specifikaci a neobsahuje úvodní číslo 0 pro desetinná čísla.
Kódování UTF-8 je v současné době jediným podporovaným formátem kódování.
Typ komprese událostí Typ komprese používaný ke čtení příchozího datového proudu, například None (výchozí), Gzip nebo Deflate.

Pokud používáte streamová data ze služby IoT Hub, máte v dotazu Stream Analytics přístup k následujícím polím metadat:

Vlastnost Popis
EventProcessedUtcTime Datum a čas zpracování události.
EventEnqueuedUtcTime Datum a čas, kdy IoT Hub obdrží událost.
Partitionid ID oddílu založeného na nule pro vstupní adaptér.
IoTHub.MessageId ID, které se používá ke korelaci obousměrné komunikace ve službě IoT Hub.
IoTHub.CorrelationId ID, které se používá v odpovědích na zprávy a zpětné vazbě ve službě IoT Hub.
IoTHub. Připojení ionDeviceId OVĚŘOVACÍ ID použité k odeslání této zprávy. Tato hodnota se označí u zpráv vázaných službou ioT Hub.
IoTHub. Připojení ionDeviceGenerationId ID generování ověřeného zařízení použitého k odeslání této zprávy. Tato hodnota se v IoT Hubu razítek na příchozí zprávy služby.
IoTHub.EnqueuedTime Čas, kdy IoT Hub obdrží zprávu.

Streamování dat ze služby Blob Storage nebo Data Lake Storage Gen2

Pro scénáře s velkým množstvím nestrukturovaných dat, která se mají ukládat v cloudu, nabízí Azure Blob Storage nebo Azure Data Lake Storage Gen2 nákladově efektivní a škálovatelné řešení. Data ve službě Blob Storage nebo Azure Data Lake Storage Gen2 se považují za neaktivní uložená data. Tato data je ale možné zpracovat jako datový proud streamem Stream Analytics.

Zpracování protokolů je běžně používaný scénář pro použití takových vstupů se službou Stream Analytics. V tomto scénáři se datové soubory telemetrie zaznamenávají ze systému a je potřeba je analyzovat a zpracovávat za účelem extrakce smysluplných dat.

Výchozí časové razítko události Blob Storage nebo Azure Data Lake Storage Gen2 ve Stream Analytics je časové razítko, které BlobLastModifiedUtcTimebylo naposledy změněno. Pokud se objekt blob nahraje do účtu úložiště v 13:00 a úloha Azure Stream Analytics se spustí pomocí možnosti Nyní v 13:01, nebude vyzvednuta, protože její čas změny spadá mimo období spuštění úlohy.

Pokud se objekt blob nahraje do kontejneru účtu úložiště v 13:00 a úloha Azure Stream Analytics začne používat vlastní čas v 13:00 nebo starší, objekt blob se vyzvedne, protože jeho čas změny spadá do období spuštění úlohy.

Pokud je úloha Azure Stream Analytics spuštěná v 13:00 a objekt blob se nahraje do kontejneru účtu úložiště v 13:01, Azure Stream Analytics objekt blob převezme. Časové razítko přiřazené každému objektu blob je založeno pouze na BlobLastModifiedTime. Složka, ve které je objekt blob, nemá žádný vztah k přiřazenému časovému razítku. Pokud je například objekt blob s objektem 2019-11-11BlobLastModifiedTime blob 2019/10-01/00/b1.txt , časové razítko přiřazené k tomuto objektu blob je 2019-11-11.

Pokud chcete data zpracovat jako datový proud pomocí časového razítka v datové části události, musíte použít klíčové slovo TIMESTAMP BY . Úloha Stream Analytics načítá data ze služby Azure Blob Storage nebo vstupu Azure Data Lake Storage Gen2 každou sekundu, pokud je soubor objektu blob k dispozici. Pokud soubor objektu blob není dostupný, dojde k exponenciálnímu zpožďování s maximálním časovým zpožděním 90 sekund.

Poznámka:

Stream Analytics nepodporuje přidávání obsahu do existujícího souboru objektů blob. Stream Analytics zobrazí každý soubor pouze jednou a všechny změny, ke kterým dojde v souboru po načtení dat, se nezpracují. Osvědčeným postupem je nahrát všechna data pro soubor objektu blob najednou a pak do jiného nového souboru objektu blob přidat další novější události.

Ve scénářích, kdy se průběžně přidává mnoho objektů blob a Stream Analytics zpracovává objekty blob při jejich přidání, je možné některé objekty blob přeskočit ve výjimečných případech kvůli členitosti objektu BlobLastModifiedTimeblob . Tento případ můžete zmírnit tak, že nahrajete objekty blob aspoň dvě sekundy. Pokud tato možnost není proveditelná, můžete pomocí služby Event Hubs streamovat velké objemy událostí.

Konfigurace úložiště objektů blob jako vstupu datového proudu

Následující tabulka vysvětluje jednotlivé vlastnosti na stránce Nový vstup na webu Azure Portal při konfiguraci úložiště objektů blob jako vstup datového proudu.

Vlastnost Popis
Vstupní alias Popisný název, který použijete v dotazu úlohy k odkazování na tento vstup.
Předplatné Zvolte předplatné, ve kterém prostředek úložiště existuje.
Účet úložiště Název účtu úložiště, ve kterém jsou umístěné soubory objektů blob.
Klíč účtu úložiště Tajný klíč přidružený k účtu úložiště. Tato možnost se automaticky vyplní, pokud nevyberete možnost pro ruční zadání nastavení.
Kontejner Kontejnery poskytují logické seskupení objektů blob. Pokud chcete vytvořit nový kontejner, můžete zvolit možnost Použít existující kontejner nebo Vytvořit nový .
Režim ověřování Zadejte typ ověřování, které chcete použít pro připojení k účtu úložiště. K ověření pomocí účtu úložiště můžete použít připojovací řetězec nebo spravovanou identitu. U možnosti spravované identity můžete buď vytvořit spravovanou identitu přiřazenou systémem k úloze Stream Analytics, nebo spravovanou identitu přiřazenou uživatelem pro ověření pomocí účtu úložiště. Při použití spravované identity musí být spravovaná identita členem příslušné role v účtu úložiště.
Model cesty (volitelné) Cesta k souboru použitá k vyhledání objektů blob v zadaném kontejneru. Pokud chcete číst objekty blob z kořenového adresáře kontejneru, nenastavujte vzor cesty. V rámci cesty můžete zadat jednu nebo více instancí následujících tří proměnných: {date}, {time}nebo {partition}

Příklad 1: cluster1/logs/{date}/{time}/{partition}

Příklad 2: cluster1/logs/{date}

Znak * není povolenou hodnotou pro předponu cesty. Jsou povoleny pouze platné znaky objektů blob Azure. Nezahrnujte názvy kontejnerů ani názvy souborů.
Formát data (volitelné) Pokud v cestě použijete proměnnou kalendářního data, formát data, ve kterém jsou soubory uspořádány. Příklad: YYYY/MM/DD

Pokud má {date} vstup objektu blob nebo {time} jeho cestu, složky se prohlížejí ve vzestupném časovém pořadí.
Formát času (volitelné) Pokud v cestě použijete časová proměnná, formát času, ve kterém jsou soubory uspořádány. V současné době je HH jediná podporovaná hodnota určená pro hodiny.
Klíč oddílu Jedná se o volitelné pole, které je k dispozici pouze v případě, že je vaše úloha nakonfigurovaná tak, aby používala úroveň kompatibility 1.2 nebo vyšší. Pokud je vstup rozdělený podle vlastnosti, můžete sem přidat název této vlastnosti. Používá se ke zlepšení výkonu dotazu, pokud obsahuje klauzuli PARTITION BY nebo GROUP BY pro tuto vlastnost. Pokud tato úloha používá úroveň kompatibility 1.2 nebo vyšší, bude toto pole ve výchozím nastavení nastaveno na Id oddílu.
Počet vstupních oddílů Toto pole je k dispozici pouze v případě, že je {partition} ve vzoru cesty. Hodnota této vlastnosti je celé číslo >=1. Všude, kde se {partition} zobrazí v cestěPattern, se použije číslo od 0 do hodnoty tohoto pole -1.
Formát serializace události Formát serializace (JSON, CSV, Avro, Parquet nebo Other (Protobuf, XML, proprietární...) příchozího datového proudu. Ujistěte se, že formát JSON odpovídá specifikaci a neobsahuje úvodní číslo 0 pro desetinná čísla.
Kódování U CSV a JSON je UTF-8 v současné době jediným podporovaným formátem kódování.
Komprese Typ komprese používaný ke čtení příchozího datového proudu, například None (výchozí), Gzip nebo Deflate.

Když data pocházejí ze zdroje úložiště objektů blob, máte v dotazu Stream Analytics přístup k následujícím polím metadat:

Vlastnost Popis
Název objektu blob Název vstupního objektu blob, ze kterého událost pochází.
EventProcessedUtcTime Datum a čas, kdy Stream Analytics událost zpracuje.
BlobLastModifiedUtcTime Datum a čas poslední změny objektu blob.
Partitionid ID oddílu založeného na nule pro vstupní adaptér.

Pomocí těchto polí můžete například napsat dotaz jako v následujícím příkladu:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Streamování dat z Apache Kafka

Azure Stream Analytics umožňuje připojit se přímo ke clusterům Apache Kafka k ingestování dat. Řešení je málo kódu a je plně spravované týmem Azure Stream Analytics v Microsoftu, což umožňuje vyhovět standardům dodržování předpisů pro firmy. Vstup Kafka je zpětně kompatibilní a podporuje všechny verze s nejnovější verzí klienta počínaje verzí 0.10. Uživatelé se můžou připojit ke clusterům Kafka ve virtuální síti a clusterech Kafka s veřejným koncovým bodem v závislosti na konfiguracích. Konfigurace spoléhá na existující konvence konfigurace Kafka. Podporované typy komprese jsou None, Gzip, Snappy, LZ4 a Zstd.

Další informace najdete v tématu Streamování dat ze systému Kafka do Azure Stream Analytics (Preview).

Další kroky