Toplu işleme REPLACE WHERE akışlarıyla

Important

DEĞİŞTİR WHERE akışları Beta aşamasındadır.

Bu sayfada, tablo geçmişinizin tamamını yeniden işlemeden tablonun hedeflenen bir alt kümesini yeniden derlemek ve üzerine yazmak için Lakeflow Spark Bildirimli İşlem Hatlarında DEĞİşTİr WHERE akışlarının nasıl kullanılacağı açıklanır. DEĞİŞTİR WHERE akışları geç gelen verileri, üst akışta yeniden işlemeyi, şema evrimini ve geriye dönük veri tamamlama işlemlerini yönetir.

REPLACE WHERE akışında, hedef tabloda bir koşul tanımlarsınız. Koşulla eşleşen tüm satırlar silinir ve aynı koşul aralığı için kaynak sorgu yeniden değerlendirilerek değiştirilir. Predikatla eşleşmeyen satırlar olduğu gibi bırakılır.

Requirements

REPLACE WHERE akışları aşağıdaki gereksinimlere sahiptir:

  • İşlem hattınız PREVIEW kanalını kullanmalıdır.
  • Databricks Unity Kataloğu ve sunucusuz işlem önerir. Artımlı yenileme yalnızca sunucusuz işlemde desteklenir.

REPLACE WHERE akışları ne zaman kullanılır?

Aşağıdaki senaryolar için REPLACE WHERE akışlarını kullanın:

  • Akış semantiği olmadan artımlı toplu işleme: Filigranlar gibi akış kavramlarını yönetmeden yeni satırları toplu olarak işleyin.
  • Seçmeli yeniden işleme: Yalnızca bir koşulla eşleşen satırları yeniden derlerken diğer tüm satırlara dokunulmaz.
  • Standart gerçekleştirilmiş görünüm özelliklerinin ötesindeki senaryolar:
    • Kaynaktan daha uzun saklama süresine sahip tabloları hedefleme
    • Boyut tablosu değiştiğinde yeniden derlemeyi engelleme
    • Tüm geçmişi yeniden derlemeden şema evrimi

Bir REPLACE WHERE akışı oluşturun

SQL veya Python'da REPLACE WHERE akışlarını tanımlayın.

SQL

CREATE STREAMING TABLE ile birlikte FLOW REPLACE WHERE satır içi yan tümcesini kullanın:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Alternatif olarak, uzun biçimli CREATE FLOW söz dizimini kullanın:

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

Python tablo ve akış tek bir deyimde tanımlanır. Akış, tabloyla aynı adı devralır:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

replace_where parametresi pyspark sütun ifadesini veya dize koşulunu kabul eder.

Bu örneklerde, son 7 güne ait tüm satırlar kaynak sorgu kullanılarak silinip orders_enriched yeniden derlenir. Koşulu kaynak sorguya eklemeniz gerekmez. İşlem hattı motoru, kaynaktan okurken bunu otomatik olarak uygular.

Note

BY NAME SQL'de gereklidir. Sütunları konumlarına göre değil, adlarına göre eşleştirir.

Geçmiş verileri geri doldurma

Hedef tabloya zamanlanmış yenilemelerin dışında geçmiş veya düzeltilmiş satırlar yazmak için, geçmiş verilerin bulunduğu yere bağlı olarak iki mekanizma arasından seçim yapın:

  • Koşul geçersiz kılmaları: Akışın kaynak sorgusunu tek seferlik bir koşul aralığı için yeniden çalıştırın. Geçmiş veriler artımlı veriyle aynı kaynaktan geldiğinde kullanın.
  • DML deyimleri: Akışı atlayarak hedef tabloya doğrudan ekleyin. Geçmiş veriler artımlı verilerden farklı bir kaynakta olduğunda kullanın.

Koşul geçersiz kılmaları

İşlem hattı tanımını değiştirmeden, tek bir işlem hattı güncellemesinde REPLACE WHERE yüklemini geçersiz kılın. Koşul geçersiz kılmaları tek seferliktir, yalnızca geçerli güncelleştirmeye uygulanır ve gelecekteki çalıştırmaları etkilemez.

Örnek: İlk geçmiş veri yüklemesi

İşlem hattını ilk kez ayarlarken geçmiş verilerin bir kerelik bir yedeklemini gerçekleştirmek için:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Örnek: Belirli bir dönem için sütunu düzeltme

