Stream Analytics'e giriş olarak veri akışı yapın

Stream Analytics, beş tür kaynaktan giriş olarak Azure veri akışlarıyla tümleşir:

Bu giriş kaynakları Stream Analytics işinizle aynı Azure aboneliğinde veya farklı bir abonelikte bulunabilir.

Sıkıştırma

Stream Analytics tüm giriş kaynakları için sıkıştırmayı destekler. Desteklenen sıkıştırma türleri şunlardır: None, Gzip ve Deflate. Stream Analytics, başvuru verileri için sıkıştırmayı desteklemez. Giriş verileri sıkıştırılmış Avro verileriyse Stream Analytics bunu saydam bir şekilde işler. Sıkıştırma türünü Avro serileştirme ile belirtmeniz gerekmez.

Giriş oluşturma, düzenleme veya test

Akış işinizde var olan girişleri eklemek ve görüntülemek veya düzenlemek için Azure portal, Visual Studio ve Visual Studio Code kullanabilirsiniz. Giriş bağlantılarını test edebilir ve Azure portalından Visual Studio ve Visual Studio Code örnek verilerinden gelen sorguları test edebilirsiniz. Sorgu yazdığınızda, FROM yan tümcesindeki girişi listelersiniz. Kullanılabilir girişlerin listesini portaldaki Sorgu sayfasından alabilirsiniz. Birden fazla giriş kullanmak istiyorsanız, onları JOIN veya birden çok SELECT sorgusu yazın.

Uyarı

En iyi yerel geliştirme deneyimi için Visual Studio Code için Stream Analytics araçlarını kullanın. Visual Studio 2019 (sürüm 2.6.3000.0) için Stream Analytics araçlarında bilinen özellik boşlukları vardır ve bundan sonra iyileştirilmeyecektir.

Event Hubs'dan veri akışı

Azure Event Hubs yüksek oranda ölçeklenebilir bir yayınla-abone olma etkinliği alıcısıdır. Bir olay hub'ı saniyede milyonlarca olay toplayabilir, böylece bağlı cihazlarınız ve uygulamalarınız tarafından üretilen büyük miktarda veriyi işleyip analiz edebilirsiniz. Event Hubs ve Stream Analytics birlikte gerçek zamanlı analiz için uçtan uca bir çözüm sağlar. Event Hubs olayları gerçek zamanlı olarak Azure aktarır ve Stream Analytics işleri bu olayları gerçek zamanlı olarak işler. Örneğin, Event Hubs'a web tıklamaları, algılayıcı okumaları veya çevrimiçi günlük olayları gönderebilirsiniz. Ardından gerçek zamanlı filtreleme, toplama ve bağıntı için giriş verileri için Event Hubs'ı kullanmak üzere Stream Analytics işleri oluşturabilirsiniz.

EventEnqueuedUtcTime bir olayın olay hub'ına gelişinin zaman damgasıdır ve Event Hubs'tan Stream Analytics'e gelen olayların varsayılan zaman damgasıdır. Olay yükünde bir zaman damgası kullanarak verileri akış olarak işlemek için TIMESTAMP BY anahtar sözcüğünü kullanmanız gerekir.

Event Hubs tüketici grupları

Her etkinlik merkezi girişini kendi tüketici grubuyla yapılandırın. Bir iş kendi kendine birleşim içerdiğinde veya birden çok girişe sahip olduğunda, birden çok okuyucu aşağı yönde bazı girişleri okuyabilir. Bu durum, tek bir tüketici grubundaki okuyucu sayısını etkiler. Bölüm başına tüketici grubu başına beş okuyucu olan Event Hubs sınırını aşmamak için her Stream Analytics işi için bir tüketici grubu belirleyin. Standart katman olay hub'ı için 20 tüketici grubu sınırı da vardır. Daha fazla bilgi için bkz. Azure Stream Analytics girdi sorunlarını giderme.

Event Hubs'dan giriş oluşturma

Aşağıdaki tabloda, Azure portalındaki Yeni giriş sayfasındaki her özellik, bir olay hub'ından veri akışı yapmak için açıklanmaktadır:

Mülkiyet Açıklama
Giriş takma adı İşin sorgusunda bu girdiye referans vermek için kullandığınız anlamlı bir ad.
Subscription Event Hub kaynağının bulunduğu Azure aboneliğini seçin.
Event Hub ad alanı Event Hubs ad alanı, olay hub'ları için bir kapsayıcıdır. Bir olay hub'ı oluşturduğunuzda, ad alanını da oluşturursunuz.
Olay Hub'ı adı Girdi olarak kullanılacak Event Hub'ın adı.
Event Hub tüketici grubu (önerilir) Her Stream Analytics işi için ayrı bir tüketici grubu kullanın. Bu dize, olay hub'ından veri almak için kullanılacak tüketici grubunu tanımlar. Eğer bir tüketici grubu belirtmezseniz, Stream Analytics işi $Default tüketici grubunu kullanır.
Kimlik doğrulama modu Olay hub'ına bağlanmak için kullanmak istediğiniz kimlik doğrulaması türünü belirtin. Etkinlik merkezi ile kimlik doğrulaması yapmak için bir bağlantı dizisi veya yönetilen hesap kimliği kullanın. Yönetilen kimlik alternatifi için, Stream Analytics işine sistem tarafından atanan bir yönetilen kimlik veya olay hub'ı ile kimlik doğrulaması için kullanıcı tarafından atanan bir yönetilen kimlik oluşturabilirsiniz. Yönetilen kimlik kullandığınızda, yönetilen kimlik Azure Event Hubs Veri Alıcısı veya Azure Event Hubs Veri Sahibi rollerinin üyesi olmalıdır.
Event Hub ilke adı Event Hubs'a erişim sağlayan paylaşılan erişim ilkesi. Her paylaşılan erişim ilkesinin bir adı, ayarladığınız izinleri ve erişim anahtarları vardır. Event Hubs ayarlarını el ile sağlama seçeneğini belirlemediğiniz sürece bu seçenek otomatik olarak doldurulur.
Bölüm anahtarı Bu isteğe bağlı alan yalnızca işinizi uyumluluk düzeyi 1.2 veya üzerini kullanacak şekilde yapılandırdığınızda kullanılabilir. Girişiniz bir özellik tarafından bölümlenmişse, bu özelliğin adını buraya ekleyin. Sorgunuzun bu özellikte bir PARTITION BY veya GROUP BY yan tümcesi içerip içermediğini kontrol edin ve eğer içeriyorsa performansını geliştirmek için kullanın. Bu iş uyumluluk düzeyi 1.2 veya üzerini kullanıyorsa, bu alan varsayılan olarak PartitionId.
Olay serileştirme biçimi Gelen veri akışının serileştirme biçimi (JSON, CSV, Avro). JSON biçiminin belirtimle uyumlu olduğundan ve ondalık sayılar için baştaki 0'ı içermediğinden emin olun.
Kodlama UTF-8 şu anda desteklenen tek kodlama biçimidir.
Etkinlik sıkıştırma türü Gelen veri akışını okumak için kullanılan sıkıştırma türü: hiçbiri (varsayılan), gzip veya deflate gibi.
Şema kayıt defteri Olay hub'ından alınan olay verilerinin şemalarını içeren şema kayıt defterini seçin.

Verileriniz bir Event Hubs akış girişinden geldiğinde Stream Analytics sorgunuzda aşağıdaki meta veri alanlarına erişebilirsiniz:

Mülkiyet Açıklama
EventProcessedUtcTime Stream Analytics'in olayı işlediği tarih ve saat.
EventEnqueuedUtcTime Event Hubs'ın olayları aldığı tarih ve saat.
PartitionId (Bölme Kimliği) Giriş bağdaştırıcısı için sıfır tabanlı bölüm kimliği.

Bu alanları kullanarak aşağıdaki örneğe benzer bir sorgu yazabilirsiniz:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Uyarı

Event Hubs'ı IoT Hub Yolları için uç nokta olarak kullandığınızda, GetMetadataPropertyValue işlevini kullanarak IoT Hub meta verilerine erişebilirsiniz.

IoT Hub'dan veri akışı

Azure IoT Hub, IoT senaryoları için yükseltilmiş, yüksek oranda ölçeklenebilir bir yayınla-abone olma olay yutucusudur.

Stream Analytics'teki bir IoT Hub'dan gelen olayların varsayılan zaman damgası, olayın IoT Hub'a ulaştığı zamandır ve EventEnqueuedUtcTime. Olay yükünde bir zaman damgası kullanarak verileri akış olarak işlemek için TIMESTAMP BY anahtar sözcüğünü kullanın.

IoT Hub tüketici grupları

Her Stream Analytics IoT Hub girişini kendi tüketici grubuna sahip olacak şekilde yapılandırın. Bir görev kendi kendine eşleşme içerdiğinde veya birden çok girişi olduğunda, birden fazla işleme birimi bazı girişleri okuyabilir. Bu durum, tek bir tüketici grubundaki okuyucu sayısını etkiler. Azure IoT Hub'un bölüm başına tüketici grubu başına beş okuyucu sınırını aşmamak için her Stream Analytics işi için ayrı bir tüketici grubu belirleyin.

