Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Important
DEĞİŞTİR WHERE akışları Beta aşamasındadır.
Bu sayfada, tablo geçmişinizin tamamını yeniden işlemeden tablonun hedeflenen bir alt kümesini yeniden derlemek ve üzerine yazmak için Lakeflow Spark Bildirimli İşlem Hatlarında DEĞİşTİr WHERE akışlarının nasıl kullanılacağı açıklanır. DEĞİŞTİR WHERE akışları geç gelen verileri, üst akışta yeniden işlemeyi, şema evrimini ve geriye dönük veri tamamlama işlemlerini yönetir.
REPLACE WHERE akışında, hedef tabloda bir koşul tanımlarsınız. Koşulla eşleşen tüm satırlar silinir ve aynı koşul aralığı için kaynak sorgu yeniden değerlendirilerek değiştirilir. Predikatla eşleşmeyen satırlar olduğu gibi bırakılır.
Requirements
REPLACE WHERE akışları aşağıdaki gereksinimlere sahiptir:
- İşlem hattınız
PREVIEWkanalını kullanmalıdır. - Databricks Unity Kataloğu ve sunucusuz işlem önerir. Artımlı yenileme yalnızca sunucusuz işlemde desteklenir.
REPLACE WHERE akışları ne zaman kullanılır?
Aşağıdaki senaryolar için REPLACE WHERE akışlarını kullanın:
- Akış semantiği olmadan artımlı toplu işleme: Filigranlar gibi akış kavramlarını yönetmeden yeni satırları toplu olarak işleyin.
- Seçmeli yeniden işleme: Yalnızca bir koşulla eşleşen satırları yeniden derlerken diğer tüm satırlara dokunulmaz.
-
Standart gerçekleştirilmiş görünüm özelliklerinin ötesindeki senaryolar:
- Kaynaktan daha uzun saklama süresine sahip tabloları hedefleme
- Boyut tablosu değiştiğinde yeniden derlemeyi engelleme
- Tüm geçmişi yeniden derlemeden şema evrimi
Bir REPLACE WHERE akışı oluşturun
SQL veya Python'da REPLACE WHERE akışlarını tanımlayın.
SQL
CREATE STREAMING TABLE ile birlikte FLOW REPLACE WHERE satır içi yan tümcesini kullanın:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Alternatif olarak, uzun biçimli CREATE FLOW söz dizimini kullanın:
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
Python tablo ve akış tek bir deyimde tanımlanır. Akış, tabloyla aynı adı devralır:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
replace_where parametresi pyspark sütun ifadesini veya dize koşulunu kabul eder.
Bu örneklerde, son 7 güne ait tüm satırlar kaynak sorgu kullanılarak silinip orders_enriched yeniden derlenir. Koşulu kaynak sorguya eklemeniz gerekmez. İşlem hattı motoru, kaynaktan okurken bunu otomatik olarak uygular.
Note
BY NAME SQL'de gereklidir. Sütunları konumlarına göre değil, adlarına göre eşleştirir.
Geçmiş verileri geri doldurma
Hedef tabloya zamanlanmış yenilemelerin dışında geçmiş veya düzeltilmiş satırlar yazmak için, geçmiş verilerin bulunduğu yere bağlı olarak iki mekanizma arasından seçim yapın:
- Koşul geçersiz kılmaları: Akışın kaynak sorgusunu tek seferlik bir koşul aralığı için yeniden çalıştırın. Geçmiş veriler artımlı veriyle aynı kaynaktan geldiğinde kullanın.
- DML deyimleri: Akışı atlayarak hedef tabloya doğrudan ekleyin. Geçmiş veriler artımlı verilerden farklı bir kaynakta olduğunda kullanın.
Koşul geçersiz kılmaları
İşlem hattı tanımını değiştirmeden, tek bir işlem hattı güncellemesinde REPLACE WHERE yüklemini geçersiz kılın. Koşul geçersiz kılmaları tek seferliktir, yalnızca geçerli güncelleştirmeye uygulanır ve gelecekteki çalıştırmaları etkilemez.
Örnek: İlk geçmiş veri yüklemesi
İşlem hattını ilk kez ayarlarken geçmiş verilerin bir kerelik bir yedeklemini gerçekleştirmek için:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Örnek: Belirli bir dönem için sütunu düzeltme
Sütun tanımını güncelleştirdikten sonra, hedeflenen geçmiş aralığı için değişikliği yedekleyin:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Birden çok boyutu tek bir koşul geçersiz kılmasında birleştirin:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Yardımcı işlevi: start_update_with_replace_where
Koşul geçersiz kılmaları göndermek için not defterinden işlem hattı güncelleştirme API'sini kullanın:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DML deyimleri
Eski bir tablodan yükleme gibi ilk yüklemeleri veya düzeltmeleri gerçekleştirmek için DML deyimlerini doğrudan hedef tabloda işlem hattı dışından çalıştırın:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
DML aracılığıyla eklenen satırlar DEĞİşTİr WHERE koşuluna tabi değildir ve gelecekteki bir çalıştırmanın koşul aralığında yer almadıkları sürece zamanlanmış yenilemelerde kalıcı olur.
Tam yenileme davranışı
REPLACE WHERE akışının tam yenileme işlemi, kaynak sorguyu yalnızca geçerli yüklemi kullanarak yeniden yürütür. Koşul geçersiz kılmaları veya geçerli koşul aralığının dışındaki DML deyimleri tarafından eklenen satırlar kalıcı olarak silinir.
Warning
Tam yenileme tüm mevcut verileri temizler ve akışı yalnızca tanımlanan koşulu kullanarak yeniden yürütür. Bir işlem hattı 7 günlük bir yüklemle bir yıldır çalışıyorsa, tam yenileme yapıldığında tablo yalnızca son 7 günlük verileri içerir. Tüm eski satırlar kalıcı olarak silinir.
Tabloda tam yenilemeleri önlemek için tablo özelliğini pipelines.reset.allowed olarak falseayarlayın. Bkz İşlem hattı özellikleri referansı.
Kademeli yenileme
REPLACE WHERE akışları, mümkün olduğunda artımlı yenileme kullanır; değiştirme penceresinin tamamını yeniden hesaplamak yerine, yalnızca son yenilemeden bu yana değişen kaynak verilerini yeniden işler. Artımlı yenileme sunucusuz işlem gerektirir.
Artımlı yenileme uygulandığında
Aşağıdakilerin tümü doğru olmalıdır:
- İşlem hattı sunucusuz işlem üzerinde çalışır.
- Sorgu şekli desteklenir. Desteklenen işleç kümesi için artımlı yenileme bölümüne bakın.
- Koşul, kaynak tablodan temel sütunlara başvurur. Toplama ya da pencere işlevi sonuçları gibi türetilmiş değerler üzerindeki koşullar bir veri kaynağına gönderilemez; bu da artımlı yenilemeyi devre dışı bırakır.
- Mevcut değiştirme penceresi içinde hiçbir satır dış DML tarafından değiştirilmemiştir. Geçerli pencerenin dışındaki satırları değiştiren DML etkilenmez.
- Geçerli değiştirme penceresi, önceki koşulun dışlandığı satırları içermiyor. Yüklemi daha önce işlenmemiş bir aralığı kapsayacak şekilde genişletirseniz, bu tek yenileme tam yeniden hesaplamaya döner. Sonraki yenilemeler tekrar artımlı yenileme için uygundur.
- Koşul belirleyicidir.
rand()gibi deterministik olmayan işlevler kullanan yüklemler artımlı yenilemeyi devre dışı bırakır. gibicurrent_date()zamana bağlı işlevlere izin verilir.
Herhangi bir akışın ilk yenilenmesi her zaman tam bir hesaplamadır. Herhangi bir koşul karşılanmazsa, bu yenileme geçerli değiştirme penceresinin tamamen yeniden hesaplanmasına başvurur.
Artımlı yenileme için en iyi yöntemler
REPLACE WHERE akışlarının artımlı yenileme için uygun kalması için bu yönergeleri izleyin.
Hareketli alt sınır kullanma
Hareketli bir alt sınıra sahip yüklemler, süresiz olarak artımlı yenileme için uygun kalır.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
date BETWEEN date_add(current_date(), -7) AND current_date() gibi hareketli bir üst sınır, pencereyi önceden hariç tutulan satırları kapsayacak şekilde kaydırarak tek seferlik bir tam yeniden hesaplamaya geri dönüşü tetikleyebilir.
Yüklem sütununu GROUP BY içine ekleyin.
Toplulaştırma yaparken, işleyicinin koşulu toplulaştırmanın altına itebilmesi için koşul sütununu GROUP BY içine ekleyin.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
koşul sütununda eksikse GROUP BYkoşul toplamanın altına gönderilemez ve kaynak tam olarak taranır.
Birleştirme anahtarlarında koşul sütununu ekleme
İşleme motorunun birleştirilen tüm kaynakları eleyebilmesi için yüklem sütununu birleştirme koşuluna ekleyin.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Birleştirilmiş bir tablo yüklem sütununu görünür kılmıyorsa, bu tablonun tamamı her yenilemede taranır.
Tam yeniden hesaplamaya geri dönüşü tanılama
Bir yenileme tam yeniden hesaplamaya geri düştüğünde, neden akışa yönelik planning_information olayında bildirilir. Bkz. İşlem hattı olay günlüklerini izleme. Aşağıdaki tabloda olayda bildirilen nedenler listelendi:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Harici bir DML, geçerli değiştirme penceresindeki satırları değiştirdi. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Koşul, belirleyici olmayan ifadeler kullanır. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Önceki yenilemede belirleyici olmayan bir koşul kullanılmıştır. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Koşul herhangi bir kaynağa iletilemez, geçerli pencere önceki koşul tarafından işlenmeyen satırları içerir veya çalıştırma bir koşul geçersiz kılmasını kullanır. |
Sınırlamalar
REPLACE WHERE akışlarının aşağıdaki sınırlamaları vardır:
- Hedef tablo, işlem hattı içinde oluşturulmalıdır.
- Hedef tablo başına yalnızca bir REPLACE WHERE akışına izin verilir.
- REPLACE WHERE akışı tarafından hedeflenen tablo, AUTO CDC akışı veya ekleme akışı gibi başka bir akış türü tarafından da hedeflenemez.
- REPLACE WHERE akışlarının hedeflediği tablolarda beklentiler desteklenmez.
- Databricks SQL'de oluşturulan akış tabloları için, söz dizimi ve geri doldurma farkları hakkında bağımsız akış tabloları için REPLACE WHERE akışları bölümüne bakın.
Examples
Aşağıdaki örneklerde yaygın REPLACE WHERE akış desenleri gösterilmektedir.
Örnek 1: Saklama süresi sınırlı olan bir kaynaktan geçmiş toplu verileri koruyun
Bu örnek, ham veriler kaynak tablodaki saklama süresi dolup tablodan çıksa bile (3 günlük saklama süresi), günlük toplamları süresiz olarak korur:
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Örnek 2: Boyut tablosu değiştiğinde yeniden derlemeyi engelleme
Bu örnek, boyut öznitelikleri değiştiğinde geçmiş olgu satırlarını değiştirmeden tutar:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Kullanıcının bölgesi değişirse, yalnızca son satırlar yeniden derlenir. Geçmiş satırlar, oluşturuldukları anda geçerli olan bölge değerini korur. Geçmiş satırları düzeltmek için yüklem geçersiz kılmalarını kullanarak hedefli bir geri doldurma çalıştırın.
Örnek 3: Tüm geçmişi yeniden derlemeden yeni ölçüm ekleme
Bu örnek, bir tablo tanımının nasıl geliştirileceğini ve yalnızca hedeflenen aralığın nasıl yedeklendiğini gösterir:
İlk tabloyu tanımlayın:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Sorguyu
uniq_userseklemek için güncelleştirin:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Son 30 gün için yeni ölçümü yedekleyin:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Geriye dönük doldurulan aralıktan daha eski satırlar,
uniq_usersiçinNULLiçerir.
Örnek 4: Geçmiş verilerin tamamını geriye dönük doldurmadan önce küçük bir pencere üzerinde yineleme yapın
Bu örnekte, tüm geçmiş aralığını işlemeden önce küçük bir veri penceresinde sorgu mantığının nasıl doğrulanması gösterilmektedir.
Sorguyu revize ederken her yenilemenin yalnızca son 7 günü yeniden hesaplaması için kısa bir pencereyle başlayın:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Sorgu sonlandırıldıktan sonra, bir kerelik geçmişe dönük doldurma gerçekleştirmek için koşul geçersiz kılmayı kullanın:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)