Adatok streamelése bemenetként a Stream Analyticsbe

A Stream Analytics ötféle erőforrás bemeneteként integrálható Azure adatfolyamokkal:

Ezek a bemeneti erőforrások ugyanabban a Azure előfizetésben létezhetnek, mint a Stream Analytics-feladat vagy egy másik előfizetés.

Tömörítés

A Stream Analytics támogatja az összes bemeneti forrás tömörítését. A támogatott tömörítési típusok a következők: Nincs, Gzip és Deflate. A Stream Analytics nem támogatja a referenciaadatok tömörítését. Ha a bemeneti adatok tömörített Avro-adatok, a Stream Analytics transzparensen kezeli azokat. Nem kell megadnia a tömörítés típusát Avro szerializálás során.

Bemenetek létrehozása, szerkesztése vagy tesztelése

A Azure portál, Visual Studio és Visual Studio Code használatával felveheti és megtekintheti vagy szerkesztheti a streamelési feladat meglévő bemeneteit. A bemeneti kapcsolatokat és a lekérdezéseket a Azure portálon, a Visual Studio és Visual Studio Code mintaadatokból is tesztelheti. Lekérdezés írásakor a FROM záradékban listázhatja a bemenetet. Az elérhető bemenetek listáját a portál Lekérdezés lapján szerezheti be. Ha több bemenetet szeretne használni, JOIN azokat össze vagy írjon több SELECT lekérdezést.

Megjegyzés:

A legjobb helyi fejlesztési élmény érdekében használja a Stream Analytics-eszközöket Visual Studio Code. Az Visual Studio 2019-es (2.6.3000.0-s verzió) Stream Analytics-eszközeiben ismert funkcióbeli hiányosságok vannak, és a jövőben nem fog javulni.

Adatok streamelése az Event Hubsból

Azure Event Hubs egy nagymértékben méretezhető közzétételi-feliratkozási eseménybetöltés. Az eseményközpont másodpercenként több millió eseményt gyűjthet össze, hogy feldolgozhassa és elemezhesse a csatlakoztatott eszközök és alkalmazások által előállított nagy mennyiségű adatot. Az Event Hubs és a Stream Analytics együttesen egy végpontok közötti megoldást biztosítanak a valós idejű elemzésekhez. Az Event Hubs folyamatosan az Azure-ba juttatja az eseményeket, a Stream Analytics-feladatok pedig valós időben dolgozzák fel ezeket. Küldhet például webes kattintásokat, érzékelőolvasásokat vagy online naplóeseményeket az Event Hubsnak. Ezután Létrehozhat Stream Analytics-feladatokat az Event Hubs használatával a bemeneti adatokhoz valós idejű szűréshez, összesítéshez és korrelációhoz.

EventEnqueuedUtcTime az esemény eseményközpontba való érkezésének időbélyege, és az Event Hubsból a Stream Analyticsbe érkező események alapértelmezett időbélyege. Az adatok streamként való feldolgozásához az eseménypayload időbélyegével, a TIMESTAMP BY kulcsszót kell használnia.

Event Hubs fogyasztói csoportok

Konfigurálja az egyes eseményközpont-bemeneteket a saját fogyasztói csoportjával. Ha egy feladat önillesztéseket tartalmaz, vagy több bemenettel rendelkezik, több olvasó is elolvashat néhány bemenetet. Ez a helyzet egyetlen fogyasztói csoport olvasóinak számát érinti. Annak érdekében, hogy ne lépje túl az Event Hubs partíciónkénti öt olvasóra vonatkozó korlátját, minden Stream Analytics-feladathoz jelöljön ki egy fogyasztói csoportot. A Standard szintű eseményközpontok esetében 20 fogyasztói csoportra van korlátozva. További információ: Azure Stream Analytics bemenetek hibaelhárítása.

Bemenet létrehozása az Event Hubsból

Az alábbi táblázat a Új bemenet lap minden tulajdonságát ismerteti az Azure portálon az adatbevitel eseményközpontból való streameléséhez:

Ingatlan Leírás
Bemeneti alias Egy barátságos név, amelyet a munka lekérdezésében használ a bemenetre való hivatkozáshoz.
Subscription Válassza ki azt a Azure előfizetést, amelyben az Event Hub-erőforrás létezik.
Event Hub-névtér Az Event Hubs-névtér az eseményközpontok tárolója. Eseményközpont létrehozásakor a névteret is létre kell hoznia.
Event Hub neve A bemenetként használni kívánt eseményközpont neve.
Event Hub fogyasztói csoport (ajánlott) Minden Stream Analytics-feladathoz használjon külön fogyasztói csoportot. Ez a szöveg azonosítja azt a fogyasztói csoportot, amely az eseményközpontból származó adatok betöltésére szolgál. Ha nem ad meg fogyasztói csoportot, a Stream Analytics-feladat a $Default fogyasztói csoportot használja.
Hitelesítési mód Adja meg az eseményközponthoz való csatlakozáshoz használni kívánt hitelesítés típusát. Az eseményközponttal való hitelesítéshez használjon connection string vagy felügyelt identitást. A felügyelt identitás beállításhoz létrehozhat egy rendszer által hozzárendelt felügyelt identitást a Stream Analytics-feladathoz, vagy egy felhasználó által hozzárendelt felügyelt identitást az eseményközponttal való hitelesítéshez. Felügyelt identitás használata esetén a felügyelt identitásnak Azure Event Hubs adatátvevő vagy Azure Event Hubs adattulajdonosi szerepkörök tagjának kell lennie.
Event Hub-szabályzat neve A közös hozzáférési szabályzat, amely hozzáférést biztosít az Event Hubshoz. Minden megosztott hozzáférési szabályzat rendelkezik névvel, beállított engedélyekkel és hozzáférési kulcsokkal. Ez a beállítás automatikusan fel lesz töltve, kivéve, ha manuálisan szeretné megadni az Event Hubs beállításait.
Partíciókulcs Ez az opcionális mező csak akkor érhető el, ha a feladatot az 1.2-es vagy újabb kompatibilitási szint használatára konfigurálja. Ha a bemenetet egy tulajdonság particionálta, adja hozzá a tulajdonság nevét. Ezt használja a lekérdezés teljesítményének javítására, ha ez a tulajdonság egy PARTITION BY vagy GROUP BY záradékot tartalmaz. Ha ez a feladat az 1.2-es vagy újabb kompatibilitási szintet használja, ez a mező alapértelmezés szerint a PartitionId.
Esemény szerializálási formátuma A bejövő adatfolyam szerializálási formátuma (JSON, CSV, Avro). Győződjön meg arról, hogy a JSON formátum megfelel a specifikációnak, és nem tartalmaz kezdő nullát a decimális számok előtt.
Kódolás Jelenleg az UTF-8 az egyetlen támogatott kódolási formátum.
Eseménytömörítés típusa A bejövő adatfolyam olvasásához használt tömörítési típus, például Nincs (alapértelmezett), Gzip vagy Deflate.
Sémaregisztrációs adatbázis Válassza ki az eseményközponttól kapott eseményadatok sémáit tartalmazó sémaregisztrációs adatbázist.

Amikor az adatok egy Event Hubs-streambemenetből származnak, a Stream Analytics-lekérdezésben a következő metaadatmezőkhöz férhet hozzá:

Ingatlan Leírás
EventProcessedUtcTime A Stream Analytics által az esemény feldolgozásakor megadott dátum és idő.
EventEnqueuedUtcTime Az a dátum és időpont, amikor az Event Hubs megkapja az eseményeket.
PartitionId A bemeneti adapter nulla alapú partícióazonosítója.

Az alábbi mezők használatával az alábbi példához hasonlóan írhat lekérdezést:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Megjegyzés:

Ha az Event Hubsot használja végpontként IoT Hub útvonalakhoz, a IoT Hub metaadatokat a GetMetadataPropertyValue függvény használatával érheti el.

Adatok streamelése az IoT Hub-ból

Azure IoT Hub egy nagymértékben skálázható, IoT-forgatókönyvekhez optimalizált publish-subscribe eseményfeldolgozó rendszer.

Az IoT Hub-hoz érkező események alapértelmezett időbélyege az az időbélyeg, amikor az esemény megérkezett az IoT Hub-ba, amely EventEnqueuedUtcTime. Ha streamként szeretné feldolgozni az adatokat az esemény hasznos adatainak időbélyegével, használja a TIMESTAMP BY kulcsszót .

IoT Hub fogyasztói csoportok

Konfigurálja az egyes Stream Analytics-IoT Hub bemeneteket úgy, hogy saját fogyasztói csoportjuk legyen. Ha egy feladat önálló illesztéseket tartalmaz, vagy ha több bemenettel rendelkezik, egynél több olvasó is elolvashat bizonyos bemeneteket. Ez a helyzet egyetlen fogyasztói csoport olvasóinak számát befolyásolja. Annak érdekében, hogy ne lépje túl a Azure IoT Hub felhasználói csoportonkénti öt olvasóra vonatkozó korlátot partíciónként, jelöljön ki egy fogyasztói csoportot minden Stream Analytics-feladathoz.

