Aracılığıyla paylaş


Tabloları işlem hatları arasında taşıyın

Bu makalede akış tablolarının ve depolanmış görünümlerin farklı işlem hatları arasında nasıl taşındığı açıklanır. Taşımadan sonra akışı taşıdığınız işlem hattı, özgün işlem hattı yerine tabloyu güncelleştirir. Bu, aşağıdakiler dahil olmak üzere birçok senaryoda kullanışlıdır:

  • Büyük bir işlem hattını daha küçük işlem hatlarına bölün.
  • Birden çok işlem hattını tek bir daha büyük işlem hattında birleştirin.
  • İşlem hattındaki bazı tabloların yenileme sıklığını değiştirin.
  • Tabloları eski yayımlama modunu kullanan bir işlem hattından varsayılan yayımlama moduna taşıyın. Eski yayımlama modu hakkında ayrıntılı bilgi için bkz. İşlem hatları için eski yayımlama modu. Bir işlem hattının tamamında yayınlama modunu tek seferde nasıl geçirebileceğinizi görmek için bkz: Bir işlem hattının varsayılan yayınlama modunu etkinleştirme.
  • Farklı çalışma alanlarındaki pipeline'lar arasında tabloları taşıyın.

Gereksinimler

Aşağıda, bir tabloyu boruhattı arasında taşımak için gerekenler belirtilmiştir.

  • Komutu çalıştırırken Databricks Runtime 16.3 veya daha üstünü, çalışma alanları arasında tablo taşımak için ise Databricks Runtime 17.2’yi kullanmanız gerekir.

  • Hem kaynak hem de hedef işlem hatları şu şekilde olmalıdır:

    • İşlemi çalıştıran Azure Databricks kullanıcı hesabına veya hizmet sorumlusuna ait
    • Meta veri depolarını paylaşan çalışma alanlarında. Meta veri deposunu denetlemek için bkz current_metastore işlevi.
  • Hedef işlem hattı varsayılan yayımlama modunu kullanmalıdır. Bu, tabloları birden çok katalogda ve şemada yayımlamanıza olanak tanır.

    Alternatif olarak, her iki işlem hattının da eski yayımlama modunu kullanması ve her ikisinin de ayarlarda aynı katalog ve hedef değere sahip olması gerekir. Eski yayımlama modu hakkında bilgi için bkz. LIVE şeması (eski).

Uyarı

Bu özellik, varsayılan yayımlama modunu kullanan bir işlem hattının eski yayımlama modunu kullanan bir işlem hattına taşınmasını desteklemez.

Tabloyu işlem hatları arasında taşıma

Aşağıdaki yönergelerde akış tablosunun veya gerçekleştirilmiş görünümün bir işlem hattından diğerine nasıl taşındığı açıklanmaktadır.

  1. Çalışıyorsa kaynak boru hattını durdurun. Tamamen durmasını bekleyin.

  2. Kaynak işlem hattının kodundan tablonun tanımını kaldırın ve gelecekte başvurmak üzere bir yerde depolayın.

    İşlem hattının doğru çalışması için gereken destekleyici sorguları veya kodları ekleyin.

  3. Bir not defterinden veya SQL düzenleyicisinden aşağıdaki SQL komutunu çalıştırarak tabloyu kaynak işlem hattından hedef işlem hattına yeniden atayın:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    SQL komutunun kaynak işlem hattının çalışma alanından çalıştırılması gerektiğini unutmayın.

    Komut, sırasıyla Unity Kataloğu tarafından yönetilen gerçekleştirilmiş görünümler için ALTER MATERIALIZED VIEW ve akış tabloları için ALTER STREAMING TABLE kullanır. Bir Hive meta veri deposu tablosunda aynı eylemi gerçekleştirmek için kullanın ALTER TABLE.

    Örneğin, adlı sales bir akış tablosunu kimliğine abcd1234-ef56-ab78-cd90-1234efab5678sahip bir işlem hattına taşımak istiyorsanız aşağıdaki komutu çalıştırabilirsiniz:

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    Uyarı

    geçerli pipelineId bir işlem hattı tanımlayıcısı olmalıdır. Değer null için izin verilmez.

  4. Tablonun tanımını hedef işlem hattının koduna ekleyin.

    Uyarı

    Katalog veya hedef şema kaynak ve hedef arasında farklıysa, sorguyu tam olarak kopyalama çalışmayabilir. Tanımdaki kısmen nitelenmiş tablolar farklı şekilde çözümlenebilir. Tablo adlarını tam olarak nitelerken tanımı güncelleştirmeniz gerekebilir.

    Uyarı

    Hedef işlem hattının kodundan herhangi bir ekleme akışını (Python'da, append_flow(once=True) içeren sorgular, SQL'de INTO ONCE ileINSERT yapılan sorgular) kaldırın veya açıklama satırı oluşturun. Daha fazla ayrıntı için bkz. Sınırlamalar.

Taşıma tamamlandı. Artık hem kaynak hem de hedef işlem hatlarını çalıştırabilirsiniz. Hedef boru hattı tabloyu günceller.

Sorun giderme

Aşağıdaki tabloda, bir tablo işlem hatları arasında taşınırken meydana gelebilecek hatalar açıklanmaktadır.

