Menyalurkan data sebagai input ke Stream Analytics

Azure Stream Analytics terintegrasi dengan aliran data Azure sebagai input dari lima jenis sumber daya:

Sumber daya input ini dapat ada di langganan Azure yang sama dengan pekerjaan Azure Stream Analytics Anda atau di langganan yang berbeda.

Kompresi

Azure Stream Analytics mendukung pemadatan untuk semua sumber input. Jenis kompresi yang didukung adalah: None, Gzip, dan Deflate. Azure Stream Analytics tidak mendukung pemadatan untuk data referensi. Jika data input adalah data Avro yang dikompresi, Stream Analytics menanganinya secara transparan. Anda tidak perlu menentukan jenis kompresi dengan serialisasi Avro.

Membuat, mengedit, atau menguji input

Anda dapat menggunakan portal Azure, Visual Studio, dan Visual Studio Code untuk menambahkan dan melihat atau mengedit input yang ada pada pekerjaan streaming Anda. Anda juga dapat menguji koneksi input dan menguji kueri dari data sampel dari portal Azure, Visual Studio, dan Visual Studio Code. Saat Anda menulis kueri, cantumkan input dalam klausul FROM. Anda bisa mendapatkan daftar input yang tersedia dari halaman Kueri di portal. Jika Anda ingin menggunakan beberapa input, JOIN atau menulis beberapa SELECT kueri.

Catatan

Untuk pengalaman pengembangan lokal terbaik, gunakan alat Stream Analytics untuk Visual Studio Code. Ada celah fitur yang diketahui di alat Azure Stream Analytics untuk Visual Studio 2019 (versi 2.6.3000.0) dan tidak akan ditingkatkan ke depannya.

Mengalirkan data dari Event Hub

Azure Event Hubs adalah ingestor peristiwa terbitkan-berlangganan yang sangat dapat diskalakan. Pusat aktivitas dapat mengumpulkan jutaan peristiwa per detik sehingga Anda dapat memproses dan menganalisis sejumlah besar data yang dihasilkan oleh perangkat dan aplikasi Anda yang terhubung. Bersama-sama, Azure Event Hubs dan Stream Analytics menyediakan solusi end-to-end untuk analitik real time. Azure Event Hubs mengumpankan peristiwa ke Azure secara real time, dan pekerjaan Azure Stream Analytics memproses peristiwa tersebut secara real time. Misalnya, Anda dapat mengirim klik web, pembacaan sensor, atau peristiwa log online ke Azure Event Hubs. Anda kemudian dapat membuat pekerjaan Azure Stream Analytics untuk menggunakan Azure Event Hubs untuk data input untuk pemfilteran, agregasi, dan korelasi real time.

EventEnqueuedUtcTime adalah tanda waktu kedatangan peristiwa di pusat aktivitas dan merupakan tanda waktu default peristiwa yang berasal dari Azure Event Hubs ke Azure Stream Analytics. Untuk memproses data sebagai aliran menggunakan tanda waktu dalam payload peristiwa, Anda harus menggunakan kata kunci TIMESTAMP BY .

Grup konsumen Event Hubs

Konfigurasikan setiap input event hub dengan grup konsumennya sendiri. Saat pekerjaan berisi gabungan mandiri atau memiliki beberapa input, lebih dari satu pembaca hilir mungkin membaca beberapa input. Situasi ini berdampak pada jumlah pembaca dalam satu grup konsumen. Untuk menghindari melebihi batas Azure Event Hubs lima pembaca per grup konsumen per partisi, tetapkan grup konsumen untuk setiap pekerjaan Azure Stream Analytics. Ada juga batas 20 grup konsumen untuk hub peristiwa tingkat Standar. Untuk informasi selengkapnya, lihat Troubleshoot Azure Stream Analytics inputs.

Membuat input dari Event Hubs

Tabel berikut menjelaskan setiap properti di halaman input New di portal Azure untuk mengalirkan input data dari pusat aktivitas:

Properti Deskripsi
Alias masukan Nama mudah diingat yang Anda gunakan dalam kueri tugas untuk mereferensikan output ini.
Subscription Pilih langganan Azure tempat sumber daya Pusat aktivitas berada.
Namespace Layanan Pusat Aktivitas Namespace Event Hubs adalah kontainer untuk Event Hub. Saat Anda membuat hub acara, Anda juga membuat namespace.
Nama Pusat Aktivitas Nama event hub untuk digunakan sebagai input.
Grup konsumen Pusat Aktivitas (disarankan) Gunakan grup konsumen yang berbeda untuk setiap pekerjaan Azure Stream Analytics. String ini mengidentifikasi grup konsumen yang akan digunakan untuk mengambil data dari event hub. Jika Anda tidak menentukan grup konsumen, pekerjaan Azure Stream Analytics menggunakan $Default grup konsumen.
Mode autentikasi Tentukan jenis autentikasi yang ingin Anda gunakan untuk menyambungkan ke hub peristiwa. Gunakan connection string atau identitas yang dikelola untuk mengautentikasi dengan event hub. Untuk opsi identitas terkelola, Anda dapat membuat identitas terkelola yang ditetapkan sistem untuk job Azure Stream Analytics atau identitas terkelola yang ditetapkan pengguna untuk mengautentikasi dengan event hub. Saat Anda menggunakan identitas terkelola, identitas terkelola harus menjadi anggota peran Penerima Data Azure Event Hubs atau Pemilik Data Azure Event Hubs.
Nama kebijakan Azure Event Hub Kebijakan akses bersama yang menyediakan akses ke Azure Event Hubs. Setiap kebijakan akses bersama memiliki nama, izin yang Anda tetapkan, dan kunci akses. Opsi ini diisi secara otomatis, kecuali jika Anda memilih opsi untuk menyediakan pengaturan Azure Event Hubs secara manual.
kunci partisi Bidang opsional ini hanya tersedia jika Anda mengonfigurasi pekerjaan Anda untuk menggunakan tingkat kompatibilitas 1.2 atau yang lebih tinggi. Jika input Anda dipartisi oleh properti, tambahkan nama properti ini di sini. Ini dapat digunakan untuk meningkatkan performa kueri Anda jika menyertakan sebuah PARTITION BY atau GROUP BY pada properti ini. Jika pekerjaan ini menggunakan tingkat kompatibilitas 1.2 atau lebih tinggi, bidang ini default ke PartitionId.
Format serialisasi peristiwa Format serialisasi (JSON, CSV, Avro) dari aliran data masuk. Pastikan format JSON selaras dengan spesifikasi dan tidak menyertakan 0 di depan untuk angka desimal.
Encoding UTF-8 saat ini adalah satu-satunya format pengodean yang didukung.
Jenis kompresi peristiwa Jenis kompresi yang digunakan untuk membaca aliran data yang masuk, seperti Tidak Ada (default), Gzip, atau Deflate.
Skema Registri Pilih registri skema dengan skema untuk data peristiwa yang diterima dari pusat aktivitas.

Saat data Anda berasal dari input aliran Azure Event Hubs, Anda memiliki akses ke bidang metadata berikut dalam kueri Analisis Aliran Anda:

Properti Deskripsi
EventProcessedUtcTime Tanggal waktu saat Stream Analytics memproses peristiwa.
EventEnqueuedUtcTime Tanggal dan waktu ketika Event Hubs menerima kejadian.
PartitionId ID partisi berbasis nol untuk adapter input.

Dengan menggunakan bidang ini, Anda bisa menulis kueri seperti contoh berikut:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Catatan

Saat Anda menggunakan Azure Event Hubs sebagai titik akhir untuk Rute IoT Hub, Anda dapat mengakses metadata IoT Hub dengan menggunakan fungsi GetMetadataPropertyValue.

Melakukan streaming data dari IoT Hub

Azure IoT Hub adalah ingestor peristiwa publikasi-berlangganan yang dioptimalkan untuk skenario IoT dan sangat dapat diskalakan.

Tanda waktu default dari peristiwa yang berasal dari IoT Hub di Azure Stream Analytics adalah tanda waktu saat peristiwa tiba di IoT Hub, yang EventEnqueuedUtcTime. Untuk memproses data sebagai aliran menggunakan tanda waktu dalam payload peristiwa, gunakan kata kunci TIMESTAMP BY .

Grup konsumen IoT Hub

Konfigurasikan setiap input IoT Hub Azure Stream Analytics untuk memiliki grup konsumennya sendiri. Saat pekerjaan berisi gabungan mandiri atau ketika memiliki beberapa input, lebih dari satu pembaca mungkin membaca beberapa input. Situasi ini berdampak pada jumlah pembaca dalam satu grup konsumen. Untuk menghindari melebihi batas Azure IoT Hub sebanyak lima pembaca per grup konsumen per partisi, tetapkan satu grup konsumen untuk setiap tugas Azure Stream Analytics.