IoT Hub konfigurálása adatfolyam-bemenetként

Az alábbi táblázat a Új bemenet lap minden tulajdonságát ismerteti a Azure portálon, amikor streambemenetként konfigurál egy IoT Hub.

Ingatlan Leírás
Bemeneti alias Egy barátságos név, amelyet a munka lekérdezésében használ a bemenetre való hivatkozáshoz.
Subscription Válassza ki azt az előfizetést, amelyben a IoT Hub erőforrás létezik.
IoT Hub A bemenetként használni kívánt IoT Hub neve.
Fogyasztói csoport Használjon egy másik fogyasztói csoportot minden Stream Analytics-feladathoz. A fogyasztói csoport feldolgozza az adatokat az IoT Hubból. A Stream Analytics a $Default fogyasztói csoportot használja, hacsak másként nem adja meg.
megosztott hozzáférési szabályzat neve A IoT Hub hozzáférést biztosító megosztott hozzáférési szabályzat. Minden megosztott hozzáférési szabályzat rendelkezik névvel, beállított engedélyekkel és hozzáférési kulcsokkal.
Megosztott hozzáférési szabályzatkulcs A IoT Hub való hozzáférés engedélyezéséhez használt közös hozzáférési kulcs. Ez az opció automatikusan kitöltésre kerül, hacsak nem választja az az opciót, az IoT Hub beállítások kézzel történő megadásához.
végpont A IoT Hub végpontja.
Partíciókulcs Ez egy nem kötelező mező, amely csak akkor érhető el, ha a feladatot az 1.2-es vagy újabb kompatibilitási szint használatára konfigurálja. Ha egy tulajdonság alapján particionálja a bemenetet, itt adhatja hozzá a tulajdonság nevét. A lekérdezés teljesítményének javítására szolgál, ha tartalmaz egy PARTITION BY vagy GROUP BY záradékot ezen a tulajdonságon. Ha ez a feladat 1.2-es vagy újabb kompatibilitási szintet használ, ez a mező alapértelmezés szerint "PartitionId".
Esemény szerializálási formátuma A bejövő adatfolyam szerializálási formátuma (JSON, CSV, Avro). Győződjön meg arról, hogy a JSON formátum megfelel a specifikációnak, és nem tartalmaz kezdő nullát a decimális számok előtt.
Kódolás Jelenleg az UTF-8 az egyetlen támogatott kódolási formátum.
Eseménytömörítés típusa A bejövő adatfolyam olvasásához használt tömörítési típus, például Nincs (alapértelmezett), Gzip vagy Deflate.

Amikor streamadatokat használ egy IoT Hub, a Stream Analytics-lekérdezésben a következő metaadatmezőkhöz férhet hozzá:

Ingatlan Leírás
EventProcessedUtcTime Az esemény feldolgozásának dátuma és időpontja.
EventEnqueuedUtcTime Az a dátum és idő, amikor a IoT Hub megkapja az eseményt.
PartitionId A bemeneti adapter nulla alapú partícióazonosítója.
IoTHub.MessageId Egy azonosító, amely a kétirányú kommunikáció korrelációjára szolgál az IoT Hubban.
IoTHub.CorrelationId A IoT Hub üzenetválaszaiban és visszajelzéseiben használt azonosító.
IoTHub.ConnectionDeviceId Az üzenet küldéséhez használt hitelesítési azonosító. A IoT Hub ezt az értéket a szolgáltatáshoz kötött üzenetekre bélyegolja.
IoTHub.ConnectionDeviceGenerationId Az üzenet küldéséhez használt hitelesített eszköz generációazonosítója. A IoT Hub ezt az értéket a szolgáltatáshoz kötött üzenetekre bélyegozza.
IoTHub.EnqueuedTime Az az időpont, amikor a IoT Hub megkapja az üzenetet.

Adatok streamelése Blob Storage-ból vagy Data Lake Storage Gen2

A nagy mennyiségű strukturálatlan adat felhőben való tárolását magában foglaló forgatókönyvek esetében Azure Blob Storage vagy Azure Data Lake Storage Gen2 költséghatékony és méretezhető megoldást kínál. A Blob Storage-ban vagy Azure Data Lake Storage Gen2 tárolt adatok inaktív adatoknak minősülnek. A Stream Analytics azonban adatfolyamként is feldolgozhatja ezeket az adatokat.