Sütun tanımını güncelleştirdikten sonra, hedeflenen geçmiş aralığı için değişikliği yedekleyin:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Birden çok boyutu tek bir koşul geçersiz kılmasında birleştirin:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Yardımcı işlevi: start_update_with_replace_where

Koşul geçersiz kılmaları göndermek için not defterinden işlem hattı güncelleştirme API'sini kullanın:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

DML deyimleri

Eski bir tablodan yükleme gibi ilk yüklemeleri veya düzeltmeleri gerçekleştirmek için DML deyimlerini doğrudan hedef tabloda işlem hattı dışından çalıştırın:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

DML aracılığıyla eklenen satırlar DEĞİşTİr WHERE koşuluna tabi değildir ve gelecekteki bir çalıştırmanın koşul aralığında yer almadıkları sürece zamanlanmış yenilemelerde kalıcı olur.

Tam yenileme davranışı

REPLACE WHERE akışının tam yenileme işlemi, kaynak sorguyu yalnızca geçerli yüklemi kullanarak yeniden yürütür. Koşul geçersiz kılmaları veya geçerli koşul aralığının dışındaki DML deyimleri tarafından eklenen satırlar kalıcı olarak silinir.

Warning

Tam yenileme tüm mevcut verileri temizler ve akışı yalnızca tanımlanan koşulu kullanarak yeniden yürütür. Bir işlem hattı 7 günlük bir yüklemle bir yıldır çalışıyorsa, tam yenileme yapıldığında tablo yalnızca son 7 günlük verileri içerir. Tüm eski satırlar kalıcı olarak silinir.

Tabloda tam yenilemeleri önlemek için tablo özelliğini pipelines.reset.allowed olarak falseayarlayın. Bkz İşlem hattı özellikleri referansı.

Kademeli yenileme

REPLACE WHERE akışları, mümkün olduğunda artımlı yenileme kullanır; değiştirme penceresinin tamamını yeniden hesaplamak yerine, yalnızca son yenilemeden bu yana değişen kaynak verilerini yeniden işler. Artımlı yenileme sunucusuz işlem gerektirir.

Artımlı yenileme uygulandığında

Aşağıdakilerin tümü doğru olmalıdır:

  • İşlem hattı sunucusuz işlem üzerinde çalışır.
  • Sorgu şekli desteklenir. Desteklenen işleç kümesi için artımlı yenileme bölümüne bakın.
  • Koşul, kaynak tablodan temel sütunlara başvurur. Toplama ya da pencere işlevi sonuçları gibi türetilmiş değerler üzerindeki koşullar bir veri kaynağına gönderilemez; bu da artımlı yenilemeyi devre dışı bırakır.
  • Mevcut değiştirme penceresi içinde hiçbir satır dış DML tarafından değiştirilmemiştir. Geçerli pencerenin dışındaki satırları değiştiren DML etkilenmez.
  • Geçerli değiştirme penceresi, önceki koşulun dışlandığı satırları içermiyor. Yüklemi daha önce işlenmemiş bir aralığı kapsayacak şekilde genişletirseniz, bu tek yenileme tam yeniden hesaplamaya döner. Sonraki yenilemeler tekrar artımlı yenileme için uygundur.
  • Koşul belirleyicidir. rand() gibi deterministik olmayan işlevler kullanan yüklemler artımlı yenilemeyi devre dışı bırakır. gibi current_date() zamana bağlı işlevlere izin verilir.

Herhangi bir akışın ilk yenilenmesi her zaman tam bir hesaplamadır. Herhangi bir koşul karşılanmazsa, bu yenileme geçerli değiştirme penceresinin tamamen yeniden hesaplanmasına başvurur.

Artımlı yenileme için en iyi yöntemler

REPLACE WHERE akışlarının artımlı yenileme için uygun kalması için bu yönergeleri izleyin.

Hareketli alt sınır kullanma

Hareketli bir alt sınıra sahip yüklemler, süresiz olarak artımlı yenileme için uygun kalır.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

date BETWEEN date_add(current_date(), -7) AND current_date() gibi hareketli bir üst sınır, pencereyi önceden hariç tutulan satırları kapsayacak şekilde kaydırarak tek seferlik bir tam yeniden hesaplamaya geri dönüşü tetikleyebilir.

Yüklem sütununu GROUP BY içine ekleyin.