Mengonfigurasi IoT Hub sebagai input aliran data

Tabel berikut menjelaskan setiap properti di halaman input New di portal Azure saat Anda mengonfigurasi IoT Hub sebagai input aliran.

Properti Deskripsi
Alias masukan Nama mudah diingat yang Anda gunakan dalam kueri tugas untuk mereferensikan output ini.
Subscription Pilih langganan tempat sumber daya IoT Hub ada.
IoT Hub Nama *IoT Hub* yang akan digunakan sebagai masukan.
Grup konsumen Gunakan grup konsumen yang berbeda untuk setiap pekerjaan Azure Stream Analytics. Grup konsumen menyerap data dari IoT Hub. Stream Analytics menggunakan grup konsumen $Default kecuali jika Anda menentukan sebaliknya.
Nama kebijakan akses bersama Kebijakan akses bersama yang menyediakan akses ke IoT Hub. Setiap kebijakan akses bersama memiliki nama, izin yang Anda tetapkan, dan kunci akses.
Kunci kebijakan akses bersama Kunci akses bersama yang digunakan untuk mengotorisasi akses ke IoT Hub. Opsi ini diisi secara otomatis kecuali Anda memilih opsi untuk menyediakan pengaturan IoT Hub secara manual.
Titik Akhir Titik akhir untuk IoT Hub.
kunci partisi Ini adalah bidang opsional yang hanya tersedia jika Anda mengonfigurasi pekerjaan Anda untuk menggunakan tingkat kompatibilitas 1.2 atau yang lebih tinggi. Jika Anda mempartisi input dengan properti, Anda dapat menambahkan nama properti ini di sini. Ini digunakan untuk meningkatkan performa kueri Anda jika menyertakan klausa PARTITION BY atau GROUP BY pada properti ini. Jika pekerjaan ini menggunakan tingkat kompatibilitas 1.2 atau lebih tinggi, bidang ini default ke "PartitionId."
Format serialisasi peristiwa Format serialisasi (JSON, CSV, Avro) dari aliran data masuk. Pastikan format JSON selaras dengan spesifikasi dan tidak menyertakan 0 di depan untuk angka desimal.
Encoding UTF-8 saat ini adalah satu-satunya format pengodean yang didukung.
Jenis kompresi peristiwa Jenis kompresi yang digunakan untuk membaca aliran data yang masuk, seperti Tidak Ada (default), Gzip, atau Deflate.

Saat menggunakan data aliran dari IoT Hub, Anda memiliki akses ke bidang metadata berikut di kueri Azure Stream Analytics Anda:

Properti Deskripsi
EventProcessedUtcTime Tanggal dan waktu pemrosesan peristiwa.
EventEnqueuedUtcTime Tanggal dan waktu saat IoT Hub menerima peristiwa.
PartitionId ID partisi berbasis nol untuk adapter input.
IoTHub.MessageId ID yang digunakan untuk menghubungkan komunikasi dua arah dalam IoT Hub.
IoTHub.CorrelationId ID yang digunakan dalam respons pesan dan umpan balik dalam IoT Hub.
IoTHub.ConnectionDeviceId ID autentikasi yang digunakan untuk mengirim pesan ini. IoT Hub menstempel nilai ini pada pesan yang mengarah ke layanan.
IoTHub.ConnectionDeviceGenerationId ID generasi dari perangkat yang diautentikasi, yang digunakan untuk mengirim pesan ini. IoT Hub membubuhkan stempel pada nilai ini untuk pesan tujuan layanan.
IoTHub.EnqueuedTime Waktu ketika IoT Hub menerima pesan.

Mengalirkan data dari penyimpanan Blob atau Data Lake Storage Gen2

Untuk skenario yang melibatkan penyimpanan data yang tidak terstruktur dalam jumlah besar di cloud, Azure penyimpanan Blob atau Azure Data Lake Storage Gen2 menawarkan solusi yang hemat biaya dan dapat diskalakan. Data dalam penyimpanan Blob atau Azure Data Lake Storage Gen2 dianggap sebagai data tidak aktif. Namun, Stream Analytics dapat memproses data ini sebagai aliran data.