Az ilyen bemenetek Stream Analyticsben való használatára gyakran használt forgatókönyv a naplófeldolgozás. Ebben a forgatókönyvben telemetriai adatfájlokat rögzít egy rendszerből, és elemeznie és feldolgoznia kell őket, hogy hasznos adatokat nyerjen ki.

A Blob Storage- vagy Azure Data Lake Storage Gen2-események alapértelmezett időbélyege a Stream Analyticsben a legutóbb módosított időbélyeg, amely BlobLastModifiedUtcTime. Ha 13:00-kor feltölt egy blobot egy tárfiókba, és 13:01-kor elindítja a Azure Stream Analytics feladatot a Now beállítással, a feladat nem veszi fel a blobot, mert a módosított idő a feladat futási időszakán kívül esik.

Ha 13:00-kor feltölt egy blobot egy tárfiók tárolójába, és az Azure Stream Analytics feladatot Egyéni idő 13:00-kor vagy korábban indítja el, a feladat azzal veszi fel a blobot, hogy a módosított idő a feladat futási időszakába esik.

Ha Azure Stream Analytics feladatot Now 13:00-kor indít el, és 13:01-kor feltölt egy blobot a tárfiók tárolójába, Azure Stream Analytics felveszi a blobot. Az egyes blobokhoz rendelt időbélyeg csak a BlobLastModifiedTime-on alapul. A blobhoz tartozó mappának nincs köze a hozzárendelt időbélyeghez. Ha például van egy 2019/10-01/00/b1.txt blob, amelynek BlobLastModifiedTime2019-11-11 van, akkor ennek a blobnak rendelt időbélyeg 2019-11-11.

Az adatok streamként való feldolgozásához az esemény adatcsomagjában lévő időbélyeget kell alkalmaznia, és erre a TIMESTAMP BY kulcsszót kell használnia. A Stream Analytics-feladat másodpercenként lekéri az adatokat az Azure Blob Storage-ból vagy az Azure Data Lake Storage Gen2-ból, ha a blobfájl elérhető. Ha a blobfájl nem érhető el, a feladat exponenciális visszalépést használ 90 másodperces maximális késleltetéssel.

Megjegyzés:

A Stream Analytics nem támogatja a tartalom meglévő blobfájlhoz való hozzáadását. A Stream Analytics csak egyszer tekinti meg az egyes fájlokat, és a feladat beolvasása után nem dolgoz fel semmilyen módosítást a fájlban. Ajánlott eljárás, ha egy blobfájl összes adatát egyszerre tölti fel, majd újabb eseményeket ad hozzá egy másik, új blobfájlhoz.

Olyan helyzetekben, amikor folyamatosan sok blobot ad hozzá, és ezeket a Stream Analytics azonnal feldolgozza, bizonyos ritka esetekben előfordulhat, hogy a BlobLastModifiedTime-ből fakadó részletesség miatt néhány blobot kihagy. Ezt a problémát a blobok legalább két másodperc távolságra történő feltöltésével háríthatja el. Ha ez a lehetőség nem megvalósítható, az Event Hubs használatával nagy mennyiségű eseményt streamelhet.

A Blob Storage konfigurálása stream bemenetként

Az alábbi táblázat a Új bemenet lap minden tulajdonságát ismerteti a Azure portálon, amikor streambemenetként konfigurálja a Blob Storage-t.

Ingatlan Leírás
Bemeneti alias Egy barátságos név, amelyet a munka lekérdezésében használ a bemenetre való hivatkozáshoz.
Subscription Válassza ki azt az előfizetést, amelyben a tárolási erőforrás létezik.
tárhelyfiók Annak a tárfióknak a neve, amelyben a blobfájlok találhatók.
Tárfiók kulcs A tárfiókhoz társított titkos kulcs. Ez a beállítás automatikusan kitöltésre kerül, hacsak nem választja a manuális megadás lehetőségét.
Konténer A tárolók logikai csoportosítást biztosítanak a blobokhoz. Választhatja a Meglévő tároló használata vagy az Új létrehozása lehetőséget egy új tároló létrehozásához.
Hitelesítési mód Adja meg a tárfiókhoz való csatlakozáshoz használni kívánt hitelesítés típusát. A tárfiókkal való hitelesítéshez használhat egy kapcsolati láncot vagy egy felügyelt identitást. A felügyelt identitás beállításhoz létrehozhat egy rendszer által hozzárendelt felügyelt identitást a Stream Analytics-feladathoz, vagy egy felhasználó által hozzárendelt felügyelt identitást a tárfiókkal való hitelesítéshez. Felügyelt identitás használatakor a felügyelt identitásnak egy megfelelő szerepkör tagjának kell lennie a tárfiókban.
Útvonalminta (nem kötelező) A blobok a megadott tárolóban való megkereséséhez használt fájl elérési útja. Ha blobokat szeretne olvasni a tároló gyökeréből, ne állítson be elérési utat. Az elérési úton a következő három változó egy vagy több példányát adhatja meg: {date}, {time}vagy {partition}

1. példa: cluster1/logs/{date}/{time}/{partition}

2. példa: cluster1/logs/{date}

A * karakter nem engedélyezett érték az elérési út előtagjában. Csak érvényes Azure blobkarakterek engedélyezettek. Ne adjon meg tárolóneveket vagy fájlneveket.
Dátumformátum (nem kötelező) Ha az elérési úton a dátumváltozót használja, az a dátumformátum, amelyben a fájlok rendszerezésre kerülnek. Például: YYYY/MM/DD

Amikor a blobbemenet elérési útjában {date} vagy {time} található, a Stream Analytics a mappákat növekvő időrendi sorrendben vizsgálja.
Időformátum (nem kötelező) Ha az elérési útvonalban használja az időváltozót, akkor a fájlok időrendben történő rendszerezésének formátumát határozza meg. Jelenleg az egyetlen támogatott érték órákra vonatkozik HH .
Partíciókulcs Ez egy nem kötelező mező, amely csak akkor érhető el, ha a feladatot az 1.2-es vagy újabb kompatibilitási szint használatára konfigurálja. Ha egy tulajdonság alapján particionálja a bemenetet, itt adhatja hozzá a tulajdonság nevét. A lekérdezés teljesítményének javítására szolgál, ha tartalmaz egy PARTITION BY vagy GROUP BY záradékot ezen a tulajdonságon. Ha ez a feladat 1.2-es vagy újabb kompatibilitási szintet használ, ez a mező alapértelmezés szerint "PartitionId".
Bemeneti partíciók száma Ez a mező csak akkor jelenik meg, ha {partition} szerepel az elérésiút-mintában. A tulajdonság értéke =1 egész szám >. Ahol a(z) {partition} megjelenik a pathPatternben, a program egy 0 és a mező értéke közötti számot használ, -1.
Esemény szerializálási formátuma A bejövő adatfolyam szerializálási formátuma (JSON, CSV, Avro). Győződjön meg arról, hogy a JSON formátum megfelel a specifikációnak, és nem tartalmaz kezdő nullát a decimális számok előtt.
Kódolás CSV és JSON esetén jelenleg az UTF-8 az egyetlen támogatott kódolási formátum.
Tömörítés A bejövő adatfolyam olvasásához használt tömörítési típus, például Nincs (alapértelmezett), Gzip vagy Deflate.

Ha az adatok egy Blob Storage-forrásból származnak, a Stream Analytics-lekérdezésben a következő metaadatmezőket érheti el:

Ingatlan Leírás
BlobName Annak a bemeneti blobnak a neve, amelyből az esemény származik.
EventProcessedUtcTime A Stream Analytics által az esemény feldolgozásakor megadott dátum és idő.
BlobLastModifiedUtcTime A blob utolsó módosításának dátuma és időpontja.
PartitionId A bemeneti adapter nulla alapú partícióazonosítója.

Az alábbi mezők használatával az alábbi példához hasonlóan írhat lekérdezést:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Adatok streamelése az Apache Kafkából

Azure Stream Analytics lehetővé teszi, hogy közvetlenül az Apache Kafka-fürtökhöz csatlakozzon az adatok betöltéséhez. A megoldás alacsony kódszámú, és teljes mértékben a Microsoft Azure Stream Analytics csapata felügyeli, így megfelel az üzleti megfelelőségi szabványoknak. A Kafka bemenet visszamenőlegesen kompatibilis, és a legújabb ügyfélkiadással támogatja a 0.10-es verziótól kezdődő összes verziót. A konfigurációktól függően kapcsolódhat egy virtuális hálózaton belüli Kafka-fürtökhöz és nyilvános végponttal rendelkező Kafka-fürtökhöz. A konfiguráció a meglévő Kafka konfigurációs konvenciókra támaszkodik. A támogatott tömörítési típusok: None, Gzip, Snappy, LZ4 és Zstd.

További információkért lásd: Adatfolyam küldése a Kafkából az Azure Stream Analytics-be (előzetes verzió).

Következő lépések