Aracılığıyla paylaş


Azure Databricks'te Delta Lake değişiklik veri akışını kullanma

Değişiklik veri akışı, Azure Databricks'in Delta tablosunun sürümleri arasındaki satır düzeyi değişiklikleri izlemesine olanak tanır. Delta tablosunda etkinleştirildiğinde, çalışma zamanı kayıtları tabloya yazılan tüm veriler için olayları değiştirir. Bu, satır verilerinin yanı sıra belirtilen satırın eklendiğini, silindiğini veya güncelleştirildiğini belirten meta verileri içerir.

Önemli

Değişiklik veri akışı, değişiklik bilgilerini sağlamak için tablo geçmişiyle birlikte çalışır. Delta tablosunun kopyalanması ayrı bir geçmiş oluşturduğundan, kopyalanan tablolardaki değişiklik veri akışı özgün tablonunkiyle eşleşmiyor.

Değişiklik verilerini artımlı olarak işleme

Databricks, Değişiklik tablolarındaki değişiklikleri artımlı olarak işlemek için değişiklik veri akışının Yapılandırılmış Akış ile birlikte kullanılmasını önerir. Tablonuzun değişiklik veri akışı sürümlerini otomatik olarak izlemek için Azure Databricks için Yapılandırılmış Akış'ı kullanmanız gerekir.

Not

Delta Live Tables, değişiklik verilerinin kolay yayılması ve sonuçların SCD (yavaş değişen boyut) türü 1 veya tür 2 tablo olarak depolanması için işlevsellik sağlar. Bkz . DEĞIŞIKLIKLERI UYGULAMA API'leri: Delta Live Tablolarıyla değişiklik verilerini yakalamayı basitleştirme.

Bir tablodan değişiklik veri akışını okumak için, bu tabloda değişiklik veri akışını etkinleştirmeniz gerekir. Bkz . Değişiklik veri akışını etkinleştirme.

Aşağıdaki söz dizimi örneğinde gösterildiği gibi değişiklik veri akışını okumak için bir tabloya karşı akış yapılandırırken seçeneğini readChangeFeed true ayarlayın:

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

Varsayılan olarak akış, akış ilk kez başlatıldığında INSERT tablonun en son anlık görüntüsünü döndürür ve gelecekte değişiklik verileri olarak değişir.

Delta Lake işleminin bir parçası olarak veri işlemelerini değiştirin ve yeni veri işlemeleri tabloya aynı anda kullanılabilir hale gelir.

İsteğe bağlı olarak bir başlangıç sürümü belirtebilirsiniz. Bkz. Başlangıç sürümü belirtmeli miyim?.

Değişiklik veri akışı, bir başlangıç sürümü belirtmeyi gerektiren toplu yürütmeyi de destekler. Bkz. Toplu sorgulardaki değişiklikleri okuma.

Hız sınırları (maxFilesPerTrigger, maxBytesPerTrigger) gibi excludeRegex seçenekler değişiklik verileri okunurken de desteklenir.

Hız sınırlama, başlangıç anlık görüntüsü sürümü dışındaki sürümler için atomik olabilir. Diğer bir ifadeyle, işleme sürümünün tamamı hız sınırına sahip olur veya işlemenin tamamı döndürülür.

Başlangıç sürümü belirtmeliyim mi?

Belirli bir sürümden önce gerçekleşen değişiklikleri yoksaymak istiyorsanız, isteğe bağlı olarak bir başlangıç sürümü belirtebilirsiniz. Zaman damgasını veya Delta işlem günlüğüne kaydedilen sürüm kimliği numarasını kullanarak bir sürüm belirtebilirsiniz.

Not

Toplu okumalar için başlangıç sürümü gereklidir ve birçok toplu iş deseni isteğe bağlı bir bitiş sürümü ayarlamanın avantajlarından yararlanabilir.

Değişiklik veri akışı içeren Yapılandırılmış Akış iş yüklerini yapılandırırken, başlangıç sürümü belirtmenin işlemeyi nasıl etkilediğini anlamanız önemlidir.

Başta yeni veri işleme işlem hatları olmak üzere birçok akış iş yükü varsayılan davranıştan yararlanmaktadır. Varsayılan davranışla, akış tablodaki tüm mevcut kayıtları değişiklik veri akışında işlem olarak INSERT ilk kez kaydettiğinde ilk toplu iş işlenir.

Hedef tablonuz zaten belirli bir noktaya kadar uygun değişiklikleri olan tüm kayıtları içeriyorsa, kaynak tablo durumunun olay olarak INSERT işlenmesini önlemek için bir başlangıç sürümü belirtin.

Aşağıdaki örnek söz dizimi, denetim noktasının bozuk olduğu bir akış hatasından kurtarılır. Bu örnekte, aşağıdaki koşulları varsayın:

  1. Değişiklik veri akışı, tablo oluşturma sırasında kaynak tabloda etkinleştirildi.
  2. Hedef aşağı akış tablosu, sürüm 75 dahil olmak üzere en fazla tüm değişiklikleri işlemiştir.
  3. Kaynak tablonun sürüm geçmişi 70 ve üzeri sürümlerde kullanılabilir.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

Bu örnekte, yeni bir denetim noktası konumu da belirtmeniz gerekir.

Önemli

Başlangıç sürümü belirtirseniz, başlangıç sürümü artık tablo geçmişinde yoksa akış yeni bir denetim noktasından başlatılamaz. Delta Lake, geçmiş sürümleri otomatik olarak temizler; bu da belirtilen tüm başlangıç sürümlerinin sonunda silindiği anlamına gelir.

Bkz. Bir tablonun geçmişinin tamamını yeniden oynatmak için değişiklik veri akışını kullanabilir miyim?.

Toplu sorgulardaki değişiklikleri okuma

Toplu sorgu söz dizimini kullanarak belirli bir sürümden başlayarak tüm değişiklikleri okuyabilir veya belirtilen sürüm aralığındaki değişiklikleri okuyabilirsiniz.

Bir sürümü tamsayı ve zaman damgalarını biçiminde yyyy-MM-dd[ HH:mm:ss[.SSS]]bir dize olarak belirtirsiniz.

Başlangıç ve bitiş sürümleri sorgularda kapsayıcıdır. Belirli bir başlangıç sürümünden tablonun en son sürümüne yapılan değişiklikleri okumak için yalnızca başlangıç sürümünü belirtin.

Değişiklik olaylarını kaydeden bir sürümden daha eski bir sürüm veya zaman damgası sağlarsanız (değişiklik veri akışı etkinleştirildiğinde), değişiklik veri akışının etkinleştirilmediğini belirten bir hata oluşur.

Aşağıdaki söz dizimi örneklerinde, toplu okumalarla başlangıç ve bitiş sürümü seçeneklerinin kullanılması gösterilmektedir:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

Not

Varsayılan olarak, bir kullanıcı bir sürümde veya zaman damgasında tablodaki son işlemeyi aşıyorsa hata timestampGreaterThanLatestCommit oluşur. Databricks Runtime 11.3 LTS ve üzeri sürümlerinde, kullanıcı aşağıdaki yapılandırmayı trueolarak ayarlarsa değişiklik veri akışı aralık dışı sürüm durumunu işleyebilir:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

Bir tablodaki son işlemeden daha büyük bir başlangıç sürümü veya bir tablodaki son işlemeden daha yeni bir başlangıç zaman damgası sağlarsanız, önceki yapılandırma etkinleştirildiğinde boş bir okuma sonucu döndürülür.

Bir tablodaki son işlemeden daha büyük bir bitiş sürümü veya tablodaki son işlemeden daha yeni bir bitiş zaman damgası sağlarsanız, önceki yapılandırma toplu okuma modunda etkinleştirildiğinde, başlangıç sürümü ile son işleme arasındaki tüm değişiklikler döndürülür.

Değişiklik veri akışının şeması nedir?

Bir tablonun değişiklik veri akışından okuduğunuzda, en son tablo sürümünün şeması kullanılır.

Not

Çoğu şema değişikliği ve evrim işlemi tam olarak desteklenir. Sütun eşlemesi etkinleştirilmiş tablo tüm kullanım örneklerini desteklemez ve farklı davranış gösterir. Bkz . Sütun eşlemesi etkin tablolar için veri akışı sınırlamalarını değiştirme.

Değişiklik veri akışı, Delta tablosunun şemasındaki veri sütunlarına ek olarak değişiklik olayının türünü tanımlayan meta veri sütunlarını içerir:

Sütun adı Tür Değerler
_change_type String insert, update_preimage , update_postimage, delete (1)
_commit_version Uzun Değişikliği içeren Delta günlüğü veya tablo sürümü.
_commit_timestamp Zaman damgası İşleme oluşturulduğunda ilişkili zaman damgası.

(1) preimage güncelleştirmeden önceki değerdir, postimage güncelleştirmeden sonraki değerdir.

Not

Şemada bu eklenen sütunlarla aynı adlara sahip sütunlar varsa tablodaki değişiklik veri akışını etkinleştiremezsiniz. Değişiklik veri akışını etkinleştirmeye çalışmadan önce bu çakışmayı çözmek için tablodaki sütunları yeniden adlandırın.

Değişiklik veri akışını etkinleştirme

Yalnızca etkin tablolar için değişiklik veri akışını okuyabilirsiniz. Aşağıdaki yöntemlerden birini kullanarak veri akışını değiştir seçeneğini açıkça etkinleştirmeniz gerekir:

  • Yeni tablo: komutunda CREATE TABLE table özelliğini delta.enableChangeDataFeed = true ayarlayın.

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Varolan tablo: komutunda ALTER TABLE table özelliğini delta.enableChangeDataFeed = true ayarlayın.

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • Tüm yeni tablolar:

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

Önemli

Yalnızca değişiklik veri akışını etkinleştirdikten sonra yapılan değişiklikler kaydedilir. Tablodaki geçmiş değişiklikler yakalanmaz.

