Aracılığıyla paylaş


Lakeflow Spark Bildirimli İşlem Hatları akışlarıyla verileri artımlı olarak yükleme ve işleme

Veriler, veri işleme hatları üzerinden akan akışlarla işlenir. Her akış bir sorgudan ve genellikle bir hedeflerden oluşur. Veri akışı, sorguyu ya toplu işlem olarak ya da veri akışı biçiminde hedefe artımlı olarak işler. Akış, Lakeflow Spark Bildirimli İşlem Hatları'ndaki bir işlem hattının içinde bulunur.

Genellikle, bir hedefi güncelleştiren bir işlem hattında sorgu oluşturduğunuzda akışlar otomatik olarak tanımlanır, ancak birden çok kaynaktan tek bir hedefe ekleme gibi daha karmaşık işlemler için açıkça ek akışlar tanımlayabilirsiniz.

Güncelleştirmeler

Bir akış, tanımlama işlem hattı her güncelleştirildiğinde çalıştırılır. Akış, kullanılabilir en son verileri içeren tablolar oluşturur veya güncelleştirir. Akışın türüne ve verilerdeki değişikliklerin durumuna bağlı olarak, güncelleştirme yalnızca yeni kayıtları işleyen veya veri kaynağındaki tüm kayıtları yeniden işleyen bir tam yenileme gerçekleştiren artımlı yenileme gerçekleştirebilir.

Varsayılan akış oluşturma

İşlem hattı oluşturduğunuzda, genellikle destekleyen sorguyla birlikte bir tablo veya görünüm tanımlarsınız. Örneğin, bu SQL sorgusunda, customers_silver adlı tablodan okuyarak customers_bronze adlı bir akış tablosu oluşturursunuz.

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

Python'da da aynı akış tablosunu oluşturabilirsiniz. Python'da, Lakeflow Spark Bildirimli İşlem Hatları işlevini eklemek için dekoratörlerle birlikte veri çerçevesi döndüren bir sorgu işlevi oluşturarak işlem hatlarını kullanırsınız:

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

Bu örnekte bir akış tablosu oluşturdunuz. SQL ve Python'da benzer söz dizimine sahip maddileştirilmiş görünümler de oluşturabilirsiniz. Daha fazla bilgi için bkz . Akış tabloları ve Gerçekleştirilmiş görünümler.

Bu örnek, akış tablosuyla birlikte varsayılan bir akış oluşturur. Akış tablosu için varsayılan akış, her tetikleyiciye yeni satırlar ekleyen bir ekleme akışıdır. İşlem hatlarını kullanmanın en yaygın yolu budur: tek adımda akış ve hedef oluşturma. Verileri almak veya dönüştürmek için bu stili kullanabilirsiniz.

Ekleme akışları, tek bir hedefi güncelleştirmek için birden çok akış kaynağından veri okumayı gerektiren işlemeyi de destekler. Örneğin, var olan bir akış tablonuz ve akışınız varsa ve bu mevcut akış tablosuna yazan yeni bir akış kaynağı eklemek istediğinizde ekleme akışı işlevini kullanabilirsiniz.

Tek bir hedefe yazmak için birden çok akış kullanma

Önceki örnekte, tek adımda bir akış ve akış tablosu oluşturdunuz. Önceden oluşturulmuş bir tablo için akışlar da oluşturabilirsiniz. Bu örnekte, tablo ve onunla ilişkili akışın oluşturulmasını ayrı adımlarda görebilirsiniz. Bu kod, akış tablosu ve akış için aynı adın kullanılması da dahil olmak üzere varsayılan akış oluşturmayla aynı sonuçlara sahiptir.

Piton

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

Bir akışı hedeften bağımsız olarak oluşturmak, aynı hedefe veri ekleyen birden çok akış da oluşturabileceğiniz anlamına gelir.

