CREATE STREAMING TABLE (işlem hatları)

Akış tablosu, akış veya artımlı veri işleme desteğine sahip bir tablodur. Akış tabloları işlem hatları tarafından desteklenir. Akış tablosu her yenilendiğinde, kaynak tablolara eklenen veriler akış tablosuna eklenir. Akış tablolarını el ile veya bir zamanlamaya göre yenileyebilirsiniz.

Yenilemeleri gerçekleştirme veya zamanlama hakkında daha fazla bilgi edinmek için bkz. İşlem hattı güncelleştirmesini çalıştırma.

Sözdizimi

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ {flow_clause | AS query} ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ column_constraint ] [, ...]
    [ , table_constraint ] [...] )

   column_properties
      { NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
  { USING DELTA
    PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    LOCATION path |
    COMMENT view_comment |
    TBLPROPERTIES clause |
    WITH { ROW FILTER clause } } [ ... ]
   } [ ... ]

flow_clause
  FLOW { { INSERT [ONCE] BY NAME query } |
  { AUTO CDC auto_cdc_flow_spec } }

Parametreler

  • REFRESH

    Belirtilirse, tabloyu oluşturur veya var olan bir tabloyu ve içeriğini güncelleştirir.

  • ÖZEL

    Özel bir akış tablosu oluşturur.

    • Bunlar kataloğa eklenmez ve yalnızca tanımlama işlem hattı içinde erişilebilir
    • Katalogdaki mevcut bir nesneyle aynı ada sahip olabilirler. İşlem hattı içinde, bir özel akış tablosu ve katalogdaki bir nesne aynı ada sahipse, ada yapılan başvurular özel akış tablosuna çözümlenir.
    • Özel akış tabloları yalnızca bir güncelleştirmede değil, işlem hattının tüm ömrü boyunca kalıcı hale gelir.

    Özel akış tabloları daha önce TEMPORARY parametresi kullanılarak oluşturulmuştur.

  • table_name

    Yeni oluşturulan tablonun adı. Tam nitelikli tablo adı benzersiz olmalıdır.

  • tablo_özellikleri

    Bu isteğe bağlı yan tümcesi sütunların listesini, türlerini, özelliklerini, açıklamalarını ve sütun kısıtlamalarını tanımlar.

  • tablo_kısıtlaması

    Şema belirtirken birincil ve yabancı anahtarlar tanımlayabilirsiniz. Kısıtlamalar bilgilendirme amaçlıdır ve uygulanmaz. SQL dil referansındaki CONSTRAINT maddesi'yi inceleyin.

    Uyarı

    Tablo kısıtlamalarını tanımlamak için işlem hattınızın Unity Kataloğu özellikli bir işlem hattı olması gerekir.

  • tablo koşulları

    İsteğe bağlı olarak tablo için bölümleme, açıklamalar ve kullanıcı tanımlı özellikler belirtin. Her alt yan tümce yalnızca bir kez belirtilebilir.

    • DELTA KULLANMA

      Veri biçimini belirtir. Tek seçenek DELTA'dır.

      Bu yan tümce isteğe bağlıdır ve varsayılan olarak DELTA'ya ayarlanmıştır.

    • BÖLÜMLERE AYRILDI

      Tablodaki bölümleme için kullanılacak bir veya daha fazla sütunun isteğe bağlı listesi. CLUSTER BY ile birbirini dışlar.

      Sıvı kümeleme, kümeleme için esnek, iyileştirilmiş bir çözüm sağlar. İşlem hatları için CLUSTER BY yerine PARTITIONED BY kullanmayı göz önünde bulundurun.

    • CLUSTER BY

      Tabloda sıvı kümelemeye olanak tanıyın ve kümeleme anahtarları olarak kullanılacak sütunları tanımlayın. ile CLUSTER BY AUTOotomatik sıvı kümelemeyi kullanın ve Databricks sorgu performansını iyileştirmek için kümeleme anahtarlarını akıllı bir şekilde seçer. PARTITIONED BY ile birbirini dışlar.

      Bkz Tablolar için sıvı kümeleme kullanma.

    • YER

      Tablo verileri için isteğe bağlı bir depolama konumu. Ayarlanmadığı takdirde, sistem otomatik olarak varsayılan olarak işlem hattı depolama konumunu kullanır.

    • YORUM

      Tabloyu açıklamak için isteğe bağlı STRING bir sabit.

    • TBLPROPERTIES

      Tablo için isteğe bağlı tablo özellikleri listesi.

    • İLE ROW FILTER

    Tabloya bir satır filtresi işlevi ekler. Bu tablo için gelecekteki sorgular, işlevin TRUE olarak değerlendirildiği satırların bir alt kümesini alır. Bu, işlevin belirli satırları filtreleyip filtrelememeye karar vermek için çağıran kullanıcının kimlik ve grup üyeliklerini incelemesine izin verdiğinden, ayrıntılı erişim denetimi için kullanışlıdır.

    Bkz: ROW FILTER yan tümcesi.

    • AKIŞI

      İsteğe bağlı olarak tablo oluşturma ile satır içi bir akış tanımlar. Akış, tablonun içeriğini yenileyen durum bilgisi olan bir sorgudur. Belirtilmezse FLOW , bunun yerine kullanabilir AS query veya ile CREATE FLOWakışları ayrı olarak tanımlayabilirsiniz. Aşağıdaki akış türlerinden birini belirtebilirsiniz:

      • INSERT AD GÖRE

        Tabloya sütun adına göre veri ekler. Seçenek sağlanmazsa ONCE , sorgu bir akış sorgusu olmalıdır. Akış semantiğini kullanarak kaynaktan okumak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir.

        Uyarı

        FLOW INSERT BY NAME , kullanmakla AS queryeşdeğerdir. Aşağıdaki iki deyim aynı davranışa sahiptir:

        CREATE OR REFRESH STREAMING TABLE raw_data
        AS SELECT * FROM STREAM read_files('abfss://my_path');
        
        CREATE OR REFRESH STREAMING TABLE raw_data
        FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');
        
      • BİR DEFA

        İsteğe bağlı olarak, akışı bir kerelik akış olarak tanımlar( örneğin, bir geri doldurma). Sağlandığında ONCE , sorgu bir akış sorgusu değildir ve akış varsayılan olarak bir kez çalışır. Tablo tam yenileme ile yenilenirse akış ONCE yeniden çalıştırarak verileri yeniden oluşturur. ONCE yalnızca akışlar için INSERT BY NAME geçerlidir.

      • AUTO CDC

        Önemli

        Databricks Runtime 17.3 ve üzeri ile Pipelines kanalında PREVIEW kullanılabilir.

        AUTO CDC Kaynaktan tabloya veri yakalama (CDC) değişiklik kayıtlarını işleyen bir akış tanımlar. Kaynak veriler CDC semantiği içerdiğinde kullanın AUTO CDC . Bkz AUTO CDC API'leri: İşlem hatlarıyla değişiklik verilerini yakalamayı basitleştirin.

  • AS sorgusu

    Bu yan tümce, queryverilerini kullanarak tabloyu doldurur. Bu sorgu bir akış sorgusu olmalıdır. Kaynaktan okumak üzere akış semantiğini kullanmak için STREAM anahtar sözcüğünü kullanın. Okuma işlemi var olan bir kayıtta bir değişiklik veya silme işlemiyle karşılaşırsa bir hata oluşur. Statik veya yalnızca ekleme kaynaklarından okumak en güvenlidir. Değişiklik işlemeleri olan verileri almak için, hataları işlemek için okuma seçeneğini ekleyebilirsiniz SkipChangeCommits .

    Birlikte bir query ve bir table_specification belirttiğinizde, table_specification içinde belirtilen tablo şeması querytarafından döndürülen tüm sütunları içermelidir, aksi takdirde bir hata alırsınız. table_specification belirtilen ancak query tarafından döndürülmeyen tüm sütunlar sorgulandığında null değerleri döndürür.

    Akış verileri hakkında daha fazla bilgi için bkz. İşlem hatları ile veri dönüştürme.

    • Okuma Seçenekleri

      Verilerin kaynaktan nasıl okunacağını yapılandırmak için sorguda okuma seçeneklerini belirtebilirsiniz. Örneğin, kaynak verilerdeki değişiklik işlemelerini atlamayı belirtebilirsiniz skipChangeCommits . Okuma seçenekleri, sorgunun yan tümcesinde WITH harita olarak belirtilir. Örneğin:

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS=TRUE, STARTINGVERSION=X)
      

      =TRUE isteğe bağlıdır, bu nedenle aşağıdakine benzer bir boole seçeneği de belirtebilirsiniz:

      SELECT * FROM STREAM source_table WITH (SKIPCHANGECOMMITS)
      

      Uyarı

      Okuma seçenekleri yalnızca Databricks Runtime 17.3 ve üzeri için desteklenir.

      Delta için aşağıdaki okuma seçenekleri desteklenir. Her seçenekle ilgili ayrıntılar için bkz . Delta tablosu akış okuma ve yazma işlemleri.

      • maxFilesPerTrigger
      • maxBytesPerTrigger
      • startingVersion
      • startingTimestamp
      • readChangeFeed
      • withEventTimeOrder
      • skipChangeCommits