Toplulaştırma yaparken, işleyicinin koşulu toplulaştırmanın altına itebilmesi için koşul sütununu GROUP BY içine ekleyin.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

koşul sütununda eksikse GROUP BYkoşul toplamanın altına gönderilemez ve kaynak tam olarak taranır.

Birleştirme anahtarlarında koşul sütununu ekleme

İşleme motorunun birleştirilen tüm kaynakları eleyebilmesi için yüklem sütununu birleştirme koşuluna ekleyin.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Birleştirilmiş bir tablo yüklem sütununu görünür kılmıyorsa, bu tablonun tamamı her yenilemede taranır.

Tam yeniden hesaplamaya geri dönüşü tanılama

Bir yenileme tam yeniden hesaplamaya geri düştüğünde, neden akışa yönelik planning_information olayında bildirilir. Bkz. İşlem hattı olay günlüklerini izleme. Aşağıdaki tabloda olayda bildirilen nedenler listelendi:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Harici bir DML, geçerli değiştirme penceresindeki satırları değiştirdi.
REPLACE_WHERE_NOT_DETERMINISTIC Koşul, belirleyici olmayan ifadeler kullanır.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Önceki yenilemede belirleyici olmayan bir koşul kullanılmıştır.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Koşul herhangi bir kaynağa iletilemez, geçerli pencere önceki koşul tarafından işlenmeyen satırları içerir veya çalıştırma bir koşul geçersiz kılmasını kullanır.

Sınırlamalar

REPLACE WHERE akışlarının aşağıdaki sınırlamaları vardır:

  • Hedef tablo, işlem hattı içinde oluşturulmalıdır.
  • Hedef tablo başına yalnızca bir REPLACE WHERE akışına izin verilir.
  • REPLACE WHERE akışı tarafından hedeflenen tablo, AUTO CDC akışı veya ekleme akışı gibi başka bir akış türü tarafından da hedeflenemez.
  • REPLACE WHERE akışlarının hedeflediği tablolarda beklentiler desteklenmez.
  • Databricks SQL'de oluşturulan akış tabloları için, söz dizimi ve geri doldurma farkları hakkında bağımsız akış tabloları için REPLACE WHERE akışları bölümüne bakın.

Examples

Aşağıdaki örneklerde yaygın REPLACE WHERE akış desenleri gösterilmektedir.

Örnek 1: Saklama süresi sınırlı olan bir kaynaktan geçmiş toplu verileri koruyun

Bu örnek, ham veriler kaynak tablodaki saklama süresi dolup tablodan çıksa bile (3 günlük saklama süresi), günlük toplamları süresiz olarak korur:

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Örnek 2: Boyut tablosu değiştiğinde yeniden derlemeyi engelleme

Bu örnek, boyut öznitelikleri değiştiğinde geçmiş olgu satırlarını değiştirmeden tutar:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Kullanıcının bölgesi değişirse, yalnızca son satırlar yeniden derlenir. Geçmiş satırlar, oluşturuldukları anda geçerli olan bölge değerini korur. Geçmiş satırları düzeltmek için yüklem geçersiz kılmalarını kullanarak hedefli bir geri doldurma çalıştırın.

Örnek 3: Tüm geçmişi yeniden derlemeden yeni ölçüm ekleme

Bu örnek, bir tablo tanımının nasıl geliştirileceğini ve yalnızca hedeflenen aralığın nasıl yedeklendiğini gösterir:

  1. İlk tabloyu tanımlayın:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Sorguyu uniq_users eklemek için güncelleştirin:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Son 30 gün için yeni ölçümü yedekleyin:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    Geriye dönük doldurulan aralıktan daha eski satırlar, uniq_users için NULL içerir.

Örnek 4: Geçmiş verilerin tamamını geriye dönük doldurmadan önce küçük bir pencere üzerinde yineleme yapın

Bu örnekte, tüm geçmiş aralığını işlemeden önce küçük bir veri penceresinde sorgu mantığının nasıl doğrulanması gösterilmektedir.

Sorguyu revize ederken her yenilemenin yalnızca son 7 günü yeniden hesaplaması için kısa bir pencereyle başlayın:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

Sorgu sonlandırıldıktan sonra, bir kerelik geçmişe dönük doldurma gerçekleştirmek için koşul geçersiz kılmayı kullanın:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)