Python arabirimindeki @dp.append_flow dekoratörünü veya SQL arabirimindeki CREATE FLOW...INSERT INTO ifadesini kullanarak, birden çok akış kaynağından gelen bir akış tablosunu hedeflemek üzere yeni bir akış oluşturun. Aşağıdaki gibi görevleri işlemek için ekleme akışını kullanın:

  • Tam yenileme gerektirmeden mevcut bir akış tablosuna veri ekleyen akış kaynakları ekleyin. Örneğin, çalıştığınız her bölgeden bölgesel verileri birleştiren bir tablonuz olabilir. Yeni bölgeler kullanıma sunulduktan sonra, tam yenileme gerçekleştirmeden yeni bölge verilerini tabloya ekleyebilirsiniz. Mevcut akış tablosuna akış kaynakları ekleme örneği için bkz . Örnek: Birden çok Kafka konu başlığından akış tablosuna yazma.
  • Eksik geçmiş verileri (geri doldurma) ekleyerek bir akış tablosunu güncelleştirin. Söz dizimini INSERT INTO ONCE kullanarak bir kez çalışan geçmişe dönük bir veri doldurma eklemesi oluşturabilirsiniz. Örneğin, bir Apache Kafka konu başlığı tarafından yazılan mevcut bir akış tablonuz var. Ayrıca, akış tablosuna tam olarak bir kez eklenmesi gereken bir tabloda depolanan geçmiş verileriniz vardır ve işlemeniz verileri eklemeden önce karmaşık bir toplama gerçekleştirmeyi içerdiğinden verileri akışla aktaramazsınız. Örnek olarak, Geçmiş verileri işlem hatlarıyla geri doldurma bölümüne bakın.
  • Birden çok kaynaktaki verileri birleştirin ve UNION yan tümcesini sorguda kullanmak yerine, tek bir veri akışı tablosuna yazın. Yerine ekleme akışı işlemeyi UNION kullanmak, tam yenileme güncelleştirmesi çalıştırmadan hedef tabloyu artımlı olarak güncelleştirmenizi sağlar. Bu şekilde yapılan bir birleşim örneği için Örnek: ekleme akışı işlemeyi kullanma UNION yerine bakınız.

Ekleme akışı işleme tarafından üretilen kayıtların hedefi, mevcut bir tablo veya yeni bir tablo olabilir. Python sorguları için create_streaming_table() işlevini kullanarak bir hedef tablo oluşturun.

Aşağıdaki örnekte, aynı hedef için iki akış eklenerek iki kaynak tablonun birleşimi oluşturulur:

Piton

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

Önemli

  • Veri kalitesi kısıtlamalarını beklentilerle tanımlamanız gerekiyorsa, hedef tablodaki beklentileri işlevin bir parçası create_streaming_table() olarak veya mevcut bir tablo tanımında tanımlayın. Tanımda beklentileri tanımlayamazsınız @append_flow .
  • Akışlar bir akış adıyla tanımlanır ve bu ad akış denetim noktalarını tanımlamak için kullanılır. Denetim noktasını tanımlamak için akış adının kullanılması aşağıdaki anlamına gelir:
    • İşlem hattındaki mevcut bir akış yeniden adlandırılırsa denetim noktası taşınmaz ve yeniden adlandırılan akış etkili bir şekilde tamamen yeni bir akıştır.
    • Mevcut denetim noktası yeni akış tanımıyla eşleşmediğinden, bir akış adını işlem hattında yeniden kullanamazsınız.

Akış türleri

Akış tabloları ve maddileşmiş görünümler için varsayılan akışlar ekleme akışlarıdır. Değişiklik verisini yakalama (change data capture) kaynaklarından okumak için de akışlar oluşturabilirsiniz. Aşağıdaki tabloda farklı akış türleri açıklanmaktadır.

Akış türü Description
Append Ekleme akışları, kaynaktaki yeni kayıtların her güncelleştirmeyle hedefe yazıldığı en yaygın akış türüdür. Yapılandırılmış akıştaki ekleme moduna karşılık gelir. Tamamen yenilenmediği sürece, verilerin hedefe yalnızca bir kez eklenmesini belirten toplu iş sorgusu bayrağını ONCE ekleyebilirsiniz. Birden fazla ekleme akışı belirli bir hedefe yazabilir.
Varsayılan akışlar (hedef akış tablosuyla veya gerçekleştirilmiş görünümlerle oluşturulur) hedefle aynı adı alır. Diğer hedeflerin varsayılan akışları yoktur.
Otomatik CDC (önceden değişiklikleri uygula) Otomatik CDC akışı, değişiklik verileri yakalama (CDC) verilerini içeren bir sorguyu alır. Otomatik CDC akışları yalnızca akış tablolarını hedefleyebilir ve kaynak mutlaka bir akış kaynağı olmalıdır (bu durum ONCE akışları için de geçerlidir). Birden fazla otomatik CDC akışı tek bir akış tablosunu hedefleyebilir. Otomatik CDC akışı için hedef işlevi gören bir akış tablosu yalnızca diğer otomatik CDC akışları tarafından hedeflenebilir.
CDC verileri hakkında daha fazla bilgi için bkz . AUTO CDC API'leri: İşlem hatlarıyla değişiklik verilerini yakalamayı basitleştirme.

Ek bilgiler

Akışlar ve bunların kullanımı hakkında daha fazla bilgi için aşağıdaki konulara bakın: