Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Durumda saklanan verileri etkili bir şekilde yönetmek için, kümelemeler, katılmalar ve yinelenenleri kaldırma dahil olmak üzere Lakeflow Spark Bildirimli İşlem Hatları'nda durum bilgili akış işlemesi gerçekleştirirken filigranları kullanın. Bu makalede, işlem hattı sorgularınızda filigranların nasıl kullanılacağı açıklanır ve önerilen işlemlerin örnekleri yer alır.
Uyarı
Toplamalar gerçekleştiren sorguların artımlı olarak işlenmesini ve her güncellemede tam olarak tekrar hesaplanmamasını sağlamak için su izi kullanmanız gerekir.
Filigran nedir?
Akış işlemede filigran , toplamalar gibi durum bilgisi olan işlemleri gerçekleştirirken verileri işlemek için zamana dayalı eşik tanımlayabilen bir Apache Spark özelliğidir. Gelen veriler eşiğe ulaşılana kadar işlenir ve bu noktada eşik tarafından tanımlanan zaman penceresi kapatılır. Filigranlar, daha büyük veri kümelerini veya uzun süre çalışan işlemleri işlerken sorgu işleme sırasında sorun yaşamamak için kullanılabilir. Bu sorunlar, işleme sırasında durumunda tutulan veri miktarı nedeniyle sonuç üretiminde yüksek gecikme ve hatta bellek yetersizliği hatalarını içerebilir. Akış verileri doğası gereği sıralanmamış olduğundan, filigranlar zaman penceresi toplamaları gibi işlemlerin doğru hesaplanmasına da destek olur.
Akış işlemede filigranları kullanma hakkında daha fazla bilgi edinmek için bkz . Apache Spark Yapılandırılmış Akış'ta Filigran oluşturma ve Veri işleme eşiklerini denetlemek için filigranları uygulama.
Filigranı nasıl tanımlarsınız?
Bir zaman damgası alanı ve geç verilerin ulaşması için zaman eşiğini temsil eden bir değer belirterek filigran tanımlarsınız. Veriler, tanımlanan zaman eşiğinden sonra ulaşırsa geç kabul edilir. Örneğin, eşik 10 dakika olarak tanımlanırsa, 10 dakikalık eşikten sonra gelen kayıtlar bırakılabilir.
Tanımlanan eşikten sonra gelen kayıtlar bırakılabileceğinden, gecikme süresi ile doğruluk gereksinimlerinizi karşılayan bir eşik seçmek önemlidir. Daha küçük bir eşik seçmek kayıtların daha erken gönderilmesine neden olur, ancak geç kayıtların bırakılma olasılığının daha yüksek olduğu anlamına gelir. Daha büyük bir eşik, verilerin daha uzun beklemesi ancak büyük olasılıkla daha eksiksiz olması anlamına gelir. Büyük durum boyutu nedeniyle, daha büyük bir eşik ek bilgi işlem kaynakları da gerektirebilir. Eşik değeri verilerinize ve işleme gereksinimlerinize bağlı olduğundan, en uygun eşiği belirlemek için işlemenizin test edilmesi ve izlenmesi önemlidir.
Python'da bir filigran tanımlamak için withWatermark() işlevini kullanırsınız. SQL'de filigran tanımlamak için yan tümcesini WATERMARK kullanın:
Piton
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Filigranları stream-stream birleşimlerinde kullanma
Akış akışı birleşimleri için birleştirmenin her iki tarafında bir filigran ve bir zaman aralığı yan tümcesi tanımlamanız gerekir. Her birleştirme kaynağı verilerin tamamlanmamış bir görünümüne sahip olduğundan, akış motoruna başka eşleşme yapılamadığında bunu bildirmek için zaman aralığı koşulu gereklidir. Zaman aralığı yan tümcesi, filigranları tanımlamak için kullanılan aynı alanları kullanmalıdır.
Her akışın filigranlar için farklı eşik değerleri gerektirdiği zamanlar olabileceğinden, akışların aynı eşik değerlerine sahip olması zorunlu değildir. Veri kaybını önlemek için akış motoru, en yavaş akışı temel alan bir genel filigran tutar.
Aşağıdaki örnek, bir reklam gösterim akışı ile reklamlara kullanıcı tıklama akışını birleştirir. Bu örnekte, gösterimden sonra 3 dakika içinde bir tıklama gerçekleşmelidir. 3 dakikalık zaman aralığı geçtikten sonra, artık eşleştirilemeyecek durumdaki satırlar düşürülür.
Piton
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Filigranlarla pencereli toplamalar gerçekleştirme
Akış verileri üzerinde durum bilgisi olan yaygın bir işlem, pencereli toplama işlemidir. Pencereli toplamalar, gruplandırılmış toplamalara benzer, ancak tanımlı pencerenin parçası olan satır kümesi için toplama değerleri döndürülür.
Bir pencere belirli bir uzunluk olarak tanımlanabilir ve bu pencerenin parçası olan tüm satırlarda toplama işlemi gerçekleştirilebilir. Spark Streaming üç pencere türünü destekler:
- Atlayan (sabit) pencereler: Sabit boyutlu, çakışmayan ve bitişik zaman aralıkları serisi. Giriş kaydı yalnızca tek bir pencereye ait.
- Kayan pencereler: Yuvarlanan pencerelere benzer şekilde, kayan pencereler sabit boyutlu olur, ancak pencereler çakışabilir ve bir kayıt birden çok pencereye düşebilir.
Veriler pencerenin sonuna ek olarak filigranın uzunluğuna ulaştığında, pencere için yeni veri kabul edilmez, toplamanın sonucu yayılır ve pencerenin durumu bırakılır.
Aşağıdaki örnek, sabit bir pencere kullanarak her 5 dakikada bir gösterimlerin toplamını hesaplar. Bu örnekte, select yan tümcesi impressions_window takma adını kullanır ve ardından pencerenin kendisi GROUP BY yan tümcesinin bir parçası olarak tanımlanır. Pencere, bu örnekte clickTimestamp sütunu olan, filigranla aynı zaman damgası sütununu temel almalıdır.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Python'da saatlik sabit pencerelerde kar hesaplamaya benzer bir örnek:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Yinelenen akış kayıtlarını kaldırma
Yapılandırılmış Akış, tam olarak bir kez işleme garantisine sahiptir ancak veri kaynaklarından kayıtların otomatik olarak yinelenenlerini kaldırmaz. Örneğin, birçok ileti kuyruğunun en az bir kez garantisi olduğundan, bu ileti kuyruklarından birinden okurken yinelenen kayıtlar beklenmelidir.
dropDuplicatesWithinWatermark() fonksiyonunu kullanarak belirtilen herhangi bir alanda kayıtları tekrarlardan arındırabilirsiniz; bazı alanlar farklı olsa da (örneğin olay zamanı veya varış saati gibi) bir akıştan yinelenenleri kaldırabilirsiniz.
dropDuplicatesWithinWatermark() işlevini kullanmak için bir filigran belirtmeniz gerekir. Su işareti tarafından belirtilen zaman aralığı içinde gelen tüm yinelenen veriler iptal edilir.
Sıralı veriler önemlidir çünkü sırasız veriler filigran değerinin yanlış bir şekilde ileri atlamasına neden olur. Daha sonra, daha eski veriler geldiğinde geç kalmış olarak kabul edilir ve atılır. Filigra üzerindeki zaman damgasına göre ilk anlık görüntüyü sırayla işlemek için withEventTimeOrder seçeneğini kullanın. Seçenek withEventTimeOrder, veri kümesini tanımlayan kodda veya işlem hattı ayarlarında spark.databricks.delta.withEventTimeOrder.enabled kullanılarak bildirilebilir. Örneğin:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Uyarı
Bu withEventTimeOrder seçenek yalnızca Python ile desteklenir.
Aşağıdaki örnekte, veriler clickTimestamp tarafından sıralı bir şekilde işlenir ve birbirinin 5 saniye içinde varan tekrarlayan userId ve clickAdId sütunlarını içeren kayıtlar kaldırılır.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Durumlu işleme için işlem hattı yapılandırmasını optimize etme
Databricks, üretim sorunlarını ve aşırı gecikme süresini önlemeye yardımcı olmak için, özellikle işlemeniz büyük miktarda ara durumdan tasarruf edilmesini gerektiriyorsa durum bilgisi olan akış işlemeniz için RocksDB tabanlı durum yönetiminin etkinleştirilmesini önerir.
Sunucusuz işlem hatları, durum deposu yapılandırmalarını otomatik olarak yönetir.
İşlem hattını dağıtmadan önce aşağıdaki yapılandırmayı ayarlayarak RocksDB tabanlı durum yönetimini etkinleştirebilirsiniz:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
RocksDB yapılandırma önerileri de dahil olmak üzere RocksDB durum deposu hakkında daha fazla bilgi edinmek için bkz . Azure Databricks'te RocksDB durum depounu yapılandırma.