Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Apache Spark Yapılandırılmış Akış, veri akışlarını işlemek için ölçeklenebilir, yüksek aktarım hızına sahip, hataya dayanıklı uygulamalar uygulamanıza olanak tanır. Yapılandırılmış Akış, Spark SQL altyapısını temel alır ve Spark SQL Veri Çerçeveleri ve Veri Kümelerindeki yapıları geliştirerek akış sorgularını toplu sorgu yazdığınız gibi yazabilirsiniz.
Yapılandırılmış Veri Akışı uygulamaları, HDInsight Spark kümelerinde çalışır ve Apache Kafka, bir TCP bağlantı noktası (hata ayıklama amacıyla), Azure Depolama veya Azure Veri Gölü Depolama'dan akış verilerine bağlanır. Dış depolama hizmetlerini kullanan ikinci iki seçenek, depolama alanına eklenen yeni dosyaları izlemenizi ve içeriklerini akışa alınmış gibi işlemenizi sağlar.
Yapılandırılmış Akış, giriş verilerine seçim, yansıtma, toplama, pencereleme ve akış DataFrame'i başvuru DataFrame'leri ile birleştirme gibi işlemleri uyguladığınız uzun süre çalışan bir sorgu oluşturur. Ardından, özel kod (SQL Veritabanı veya Power BI gibi) kullanarak sonuçları dosya depolamaya (Azure Depolama Blobları veya Data Lake Storage) veya herhangi bir veri deposuna verirsiniz. Yapılandırılmış Akış ayrıca yerel olarak hata ayıklamak için konsola ve HDInsight'ta hata ayıklama için oluşturulan verileri görebilmeniz için bellek içi bir tabloya çıkış sağlar.
Not
Spark Yapılandırılmış Akış, Spark Akışı'nın (DStreams) yerini alıyor. Bundan sonra Yapılandırılmış Akış geliştirmeler ve bakım alırken DStream'ler yalnızca bakım modunda olacaktır. Yapılandırılmış Akış, varsayılan olarak desteklediği kaynaklar ve çıkışlar için şu anda DStream'ler kadar özellik açısından eksiksiz değildir. Bu nedenle, uygun Spark akış işleme seçeneğini belirlemek için gereksinimlerinizi değerlendirin.
Tablo olarak akışlar
Spark Yapılandırılmış Akış, bir veri akışını ayrıntılı olarak ilişkisiz bir tablo olarak temsil eder, yani yeni veriler geldikçe tablo büyümeye devam eder. Bu giriş tablosu uzun süre çalışan bir sorgu tarafından sürekli işlenir ve sonuçlar bir çıkış tablosuna gönderilir:
Yapılandırılmış Akış'ta veriler sisteme ulaşır ve hemen bir giriş tablosuna alınır. Bu giriş tablosunda işlem gerçekleştiren sorgular (DataFrame ve Veri Kümesi API'lerini kullanarak) yazarsınız. Sorgu çıktısı, sonuç tablosu olan başka bir tablo döndürür. Sonuçlar tablosu, içinden ilişkisel veritabanı gibi bir dış veri deposu için veri çektiğiniz sorgunuzun sonuçlarını içerir. Giriş tablosundan verilerin işlenme zamanlaması tetikleyici aralığı tarafından denetlenir. Varsayılan olarak tetikleyici aralığı sıfırdır, bu nedenle Yapılandırılmış Akış verileri gelir gelmez işlemeye çalışır. Uygulamada bu, Akış İşlemi önceki sorgunun çalışmasını işlemeyi tamamladığında, yeni alınan verilerle ilgili başka bir işlem çalışması başlatır. Tetikleyiciyi belirli bir aralıkta çalışacak şekilde yapılandırarak akış verilerinin zaman tabanlı partilerde işlenmesini sağlayabilirsiniz.
Sonuç tablolarındaki veriler yalnızca sorgunun son işlenme zamanından (ekleme modu) sonra yeni olan verileri içerebilir veya tablo her yeni veri olduğunda yenilenebilir, böylece akış sorgusu başladığından (tam mod) itibaren tablo tüm çıkış verilerini içerir.
Ekleme modu
Ekleme modunda, sonuçlar tablosunda yalnızca son sorgu çalıştırmasının ardından sonuçlar tablosuna eklenen satırlar bulunur ve dış depolama alanına yazılır. Örneğin, en basit sorgu yalnızca giriş tablosundaki tüm verileri değiştirilmeden sonuç tablosuna kopyalar. Tetikleyici aralığı her geçtiğinde, yeni veriler işlenir ve yeni verileri temsil eden satırlar sonuçlar tablosunda görünür.
Termostat gibi sıcaklık algılayıcılarından gelen telemetri verilerini işleyebileceğiniz bir senaryo düşünün. İlk tetikleyicinin saat 00:01'de, sıcaklık okuması 95 derece olan cihaz 1 için bir olayı işlediğini varsayalım. Sorgunun ilk tetikleyicisinde, sonuçlar tablosunda yalnızca saat 00:01 olan satır görüntülenir. Başka bir olay geldiğinde saat 00:02'de, tek yeni satır 00:02 zamanına sahip satırdır ve sonuç tablosunda yalnızca bir satır yer alır.
Ekleme modunu kullanırken sorgunuz projeksiyonlar (önem verdiği sütunları seçme), filtreleme (yalnızca belirli koşullarla eşleşen satırları seçme) veya birleştirme (verileri statik arama tablosundaki verilerle artırma) uygular. Ekleme modu, yalnızca ilgili yeni veri noktalarını dış depolama alanına göndermeyi kolaylaştırır.
Tamamlama modu
Bu kez tam modu kullanarak aynı senaryoyu göz önünde bulundurun. Tam modda, çıkış tablosunun tamamı her tetikleyicide yenilenir, böylece tablo yalnızca en son tetikleyici çalıştırmasından değil tüm çalıştırmalardan veriler içerir. Giriş tablosundan sonuçlar tablosuna değiştirilmemiş verileri kopyalamak için tam modu kullanabilirsiniz. Tetiklenen her çalıştırmada, yeni sonuç satırları önceki tüm satırlarla birlikte görünür. Çıktı sonuçları tablosu, sorgu başladıktan sonra toplanan tüm verileri depolar ve sonunda belleğiniz tükenecektir. Tam mod, gelen verileri bir şekilde özetleyen toplama sorgularıyla kullanılmak üzere tasarlanmıştır, bu nedenle her tetikleyicide sonuç tablosu yeni bir özetle güncelleştirilir.
Şu ana kadar işlenmiş beş saniyelik veriler olduğunu ve altıncı saniye için verileri işlemenin zamanı olduğunu varsayalım. Giriş tablosunda saat 00:01 ve saat 00:03 için olaylar bulunur. Bu örnek sorgunun amacı, cihazın ortalama sıcaklığını beş saniyede bir vermektir. Bu sorgunun uygulanması, her 5 saniyelik pencere içinde kalan tüm değerleri alan, sıcaklığın ortalamasını alan ve bu aralıktaki ortalama sıcaklık için bir satır oluşturan bir toplama uygular. İlk 5 saniyelik pencerenin sonunda iki ikili vardır: (00:01, 1, 95) ve (00:03, 1, 98). Dolayısıyla 00:00-00:05 penceresi için toplama işlemi, ortalama sıcaklığı 96,5 derece olan bir küme oluşturur. Sonraki 5 saniyelik pencerede, saat 00:06'da yalnızca bir veri noktası vardır, dolayısıyla sonuçta elde edilen ortalama sıcaklık 98 derecedir. Saat 00:10'da, tam modu kullanarak sonuç tablosunda hem windows 00:00-00:05 hem de 00:05-00:10 için satırlar bulunur çünkü sorgu yalnızca yeni satırları değil, tüm toplu satırların çıkışını verir. Bu nedenle, yeni pencereler eklendikçe sonuçlar tablosu büyümeye devam eder.
Tam modu kullanan tüm sorgular tablonun sınır olmadan büyümesine neden olmaz. Önceki örnekte, sıcaklığı zaman penceresine göre ortalamak yerine, cihaz kimliğine göre ortalandığını düşünün. Sonuç tablosu, bu cihazdan alınan tüm veri noktalarında cihaz için ortalama sıcaklık ile sabit sayıda satır (cihaz başına bir tane) içerir. Yeni sıcaklıklar alındığında, sonuç tablosu tablodaki ortalamaların her zaman güncel olması için güncelleştirilir.
Spark Yapılandırılmış Akış uygulamasının bileşenleri
Basit bir örnek sorgu, sıcaklık okumalarını saat uzunluğundaki pencerelere göre özetleyebilir. Bu durumda veriler Azure Depolama'daki JSON dosyalarında depolanır (HDInsight kümesi için varsayılan depolama alanı olarak eklenir):
{"time":1469501107,"temp":"95"}
{"time":1469501147,"temp":"95"}
{"time":1469501202,"temp":"95"}
{"time":1469501219,"temp":"95"}
{"time":1469501225,"temp":"95"}
Bu JSON dosyaları, HDInsight kümesinin kapsayıcısının altındaki alt klasörde depolanır temps .
Giriş kaynağını tanımlama
İlk olarak, verilerin kaynağını ve bu kaynağın gerektirdiği ayarları açıklayan bir DataFrame yapılandırın. Bu örnek, Azure Depolama'daki JSON dosyalarından çizimler oluşturur ve bu dosyalara okuma zamanında bir şema uygular.
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
//Cluster-local path to the folder containing the JSON files
val inputPath = "/temps/"
//Define the schema of the JSON files as having the "time" of type TimeStamp and the "temp" field of type String
val jsonSchema = new StructType().add("time", TimestampType).add("temp", StringType)
//Create a Streaming DataFrame by calling readStream and configuring it with the schema and path
val streamingInputDF = spark.readStream.schema(jsonSchema).json(inputPath)
Sorguyu uygulama
Ardından, Streaming DataFrame'e yönelik istenen işlemleri içeren bir sorgu uygulayın. Bu durumda, bir toplama tüm satırları 1 saatlik pencereler halinde gruplandırıp bu 1 saatlik pencerede minimum, ortalama ve maksimum sıcaklıkları hesaplar.
val streamingAggDF = streamingInputDF.groupBy(window($"time", "1 hour")).agg(min($"temp"), avg($"temp"), max($"temp"))
Çıkış kaynağını tanımla
Ardından, her tetikleyici aralığı içinde sonuçlar tablosuna eklenen satırların hedefini tanımlayın. Bu örnek yalnızca tüm satırları daha sonra SparkSQL ile sorgulayabileceğiniz bellek içi bir tabloya temps verir. Tam çıkış modu, tüm pencereler için tüm satırların her seferinde çıkış olmasını sağlar.
val streamingOutDF = streamingAggDF.writeStream.format("memory").queryName("temps").outputMode("complete")
Sorguyu başlatma
Akış sorgusunu başlatın ve sonlandırma sinyali alınana kadar çalıştırın.
val query = streamingOutDF.start()
Sonuçları görüntüleme
Sorgu çalışırken, aynı SparkSession'da sorgu sonuçlarının temps depolandığı tabloda bir SparkSQL sorgusu çalıştırabilirsiniz.
select * from temps
Bu sorgu aşağıdakine benzer sonuçlar verir:
| pencere | min(sıcaklık) | ortalama(sıcaklık) | max(temp) |
|---|---|---|---|
| {u'start': u'2016-07-26T02:00:00.000Z', u'end'... | 95 | 95.231579 | 99 |
| {u'start': u'2016-07-26T03:00:00.000Z', u'end'... | 95 | 96.023048 | 99 |
| {u'start': u'2016-07-26T04:00:00.000Z', u'end'... | 95 | 96.797133 | 99 |
| {u'start': u'2016-07-26T05:00:00.000Z', u'end'... | 95 | 96.984639 | 99 |
| {u'start': u'2016-07-26T06:00:00.000Z', u'end'... | 95 | 97.014749 | 99 |
| {u'start': u'2016-07-26T07:00:00.000Z', u'end'... | 95 | 96.980971 | 99 |
| {u'start': u'2016-07-26T08:00:00.000Z', u'end'... | 95 | 96.965997 | 99 |
Spark Yapılandırılmış Akış API'sinin yanı sıra desteklediği giriş veri kaynakları, işlemler ve çıkış havuzları hakkında ayrıntılı bilgi için bkz . Apache Spark Yapılandırılmış Akış Programlama Kılavuzu.
Kontrol noktası oluşturma ve ileri yazım günlükleri
Dayanıklı ve hataya dayanıklılık sağlamak için Yapılandırılmış Akış , düğüm hatalarında bile akış işlemenin kesintisiz devam etmesini sağlamak için denetim noktalarına dayanır. HDInsight'ta Spark, Azure Depolama veya Data Lake Storage gibi dayanıklı depolama için denetim noktaları oluşturur. Bu denetim noktaları akış sorgusu hakkındaki ilerleme bilgilerini depolar. Ayrıca, Structured Streaming bir önceden yazma günlüğü (WAL) kullanır. WAL, alınmış ancak henüz bir sorgu tarafından işlenmemiş verileri kaydeder. Bir hata oluşursa ve WAL'dan işleme yeniden başlatılırsa, kaynaktan alınan olaylar kaybolmaz.
Spark Streaming uygulamalarını dağıtma
Genellikle bir Spark Streaming uygulamasını yerel olarak bir JAR dosyasına derler ve ardından JAR dosyasını HDInsight kümenize bağlı varsayılan depolama alanına kopyalayarak HDInsight üzerinde Spark'a dağıtırsınız. Uygulamanızı post işlemi kullanarak kümenizden edinebileceğiniz Apache Livy REST API'leriyle başlatabilirsiniz. POST'un gövdesi JAR'nizin yolunu, ana yöntemi akış uygulamasını tanımlayıp çalıştıran sınıfın adını ve isteğe bağlı olarak işin kaynak gereksinimlerini (yürütücü sayısı, bellek ve çekirdek sayısı gibi) ve uygulama kodunuzun gerektirdiği tüm yapılandırma ayarlarını sağlayan bir JSON belgesi içerir.
Tüm uygulamaların durumu, LIVY uç noktasına yönelik get isteğiyle de denetlenebilir. Son olarak, LIVY uç noktasına bir DELETE isteği vererek çalışan bir uygulamayı sonlandırabilirsiniz. LIVY API'siyle ilgili ayrıntılar için bkz. Apache LIVY ile uzak işler