Bir veri akışı girişi olarak IoT Hub'ı yapılandırın

Aşağıdaki tabloda, IoT Hub akış girişi olarak yapılandırdığınızda Azure portalındaki Yeni giriş sayfasındaki her özellik açıklanmaktadır.

Mülkiyet Açıklama
Giriş takma adı İşin sorgusunda bu girdiye referans vermek için kullandığınız anlamlı bir ad.
Subscription IoT Hub kaynağının bulunduğu aboneliği seçin.
IoT Hub Giriş olarak kullanılacak IoT Hub adı.
Tüketici grubu Her Stream Analytics işi için farklı bir tüketici grubu kullanın. Tüketici grubu IoT Hub'dan veri alır. Stream Analytics, aksini belirtmediğiniz sürece $Default tüketici grubunu kullanır.
Paylaşılan erişim ilkesi adı IoT Hub'a erişim sağlayan paylaşılan erişim politikası. Her paylaşılan erişim ilkesinin bir adı, ayarladığınız izinleri ve erişim anahtarları vardır.
Paylaşılan erişim ilkesi anahtarı IoT Hub erişimi yetkilendirmek için kullanılan paylaşılan erişim anahtarı. bu seçenek, IoT Hub ayarlarını el ile sağlama seçeneğini belirlemediğiniz sürece otomatik olarak doldurulur.
Bitiş noktası IoT Hub uç noktası.
Bölüm anahtarı Bu, yalnızca işinizi uyumluluk düzeyi 1.2 veya üzerini kullanacak şekilde yapılandırdığınızda kullanılabilen isteğe bağlı bir alandır. Girişinizi bir özelliğe göre bölümlerseniz, bu özelliğin adını buraya ekleyebilirsiniz. Bu özellikte PARTITION BY veya GROUP BY yan tümcesi içeriyorsa sorgunuzun performansını geliştirmek için kullanılır. Bu iş uyumluluk düzeyi 1.2 veya üzerini kullanıyorsa, bu alan varsayılan olarak "PartitionId" olarak adlandırılır.
Olay serileştirme biçimi Gelen veri akışının serileştirme biçimi (JSON, CSV, Avro). JSON biçiminin belirtimle uyumlu olduğundan ve ondalık sayılar için baştaki 0'ı içermediğinden emin olun.
Kodlama UTF-8 şu anda desteklenen tek kodlama biçimidir.
Etkinlik sıkıştırma türü Gelen veri akışını okumak için kullanılan sıkıştırma türü: hiçbiri (varsayılan), gzip veya deflate gibi.

Bir IoT Hub akış verilerini kullandığınızda, Stream Analytics sorgunuzda aşağıdaki meta veri alanlarına erişebilirsiniz:

Mülkiyet Açıklama
EventProcessedUtcTime Olayın işlendiği tarih ve saat.
EventEnqueuedUtcTime IoT Hub olayı aldığı tarih ve saat.
PartitionId (Bölme Kimliği) Giriş bağdaştırıcısı için sıfır tabanlı bölüm kimliği.
IoTHub.MessageId IoT Hub'da iki yönlü iletişimi ilişkilendirmek için kullanılan kimlik.
IoTHub.CorrelationId IoT Hub ileti yanıtlarında ve geri bildirimlerinde kullanılan bir kimlik.
IoTHub.ConnectionDeviceId Bu iletiyi göndermek için kullanılan kimlik doğrulama kimliği. IoT Hub bu değeri hizmete bağlı iletilere damgalar.
IoTHub.ConnectionDeviceGenerationId Kimliği doğrulanmış cihazın, bu mesajı göndermek için kullanılan nesil kimliği. IoT Hub bu değeri hizmete giden iletilere damgalar.
IoTHub.EnqueuedTime IoT Hub iletiyi aldığı saat.

Blob veya Data Lake Storage Gen2'den veri akışı sağla

Büyük miktarlarda yapılandırılmamış verilerin bulutta depolanmasını içeren senaryolar için blob depolama veya Azure Data Lake Storage Gen2 Azure uygun maliyetli ve ölçeklenebilir bir çözüm sunar. Blob depolama veya Azure Data Lake Storage Gen2'deki veriler, dinlenim halindeki veriler olarak kabul edilir. Ancak Stream Analytics bu verileri veri akışı olarak işleyebilir.

