Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Değişiklik veri akışı, Azure Databricks'in Delta tablosunun sürümleri arasındaki satır düzeyi değişiklikleri izlemesine olanak tanır. Delta tablosunda etkinleştirildiğinde, çalışma zamanı kayıtları tabloya yazılan tüm veriler için olayları değiştirir. Bu, satır verilerinin yanı sıra belirtilen satırın eklendiğini, silindiğini veya güncelleştirildiğini belirten meta verileri içerir.
Değişiklik veri akışını kullanarak aşağıdaki yaygın veri kullanım örneklerini güçlendirebilirsiniz:
- ETL işlem hatları: Yalnızca son işlem hattı çalıştırmadan sonra değişen satırları artımlı olarak işler.
- Denetim izleri: Uyumluluk ve idare gereksinimleri için veri değişikliklerini izleyin.
- Veri çoğaltma: Aşağı akış tablolarında, önbelleklerde veya dış sistemlerde yapılan değişiklikleri eşitleyin.
Önemli
Değişiklik veri akışı, değişiklik bilgilerini sağlamak için tablo geçmişiyle birlikte çalışır. Delta tablosunun kopyalanması ayrı bir geçmiş oluşturduğundan, kopyalanan tablolardaki değişiklik veri akışı özgün tablonunkiyle eşleşmiyor.
Değişiklik veri akışını etkinleştirme
Değişiklik veri akışı, okumak istediğiniz tablolarda açıkça etkinleştirilmelidir. Aşağıdaki yöntemlerden birini kullanın.
Yeni tablo
komutunda delta.enableChangeDataFeed = true table özelliğini CREATE TABLE ayarlayın.
CREATE TABLE student (id INT, name STRING, age INT)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
Varolan tablo
komutunda delta.enableChangeDataFeed = true table özelliğini ALTER TABLE ayarlayın.
ALTER TABLE myDeltaTable
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Oturumdaki tüm yeni tablolar
Oturumda oluşturulan tüm yeni tablolar için değişiklik veri akışını etkinleştirmek için bir Spark yapılandırması ayarlayın.
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
Önemli
Yalnızca değişiklik veri akışını etkinleştirdikten sonra yapılan değişiklikler kaydedilir. Tablodaki geçmiş değişiklikler yakalanmaz.
Veri akışı şemasını değiştirme
Bir tablonun değişiklik veri akışından okuduğunuzda, en son tablo sürümünün şeması kullanılır. Azure Databricks çoğu şema değişikliği ve evrim işlemlerini tam olarak destekler, ancak sütun eşlemesi etkinleştirilmiş tablolarda sınırlamalar vardır. Bkz . Sütun eşlemeli tablolar için veri akışı sınırlamalarını değiştirme.
Değişiklik veri akışı, Delta tablosunun şemasındaki veri sütunlarına ek olarak değişiklik olayının türünü tanımlayan meta veri sütunlarını içerir:
| Sütun adı | Tür | Değerler |
|---|---|---|
_change_type |
Dize |
insert, update_preimage , update_postimage, delete(1) |
_commit_version |
Uzun | Değişikliği içeren Delta günlüğü veya tablo sürümü. |
_commit_timestamp |
Zaman damgası | İşlemin oluşturulduğu zamana ilişkin zaman damgası. |
(1)preimage güncelleştirmeden önceki değerdir, postimage güncelleştirmeden sonraki değerdir.
Şemada bu meta veri sütunlarıyla aynı adlara sahip sütunlar varsa tablodaki değişiklik veri akışını etkinleştiremezsiniz. Değişiklik veri akışını etkinleştirmeden önce bu çakışmayı çözmek için tablonuzdaki sütunları yeniden adlandırın.
Değişiklik verilerini artımlı olarak işleme
Databricks, Delta tablolarındaki değişiklikleri artımlı olarak işlemek için değişiklik veri akışını Yapılandırılmış Akış ile birlikte kullanılmasını önerir. Tablonuzun değişiklik veri akışı sürümlerini otomatik olarak izlemek için Azure Databricks için Yapılandırılmış Akış'ı kullanmanız gerekir. SCD tür 1 veya tür 2 tabloları ile CDC işlemesi için bkz. AUTO CDC API'leri: İşlem hatlarıyla değişiklik veri yakalamayı basitleştirme.
Aşağıdaki söz dizimi örneğinde gösterildiği gibi değişiklik veri akışını okumak için bir tabloya karşı akış yapılandırırken seçeneğini readChangeFeedtrue ayarlayın:
Piton
(spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
)
Scala programlama dili
spark.readStream
.option("readChangeFeed", "true")
.table("myDeltaTable")
Varsayılan davranış
Akış ilk kez başlatıldığında, tablonun en son anlık görüntüsünü `INSERT` kayıtları olarak döndürür ve ardından gelecekteki değişiklikleri değişiklik verileri olarak döndürür. Değişiklik verileri, Delta Lake işleminin parçası olarak taahhüt edilir ve yeni veriler tabloya taahhüt edildiğinde aynı anda kullanılabilir hale gelir.
Ek seçenekler
İsteğe bağlı olarak bir başlangıç sürümü belirtebilir (bkz . Başlangıç sürümü belirtme) veya toplu yürütme kullanabilirsiniz (bkz. Toplu iş sorgularında değişiklikleri okuma). Azure Databricks ayrıca hız sınırlarını (maxFilesPerTrigger, maxBytesPerTrigger) ve excludeRegex değişiklik verilerini okurken de destekler. Delta Lake akış seçeneklerinin tam listesi için bkz. Delta Lake.
Başlangıç anlık görüntüsü dışındaki sürümler için hız sınırlama, işlemelerin tamamına atomik olarak uygulanır; tüm işleme geçerli toplu işlemeye dahil edilir veya sonraki toplu işlemeye ertelenir.
Başlangıç sürümü belirtme
Belirli bir noktadan gelen değişiklikleri okumak için, zaman damgası veya sürüm numarası kullanarak bir başlangıç sürümü belirtin. Toplu okumalar için başlangıç sürümleri gereklidir. İsteğe bağlı olarak aralığı sınırlamak için bir bitiş sürümü belirtebilirsiniz. Delta Lake tablo geçmişi hakkında daha fazla bilgi edinmek için bkz. Zaman yolculuğu nedir?.
Değişiklik veri akışı içeren Yapılandırılmış Akış iş yüklerini yapılandırırken, başlangıç sürümü belirtmenin işlemeyi nasıl etkilediğini anlayın:
- Yeni veri işleme işlem hatları genellikle, akış ilk kez başlatıldığında tablodaki tüm mevcut kayıtları işlem olarak
INSERTkaydeden varsayılan davranıştan yararlanır. - Hedef tablonuz zaten belirli bir noktaya kadar uygun değişiklikleri olan tüm kayıtları içeriyorsa, kaynak tablo durumunun olay olarak
INSERTişlenmesini önlemek için bir başlangıç sürümü belirtin.
Aşağıdaki örnek, denetim noktasının bozuk olduğu bir akış hatasından kurtarmaya yönelik söz dizimini gösterir. Bu örnekte, aşağıdaki koşulları varsayın:
- Değişiklik veri akışı, tablo oluşturma sırasında kaynak tabloda etkinleştirildi.
- Hedef aşağı akış tablosu, sürüm 75 dahil olmak üzere en fazla tüm değişiklikleri işlemiştir.
- Kaynak tablonun sürüm geçmişi 70 ve üzeri sürümlerde kullanılabilir.
Piton
(spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
)
Scala programlama dili
spark.readStream
.option("readChangeFeed", "true")
.option("startingVersion", 76)
.table("source_table")
Bu örnekte, yeni bir denetim noktası konumu da belirtmeniz gerekir.
Önemli
Başlangıç sürümü belirtirseniz, başlangıç sürümü artık tablo geçmişinde yoksa akış yeni bir denetim noktasından başlatılamaz. Delta Lake, geçmiş sürümleri otomatik olarak temizler; bu da belirtilen tüm başlangıç sürümlerinin sonunda silindiği anlamına gelir.
Bkz. Tablo geçmişini yeniden yürütme.
Toplu sorgulardaki değişiklikleri okuma
Toplu sorgu söz dizimini kullanarak belirli bir sürümden başlayarak tüm değişiklikleri okuyabilir veya belirtilen sürüm aralığındaki değişiklikleri okuyabilirsiniz.
- Sürümleri tamsayılar ve zaman damgaları olarak biçiminde
yyyy-MM-dd[ HH:mm:ss[.SSS]]dizeler olarak belirtin. - Başlangıç ve bitiş sürümleri dahildir.
- Başlangıç sürümünden en son sürüme kadar okumak için yalnızca başlangıç sürümünü belirtin.
- Değişiklik veri akışı etkinleştirilmeden önce bir sürüm belirtilmesi hataya neden olur.
Aşağıdaki söz dizimi örneklerinde, toplu okumalarla başlangıç ve bitiş sürümü seçeneklerinin kullanılması gösterilmektedir:
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name,
-- with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
Piton
# version as ints or longs
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
Scala programlama dili
// version as ints or longs
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
Aralık dışı sürümleri işleme
Varsayılan olarak, son işlemeyi aşan bir sürüm veya zaman damgası belirtilmesi hatasını timestampGreaterThanLatestCommitoluşturur. Databricks Runtime 11.3 LTS ve üzeri sürümlerde, aralık dışı sürümler için toleransı etkinleştirebilirsiniz:
SET spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
Bu ayar etkinken:
- Son değişikliğin ötesinde başlangıç sürümü/zaman damgası: Boş sonuç döner.
- Son işlemenin ötesindeki bitiş sürümü/zaman damgası: Baştan son işlemeye kadar tüm değişiklikleri döndürür.
Veri değişikliklerini kaydetme
Delta Lake, veri değişikliklerini verimli bir şekilde kaydeder ve depolama gösterimini iyileştirmek için diğer Delta Lake özelliklerini kullanabilir.
Depolama ile ilgili dikkat edilmesi gerekenler
- Depolama maliyetleri: Değişiklikler ayrı dosyalara kaydedilebileceğinden değişiklik veri akışının etkinleştirilmesi depolama maliyetlerinde küçük bir artışa neden olabilir.
- Değişiklik dosyası olmayan işlemler: Bazı işlemler (yalnızca ekleme, tam bölüm silme işlemleri) değişiklik veri dosyaları oluşturmaz; Azure Databricks değişiklik veri akışını doğrudan işlem günlüğünden hesaplar.
-
Bekletme: Veri dosyalarını değiştirme, tablonun bekletme ilkesine uyar. Komut
VACUUMbunları siler ve işlem günlüğündeki değişiklikler denetim noktası saklamayı izler.
Değişiklik verisi dosyalarını doğrudan sorgulayarak değişiklik veri akışını yeniden yapılandırmayı denemeyin. Delta Lake API'lerini her zaman kullanın.
Tablo geçmişini yeniden yürütme
Değişiklik veri akışı, bir tablodaki tüm değişikliklerin kalıcı bir kaydı olarak hizmet vermek üzere tasarlanmamıştır. Yalnızca etkinleştirildikten sonra gerçekleşen değişiklikleri kaydeder ve geçerli sürümü ve sonraki tüm değişiklikleri yakalamak için yeni bir akış okuması başlatabilirsiniz.
Değişiklik veri akışındaki kayıtlar geçicidir ve yalnızca belirli bir bekletme penceresi için erişilebilir. Delta Lake işlem günlüğü, tablo sürümlerini ve ilgili değişiklik veri akışı sürümlerini düzenli aralıklarla kaldırır. Bir sürüm kaldırıldığında, bu sürüm için değişiklik veri akışını artık okuyasınız.
Kalıcı geçmiş için değişiklik verilerini arşivle
Kullanım örneğiniz bir tablodaki tüm değişikliklerin kalıcı geçmişinin korunmasını gerektiriyorsa, değişiklik veri akışındaki kayıtları yeni bir tabloya yazmak için artımlı mantık kullanın. Aşağıdaki örnek, trigger.AvailableNow kullanarak denetim veya tam yeniden yürütülebilirlik için kullanılabilir verileri toplu bir çalışma yükü olarak işlemenin nasıl yapıldığını gösterir.
Piton
(spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("target_table")
)
Scala programlama dili
spark.readStream
.option("readChangeFeed", "true")
.table("source_table")
.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("target_table")
Sütun eşlemeli tablolar için veri akışı sınırlamalarını değiştirme
Delta tablosunda sütun eşleme etkinleştirildiğinde, veri dosyalarını yeniden yazmadan sütunları bırakabilir veya yeniden adlandırabilirsiniz. Ancak, sütunları yeniden adlandırma veya bırakma, veri türlerini değiştirme veya null atanabilirlik değişiklikleri gibi eklemesiz şema değişikliklerinden sonra değişiklik veri akışında sınırlamalar vardır:
- Toplu iş semantiği: Eklemesiz şema değişikliğinin gerçekleştiği bir işlem veya aralık için değişiklik veri akışını okuyamazsınız.
- Databricks Runtime 12.2 LTS ve öncesi: Sütun eşlemesi etkin olan ve ekleme yapmayan şema değişiklikleri yaşayan tablolar, değişiklik veri akışında veri akışı okumalarını desteklemez. Bkz. Sütun eşleme ve akış.
- Databricks Runtime 11.3 LTS ve üzeri: Sütun eşlemesi etkin olan ve sütun yeniden adlandırma veya bırakma işlemiyle karşılaşmış olan tabloların değişiklik veri akışını okuyamazsınız.
Databricks Runtime 12.2 LTS ve üzeri sürümlerde, sütun eşlemesi etkin olan ve eklemesiz şema değişiklikleriyle karşılaşan tablolar için değişiklik veri akışında toplu okuma gerçekleştirebilirsiniz. Okuma işlemleri, en son tablo sürümü yerine sorguda belirtilen son sürümün şemasını kullanır. Sürüm aralığı eklemeli olmayan bir şema değişikliğine yayılsa bile sorgular başarısız olur.