Veri depolama alanını değiştirme

Değişiklik veri akışının etkinleştirilmesi, bir tablonun depolama maliyetlerinde küçük bir artışa neden olur. Değişiklik veri kayıtları sorgu çalıştırılırken oluşturulur ve genellikle yeniden yazılan dosyaların toplam boyutundan çok daha küçüktür.

Azure Databricks kayıtları tablo dizininin altındaki klasördeki _change_data , DELETEve MERGE işlemleri için UPDATEdeğişiklik verilerini değiştirir. Azure Databricks değişiklik veri akışını doğrudan işlem günlüğünden verimli bir şekilde hesaplayabildiği için, yalnızca ekleme işlemleri ve tam bölüm silme işlemleri gibi bazı işlemler dizinde _change_data veri oluşturmaz.

Klasördeki veri dosyalarına _change_data yönelik tüm okuma işlemleri desteklenen Delta Lake API'lerinden geçmelidir.

Klasördeki _change_data dosyalar tablonun bekletme ilkesini izler. Değişiklik veri akışı verileri, komut çalıştırıldığında VACUUM silinir.

Bir tablonun geçmişinin tamamını yeniden oynatmak için değişiklik veri akışını kullanabilir miyim?

Değişiklik veri akışı, bir tablodaki tüm değişikliklerin kalıcı bir kaydı olarak hizmet vermek üzere tasarlanmamıştır. Değişiklik veri akışı yalnızca etkinleştirildikten sonra gerçekleşen değişiklikleri kaydeder.

Veri akışını değiştirme ve Delta Lake, kaynak tablonun tam anlık görüntüsünü her zaman yeniden oluşturmanıza olanak sağlar; başka bir deyişle, değişiklik veri akışının etkinleştirildiği bir tabloda yeni bir akış okuması başlatabilir ve bu tablonun geçerli sürümünü ve sonrasında gerçekleşen tüm değişiklikleri yakalayabilirsiniz.

Değişiklik veri akışındaki kayıtları geçici ve yalnızca belirli bir bekletme penceresi için erişilebilir olarak değerlendirmelisiniz. Delta işlem günlüğü, tablo sürümlerini ve buna karşılık gelen değişiklik veri akışı sürümlerini düzenli aralıklarla kaldırır. Bir sürüm işlem günlüğünden kaldırıldığında, bu sürüm için değişiklik veri akışını artık okuyasınız.

Kullanım örneğiniz bir tablodaki tüm değişikliklerin kalıcı geçmişinin korunmasını gerektiriyorsa, değişiklik veri akışından yeni bir tabloya kayıt yazmak için artımlı mantık kullanmanız gerekir. Aşağıdaki kod örneğinde, Yapılandırılmış Akış'ın artımlı işlenmesinden yararlanan ancak kullanılabilir verileri toplu iş yükü olarak işleyen kullanımı trigger.AvailableNowgösterilmektedir. Denetim veya tam yeniden yürütülebilirlik amacıyla değişiklik veri akışının yedeğini oluşturmak için bu iş yükünü ana işleme işlem hatlarınızla zaman uyumsuz olarak zamanlayabilirsiniz.

Python

(spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

Sütun eşlemesi etkin tablolar için veri akışı sınırlamalarını değiştirme

Delta tablosunda sütun eşlemesi etkinleştirildiğinde, var olan veriler için veri dosyalarını yeniden yazmadan tablodaki sütunları bırakabilir veya yeniden adlandırabilirsiniz. Sütun eşleme etkinleştirildiğinde, sütun yeniden adlandırma veya bırakma, veri türünü değiştirme veya null atanabilirlik değişiklikleri gibi eklemesiz şema değişiklikleri gerçekleştirdikten sonra değişiklik veri akışında sınırlamalar vardır.

Önemli

  • Toplu iş semantiği kullanılarak eklemesiz şema değişikliğinin gerçekleştiği bir işlem veya aralık için değişiklik veri akışını okuyamazsınız.
  • Databricks Runtime 12.2 LTS ve altında, sütun eşlemesi etkin olan ve eklemesiz şema değişiklikleriyle karşılaşan tablolar, değişiklik veri akışında akış okumalarını desteklemez. Bkz. Sütun eşleme ve şema değişiklikleriyle akış yapma.
  • Databricks Runtime 11.3 LTS ve altında, sütun eşlemesi etkin olan ve sütun yeniden adlandırma veya bırakma işlemiyle karşılaşmış olan tabloların değişiklik veri akışını okuyamazsınız.

Databricks Runtime 12.2 LTS ve üzeri sürümlerde, sütun eşlemesi etkin olan ve eklemesiz şema değişiklikleriyle karşılaşan tablolar için değişiklik veri akışında toplu okuma gerçekleştirebilirsiniz. Okuma işlemleri, tablonun en son sürümünün şemasını kullanmak yerine sorguda belirtilen tablonun son sürümünün şemasını kullanır. Belirtilen sürüm aralığı eklemeli olmayan bir şema değişikliğine yayılsa da sorgular başarısız olur.