Stream Analytics ile bu tür girişleri kullanmak için yaygın olarak kullanılan bir senaryo günlük işlemedir. Bu senaryoda, bir sistemden telemetri veri dosyalarını yakalarsınız ve anlamlı verileri ayıklamak için bunları ayrıştırıp işlemeniz gerekir.

Stream Analytics'te Blob Depolama veya Azure Data Lake Storage Gen2 olayının varsayılan zaman damgası, en son değiştirildiği zamandır ve BlobLastModifiedUtcTime. Bir blobu saat 13:00'te bir depolama hesabına yüklerseniz ve Azure Stream Analytics işini 13:01'de Now seçeneğini kullanarak başlatırsanız, değiştirilme süresi işin çalışma süresine uymadığından iş blobu işlemez.

Eğer bir blobu saat 13:00'te bir depolama hesabı konteynerine yüklerseniz ve Azure Stream Analytics işini saat 13:00'te veya daha önce Özel Zaman kullanarak başlatırsanız, değişiklik zamanı iş çalışma süresi içine denk geldiği için iş blobu alır.

Now kullanarak saat 13:00'te bir Azure Stream Analytics işi başlatır ve 13:01'de depolama hesabı kapsayıcısına bir blob yüklerseniz Azure Stream Analytics blobu alır. Her blob için atanmış zaman damgası yalnızca BlobLastModifiedTime'ye dayanmaktadır. Blob'un içinde yer aldığı klasörün atanan zaman damgasıyla hiçbir ilişkisi yoktur. Örneğin, 2019/10-01/00/b1.txt özelliği 2019-11-11 olan bir blob varsa, bu bloba atanan zaman damgası 2019-11-11 olur.

Olay yükünde bir zaman damgası kullanarak verileri akış olarak işlemek için TIMESTAMP BY anahtar sözcüğünü kullanmanız gerekir. Stream Analytics işi, blob dosyası mevcut olduğunda Azure Blob depolamadan veya her saniye Azure Data Lake Storage Gen2'den veri çeker. Blob dosyası kullanılamıyorsa, iş maksimum 90 saniyelik gecikme süresiyle üstel geri çekilme kullanır.

Uyarı

Stream Analytics, mevcut blob dosyasına içerik eklemeyi desteklemez. Stream Analytics her dosyayı yalnızca bir kez görüntüler ve iş verileri okuduktan sonra dosyada gerçekleşen değişiklikleri işlemez. En iyi yöntem, bir blob dosyasının tüm verilerini aynı anda karşıya yüklemek ve ardından farklı, yeni bir blob dosyasına daha yeni olaylar eklemektir.

Sürekli olarak çok sayıda blob eklediğiniz ve Stream Analytics'in blobları eklediğinizde işlediği senaryolarda, detay düzeyinin BlobLastModifiedTime nedeniyle bazı blobları nadiren atlama ihtimaliniz olabilir. En az iki saniye arayla blobları karşıya yükleyerek bu sorunu azaltabilirsiniz. Bu seçenek uygun değilse, büyük hacimli olayların akışını yapmak için Event Hubs'ı kullanabilirsiniz.

Blob depolamayı akış girişi olarak yapılandırma

Aşağıdaki tabloda blob depolamayı akış girişi olarak yapılandırdığınızda Azure portalındaki Yeni giriş sayfasındaki her özellik açıklanmaktadır.

Mülkiyet Açıklama
Giriş takma adı İşin sorgusunda bu girdiye referans vermek için kullandığınız anlamlı bir ad.
Subscription Depolama kaynağının bulunduğu aboneliği seçin.
Depolama hesabı Blob dosyalarının bulunduğu depolama hesabının adı.
Depolama hesabı anahtarı Depolama hesabıyla ilişkili gizli anahtar. Ayarları el ile sağlama seçeneğini belirlemediğiniz sürece bu seçenek otomatik olarak doldurulur.
Kapsayıcı Kapsayıcılar bloblar için mantıksal bir gruplandırma sağlar. Yeni bir kapsayıcı oluşturmak için Mevcut kapsayıcıyı kullan'ı veya Yeni oluştur'u seçebilirsiniz.
Kimlik doğrulama modu Depolama hesabına bağlanmak için kullanmak istediğiniz kimlik doğrulama türünü belirtin. Depolama hesabıyla kimlik doğrulaması yapmak için bir connection string veya yönetilen kimlik kullanabilirsiniz. Yönetilen kimlik seçeneği için Stream Analytics işine sistem tarafından atanan bir yönetilen kimlik veya depolama hesabıyla kimlik doğrulaması yapmak için kullanıcı tarafından atanan yönetilen kimlik oluşturabilirsiniz. Yönetilen kimlik kullandığınızda, yönetilen kimlik depolama hesabında uygun bir rolün üyesi olmalıdır.
Yol deseni (isteğe bağlı) Belirtilen kapsayıcı içindeki blobları bulmak için kullanılan dosya yolu. Kapsayıcının kökünden blobları okumak istiyorsanız, bir yol deseni ayarlamayın. Yol içinde, şu üç değişkenin bir veya daha fazla örneğini belirtebilirsiniz: {date}, {time}veya {partition}

Örnek 1: cluster1/logs/{date}/{time}/{partition}

Örnek 2: cluster1/logs/{date}

Karakter * , yol ön eki için izin verilen bir değer değildir. Yalnızca geçerli Azure blob karakterleri izin verilir. Kapsayıcı adlarını veya dosya adlarını eklemeyin.
Tarih biçimi (isteğe bağlı) Yolda tarih değişkenini kullanırsanız, dosyaların düzenlendiği tarih biçimi. Örnek: YYYY/MM/DD

Blob girişi {date} veya {time} yolunda olduğunda, Stream Analytics klasörlere artan zaman sırasına bakar.
Saat biçimi (isteğe bağlı) Yoldaki zaman değişkenini kullanırsanız, dosyaların düzenlendiği zaman biçimi. Şu anda desteklenen tek değer saat cinsindendir HH .
Bölüm anahtarı Bu, yalnızca işinizi uyumluluk düzeyi 1.2 veya üzerini kullanacak şekilde yapılandırdığınızda kullanılabilen isteğe bağlı bir alandır. Girişinizi bir özelliğe göre bölümlerseniz, bu özelliğin adını buraya ekleyebilirsiniz. Bu özellikte PARTITION BY veya GROUP BY yan tümcesi içeriyorsa sorgunuzun performansını geliştirmek için kullanılır. Bu iş uyumluluk düzeyi 1.2 veya üzerini kullanıyorsa, bu alan varsayılan olarak "PartitionId" olarak adlandırılır.
Giriş bölümlerinin sayısı Bu alan yalnızca yol deseninde {partition} mevcut olduğunda bulunur. Bu özelliğin değeri =1 tamsayıdır >. pathPattern içinde {partition} nerede görünürse görünsin, 0 ile bu alan -1 değeri arasında bir sayı kullanılır.
Olay serileştirme biçimi Gelen veri akışının serileştirme biçimi (JSON, CSV, Avro). JSON biçiminin belirtimle uyumlu olduğundan ve ondalık sayılar için baştaki 0'ı içermediğinden emin olun.
Kodlama CSV ve JSON için utf-8 şu anda desteklenen tek kodlama biçimidir.
Compression Gelen veri akışını okumak için kullanılan sıkıştırma türü: hiçbiri (varsayılan), gzip veya deflate gibi.

Verileriniz bir Blob depolama kaynağından geldiğinde Stream Analytics sorgunuzda aşağıdaki meta veri alanlarına erişebilirsiniz:

Mülkiyet Açıklama
BlobName Olayın geldiği kaynak blobun adı.
EventProcessedUtcTime Stream Analytics'in olayı işlediği tarih ve saat.
BlobLastModifiedUtcTime Blobun en son değiştirildiği tarih ve saat.
PartitionId (Bölme Kimliği) Giriş bağdaştırıcısı için sıfır tabanlı bölüm kimliği.

Bu alanları kullanarak aşağıdaki örneğe benzer bir sorgu yazabilirsiniz:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Apache Kafka'dan veri akışı

Azure Stream Analytics, verileri almak için doğrudan Apache Kafka kümelerine bağlanmanızı sağlar. Çözüm düşük kodludur ve tamamen Microsoft'taki Azure Stream Analytics ekibi tarafından yönetildiğinden iş uyumluluğu standartlarını karşılar. Kafka girişi geriye dönük olarak uyumludur ve en son istemci sürümüyle 0.10 sürümünden itibaren tüm sürümleri destekler. Yapılandırmalara bağlı olarak, bir sanal ağ içindeki Kafka kümelerine ve genel uç nokta ile Kafka kümelerine bağlanabilirsiniz. Yapılandırma, mevcut Kafka yapılandırma kurallarına dayanır. Desteklenen sıkıştırma türleri None, Gzip, Snappy, LZ4 ve Zstd'dir.

Daha fazla bilgi için bkz. Kafka'dan veri akışını Azure Stream Analytics'e (Önizleme) iletme.

Sonraki Adımlar