Hata Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE Kaynak işlem hattı varsayılan yayımlama modundadır ve hedef LIVE şema (eski) modunu kullanır. Bu desteklenmez. Kaynak varsayılan yayımlama modunu kullanıyorsa hedef de kullanmalıdır.
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE Yalnızca tabloların boru hatları arasında taşınması desteklenir. Databricks SQL ile oluşturulan akış tablolarının ve gerçekleştirilmiş görünümlerin taşınması desteklenmez.
DESTINATION_PIPELINE_NOT_FOUND pipelines.pipelineId geçerli bir işlem hattı olmalıdır. pipelineId null olamaz.
Taşıma sonrasında tablo hedefte güncelleştirilemiyor. Bu durumda hızla azaltmak için, aynı yönergeleri izleyerek tabloyu kaynak işlem hattına geri taşıyın.
PIPELINE_PERMISSION_DENIED_NOT_OWNER Hem kaynak hem de hedef işlem hatlarının taşıma işlemini gerçekleştiren kullanıcıya ait olması gerekir.
TABLE_ALREADY_EXISTS Hata iletisinde listelenen tablo zaten var. İşlem hattı için bir yedekleme tablosu zaten varsa bu durum oluşabilir. Bu durumda, DROP hatada listelenen tablo.

İşlem hattında birden çok tablo içeren örnek

İşlem hatları birden fazla tablo içerebilir. İşlem hatları arasında aynı anda bir tablo taşımaya devam edebilirsiniz. Bu senaryoda, kaynak işlem hattında birbirinden sırayla okuyan üç tablo (table_a, table_b, table_c) vardır. Bir tablo olan table_böğesini başka bir işlem hattına taşımak istiyoruz.

İlk kaynak işlem hattı kodu:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Kaynak table_b'dan tablo tanımını kopyalayıp kaldırarak ve table_b'in pipelineId'sini güncelleyerek başka bir işlem hattına geçiririz.

İlk olarak, tüm zamanlamaları duraklatın ve hem kaynak hem de hedef işlem hatlarında güncelleştirmelerin tamamlanmasını bekleyin. Ardından, taşınan tablonun kodunu kaldırmak için kaynak işlem hattını değiştirin. Güncelleştirilmiş kaynak işlem hattı örnek kodu şöyle olur:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

ALTER pipelineId komutunu çalıştırmak için SQL düzenleyicisine gidin.

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Ardından, hedef işlem hattına gidin ve table_b tanımını ekleyin. varsayılan katalog ve şema işlem hattı ayarlarında aynıysa kod değişikliği gerekmez.

Hedef boru hattı kodu:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

Varsayılan katalog ve şema işlem hattı ayarlarında farklıysa, işlem hattının kataloğunu ve şemasını kullanarak tam adı eklemeniz gerekir.

Örneğin, hedef işlem hattı kodu şu olabilir:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

Hem kaynak hem de hedef işlem hatları için zamanlamaları çalıştırın (veya yeniden etkinleştirin).

İşlem hatları artık kopuk. Sorgu table_c 'den table_b okunur (şimdi hedef işlem hattında) ve table_b 'den table_a okunur (kaynak işlem hattında). Kaynak işlem hattında tetiklenen bir yürütme yaptığınızda, artık kaynak işlem hattı table_b tarafından yönetilmediği için güncelleştirilmez. Kaynak işlem hattı table_b 'ü işlem hattı dışında bir tablo olarak değerlendirir. Bu, bir işlem hattı tarafından yönetilmeyen ve Unity Kataloğu'nda bulunan bir Delta tablosundan veri okuyan gerçekleştirilmiş bir görünümü tanımlamaya benzer.

Sınırlama

Tabloları işlem hatları arasında taşımak için sınırlamalar aşağıdadır.

  • Databricks SQL ile oluşturulan gerçekleştirilmiş görünümler ve akış tabloları desteklenmez.
  • Bir kez ekleme akışı - Python append_flow(once=True) akışları ve SQL INSERT INTO ONCE akışları desteklenmez. Çalışma durumları korunmaz; bu yüzden, hedef işlem hattında yeniden çalıştırılma ihtimalleri yüksektir. Tekrar çalıştırılmasını önlemek için hedef işlem hattındaki "append once flows" akışlarını kaldırın veya bunlara açıklama ekleyin.
  • Özel tablolar veya görünümler desteklenmez.
  • Kaynak ve hedef işlem hatları birer işlem hattı olmalıdır. Null işlem hatları desteklenmez.
  • Kaynak ve hedef işlem hatları ya aynı çalışma alanında ya da aynı meta depoyu paylaşan farklı çalışma alanlarında bulunmalıdır.
  • Hem kaynak hem de hedef işlem hatları taşıma işlemini çalıştıran kullanıcıya ait olmalıdır.
  • Kaynak işlem hattı varsayılan yayımlama modunu kullanıyorsa, hedef işlem hattının da varsayılan yayımlama modunu kullanması gerekir. Varsayılan yayımlama modunu kullanarak bir tabloyu, LIVE şemasını (eski) kullanan bir işlem hattına taşıyamazsınız. LIVE şeması (eski) sayfasına bakın.
  • Kaynak ve hedef işlem hatlarının her ikisi de LIVE şemasını (eski) kullanıyorsa, ayarlarda aynı catalog ve target değerlere sahip olmaları gerekir.