Skenario yang umum digunakan untuk menggunakan input tersebut dengan Azure Stream Analytics adalah pemrosesan log. Dalam skenario ini, Anda mengambil file data telemetri dari sistem dan perlu mengurai dan memprosesnya untuk mengekstrak data yang bermakna.

Tanda waktu default untuk penyimpanan Blob atau peristiwa Azure Data Lake Storage Gen2 dalam Azure Stream Analytics adalah tanda waktu ketika terakhir diubah, yaitu BlobLastModifiedUtcTime. Jika Anda mengunggah blob ke akun penyimpanan pada pukul 13:00, dan memulai pekerjaan Azure Stream Analytics dengan opsi Now pada pukul 13:01, pekerjaan tersebut tidak memproses blob karena waktu modifikasinya berada di luar periode eksekusi pekerjaan.

Jika Anda mengunggah blob ke kontainer akun penyimpanan pada pukul 13:00, dan memulai pekerjaan Azure Stream Analytics dengan menggunakan Waktu Kustom pada pukul 13:00 atau lebih awal, pekerjaan tersebut mengambil blob karena waktu modifikasinya berada di dalam periode menjalankan pekerjaan.

Jika Anda memulai pekerjaan Azure Stream Analytics dengan menggunakan Now pukul 13:00, dan mengunggah blob ke kontainer akun penyimpanan pada pukul 13:01, Azure Stream Analytics mengambil blob. Tanda waktu yang ditetapkan untuk setiap blob hanya didasarkan pada BlobLastModifiedTime. Folder tempat blob berada tidak memiliki hubungan dengan tanda waktu yang ditetapkan. Misalnya, jika ada blob 2019/10-01/00/b1.txt dengan BlobLastModifiedTime , 2019-11-11maka tanda waktu yang ditetapkan ke blob ini adalah 2019-11-11.

Untuk memproses data sebagai aliran dengan menggunakan tanda waktu dalam payload peristiwa, Anda harus menggunakan kata kunci TIMESTAMP BY . Tugas Azure Stream Analytics mengambil data dari Azure Blob Storage atau Azure Data Lake Storage Gen2 setiap detik jika file blob tersedia. Jika file blob tidak tersedia, pekerjaan akan menggunakan backoff eksponensial dengan penundaan waktu maksimum 90 detik.

Catatan

Azure Stream Analytics tidak mendukung penambahan konten ke file blob yang ada. Azure Stream Analytics hanya menampilkan setiap file sekali, dan tidak memproses perubahan apa pun yang terjadi dalam file setelah pekerjaan membaca data. Praktik terbaik adalah mengunggah semua data untuk file blob sekaligus dan kemudian menambahkan peristiwa baru lain ke file blob baru yang berbeda.

Dalam skenario di mana Anda terus menambahkan sejumlah besar blob dan Stream Analytics memproses blob saat menambahkannya, Anda mungkin melewatkan beberapa blob dalam kasus yang jarang karena granularitas BlobLastModifiedTime. Anda dapat mengurangi masalah ini dengan mengunggah blob setidaknya dua detik terpisah. Jika opsi ini tidak layak, Anda dapat menggunakan Azure Event Hubs untuk mengalirkan peristiwa dalam volume besar.

Konfigurasikan penyimpanan Blob sebagai input streaming

Tabel berikut menjelaskan setiap properti di halaman input New di portal Azure saat Anda mengonfigurasi penyimpanan Blob sebagai input aliran.

Properti Deskripsi
Alias masukan Nama mudah diingat yang Anda gunakan dalam kueri tugas untuk mereferensikan output ini.
Subscription Pilih langganan tempat sumber daya penyimpanan berada.
Akun penyimpanan Nama akun penyimpanan tempat file blob berada.
kunci akun Penyimpanan Kunci rahasia yang terkait dengan akun penyimpanan. Opsi ini diisi secara otomatis kecuali Anda memilih opsi untuk menyediakan pengaturan secara manual.
Kontainer Kontainer menyediakan pengelompokan logis untuk blob. Anda dapat memilih Gunakan kontainer yang sudah ada atau Buat baru agar kontainer baru dibuat.
Mode autentikasi Tentukan jenis autentikasi yang ingin Anda gunakan untuk menyambungkan ke akun penyimpanan. Anda dapat menggunakan connection string atau identitas terkelola untuk mengautentikasi dengan akun penyimpanan. Untuk opsi identitas terkelola, Anda dapat membuat identitas terkelola yang diberi sistem ke pekerjaan Stream Analytics atau identitas terkelola yang diberi pengguna untuk melakukan autentikasi dengan akun penyimpanan. Saat Anda menggunakan identitas terkelola, identitas terkelola harus menjadi anggota peran yang sesuai di akun penyimpanan.
Pola jalur (opsional) Jalur file yang digunakan untuk menemukan blob di dalam kontainer yang ditentukan. Jika Anda ingin membaca blob dari akar kontainer, jangan atur pola jalur. Dalam jalur, Anda dapat menentukan satu atau beberapa instans dari tiga variabel berikut: {date}, , {time}atau {partition}