Gerekli izinler

İşlem hattı için çalıştırma kullanıcısının aşağıdaki izinlere sahip olması gerekmektedir.

  • SELECT ayrıcalığı, akış tablosunun atıfta bulunduğu temel tablolar üzerindedir.
  • Üst katalogda USE CATALOG ayrıcalığı ve üst şemadaki USE SCHEMA ayrıcalığı.
  • CREATE MATERIALIZED VIEW ayrıcalığı, akış tablosu için şema üzerinde yetkidir.

Bir kullanıcının akış tablosunun içinde tanımlandığı işlem hattını güncelleştirebilmesi için şunlar gerekir:

  • Üst katalogda USE CATALOG ayrıcalığı ve üst şemadaki USE SCHEMA ayrıcalığı.
  • Akış tablosunun veya REFRESH ayrıcalığının sahipliği.
  • Akış tablosunun sahibi, akış tablosunun başvurduğu temel tablolar üzerinde SELECT ayrıcalığına sahip olmalıdır.

Kullanıcının sonuçta elde edilen akış tablosunu sorgulayabilmesi için şunlar gerekir:

  • Üst katalogda USE CATALOG ayrıcalığı ve üst şemadaki USE SCHEMA ayrıcalığı.
  • SELECT akış tablosu üzerinde ayrıcalığa sahip olun.

Sınırlamalar

  • En son verileri almak için yalnızca tablo sahipleri akış tablolarını yenileyebilir.
  • akış tablolarında ALTER TABLE komutlara izin verilmez. Tablonun tanımı ve özellikleri CREATE OR REFRESH veya ALTER STREAMING TABLE deyimiyle değiştirilmelidir.
  • tablo şemasını INSERT INTOve MERGE gibi DML komutları aracılığıyla geliştirme desteklenmez.
  • Akış tablolarında aşağıdaki komutlar desteklenmez:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Tabloyu yeniden adlandırmak veya sahibini değiştirmek desteklenmemektedir.
  • Oluşturulan sütunlar, kimlik sütunları ve varsayılan sütunlar desteklenmez.

Örnekler

-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;

-- Define a streaming table with an inline append flow:
CREATE OR REFRESH STREAMING TABLE raw_data
FLOW INSERT BY NAME SELECT * FROM STREAM read_files('abfss://my_path');

-- Define a streaming table with an inline AUTO CDC flow:
CREATE OR REFRESH STREAMING TABLE target
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;