Contoh 1: cluster1/logs/{date}/{time}/{partition}

Contoh 2: cluster1/logs/{date}

Karakter * bukan nilai yang diizinkan untuk awalan jalur. Hanya karakter blob Azure yang valid yang diizinkan. Jangan sertakan nama kontainer atau nama file.
Format tanggal (opsional) Jika Anda menggunakan variabel tanggal di jalur, format tanggal di mana file diatur. Contoh: YYYY/MM/DD

Saat input blob memiliki {date} atau {time} di jalurnya, Stream Analytics melihat folder dalam urutan kronologis.
Format waktu (opsional) Jika Anda menggunakan variabel waktu di jalur, format waktu yang digunakan untuk mengatur file. Saat ini satu-satunya nilai yang didukung adalah HH selama berjam-jam.
kunci partisi Ini adalah bidang opsional yang hanya tersedia jika Anda mengonfigurasi pekerjaan Anda untuk menggunakan tingkat kompatibilitas 1.2 atau yang lebih tinggi. Jika Anda mempartisi input dengan properti, Anda dapat menambahkan nama properti ini di sini. Ini digunakan untuk meningkatkan performa kueri Anda jika menyertakan klausa PARTITION BY atau GROUP BY pada properti ini. Jika pekerjaan ini menggunakan tingkat kompatibilitas 1.2 atau lebih tinggi, bidang ini default ke "PartitionId."
Jumlah perhitungan partisi input Bidang ini hanya ada ketika {partition} ada dalam pola jalur. Nilai properti ini adalah bilangan bulat >=1. Di mana pun {partition} muncul di pathPattern, angka antara 0 dan nilai bidang ini -1 akan digunakan.
Format serialisasi peristiwa Format serialisasi (JSON, CSV, Avro) dari aliran data masuk. Pastikan format JSON selaras dengan spesifikasi dan tidak menyertakan 0 di depan untuk angka desimal.
Encoding Untuk CSV dan JSON, UTF-8 saat ini adalah satu-satunya format pengodean yang didukung.
Compression Jenis kompresi yang digunakan untuk membaca aliran data yang masuk, seperti Tidak Ada (default), Gzip, atau Deflate.

Saat data Anda berasal dari sumber penyimpanan Blob, Anda dapat mengakses bidang metadata berikut di kueri Azure Stream Analytics Anda:

Properti Deskripsi
BlobName Nama blob input dari mana peristiwa berasal.
EventProcessedUtcTime Tanggal waktu saat Stream Analytics memproses peristiwa.
BlobLastModifiedUtcTime Tanggal dan waktu blob terakhir diubah.
PartitionId ID partisi berbasis nol untuk adapter input.

Dengan menggunakan bidang ini, Anda bisa menulis kueri seperti contoh berikut:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Mengalirkan data dari Apache Kafka

Azure Stream Analytics memungkinkan Anda terhubung langsung ke kluster Apache Kafka untuk menyerap data. Solusinya adalah kode rendah dan sepenuhnya dikelola oleh tim Azure Stream Analytics di Microsoft, sehingga memenuhi standar kepatuhan bisnis. Input Kafka kompatibel dengan versi sebelumnya dan mendukung semua versi mulai dari versi 0.10 dengan rilis klien terbaru. Anda dapat terhubung ke kluster Kafka di dalam jaringan virtual dan kluster Kafka dengan titik akhir publik, tergantung pada konfigurasinya. Konfigurasi bergantung pada konvensi konfigurasi Kafka yang ada. Jenis kompresi yang didukung adalah None, Gzip, Snappy, LZ4, dan Zstd.

Untuk informasi selengkapnya, lihat Mengalirkan data dari Kafka ke Azure Stream Analytics (Pratinjau).

Langkah berikutnya