Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Artikel ini menjelaskan cara memindahkan tabel streaming dan tampilan materialisasi antar alur. Setelah pemindahan, pipeline yang Anda pindahkan akan memperbarui tabel, bukan pipeline yang asli. Ini berguna dalam banyak skenario, termasuk:
- Membagi jalur besar menjadi yang lebih kecil.
- Gabungkan beberapa alur dalam satu alur yang lebih besar.
- Ubah frekuensi refresh untuk beberapa tabel dalam alur.
- Pindahkan tabel dari alur yang menggunakan mode penerbitan warisan ke mode penerbitan default. Untuk detail tentang mode penerbitan lama, lihat Mode penerbitan lama untuk pipeline. Untuk melihat bagaimana Anda dapat memigrasikan mode penerbitan untuk seluruh alur sekaligus, lihat Mengaktifkan mode penerbitan default dalam alur.
- Memindahkan tabel antar pipeline di ruang kerja yang berbeda.
Persyaratan
Berikut ini adalah persyaratan untuk memindahkan tabel antar alur.
Anda harus menggunakan Databricks Runtime 16.3 atau lebih tinggi saat menjalankan
ALTER ...perintah, dan Databricks Runtime 17.2 untuk pemindahan tabel lintas ruang kerja.Alur sumber dan tujuan harus:
- Dimiliki oleh akun pengguna Azure Databricks atau perwakilan layanan yang menjalankan operasi
- Di ruang kerja yang berbagi metastore. Untuk memeriksa metastore, lihat
current_metastorefungsi.
Alur tujuan harus menggunakan mode penerbitan default. Ini memungkinkan Anda menerbitkan tabel ke beberapa katalog dan skema.
Secara bergantian, kedua alur harus menggunakan mode penerbitan warisan dan keduanya harus memiliki katalog dan nilai target yang sama dalam pengaturan. Untuk informasi tentang mode penerbitan lama, lihat Skema LIVE (lama).
Nota
Fitur ini tidak mendukung pemindahan alur menggunakan mode penerbitan default ke alur menggunakan mode penerbitan warisan.
Memindahkan tabel antar alur
Instruksi berikut menjelaskan cara memindahkan tabel streaming atau tampilan materialisasi dari satu alur ke alur lainnya.
Hentikan alur sumber jika sedang berjalan. Tunggu sampai benar-benar berhenti.
Hapus definisi tabel dari kode alur sumber dan simpan di suatu tempat untuk referensi di masa mendatang.
Sertakan kueri atau kode pendukung yang diperlukan agar alur berjalan dengan benar.
Dari notebook atau editor SQL, jalankan perintah SQL berikut untuk menetapkan ulang tabel dari alur sumber ke alur tujuan:
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");Perhatikan bahwa perintah SQL harus dijalankan dari ruang kerja alur sumber.
Perintah menggunakan
ALTER MATERIALIZED VIEWuntuk tampilan materialisasi yang dikelola oleh Unity Catalog danALTER STREAMING TABLEuntuk tabel streaming. Untuk melakukan tindakan yang sama pada tabel metastore Hive, gunakanALTER TABLE.Misalnya, jika Anda ingin memindahkan tabel streaming bernama
saleske alur dengan IDabcd1234-ef56-ab78-cd90-1234efab5678, Anda akan menjalankan perintah berikut:ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");Nota
pipelineIdharus berupa pengidentifikasi alur yang valid. Nilainulltidak diizinkan.Tambahkan definisi tabel ke kode alur tujuan.
Nota
Jika katalog atau skema target berbeda antara sumber dan tujuan, menyalin kueri dengan tepat mungkin tidak berfungsi. Tabel yang memenuhi syarat sebagian dalam definisi dapat diselesaikan secara berbeda. Anda mungkin perlu memperbarui definisi saat berpindah untuk sepenuhnya memenuhi syarat nama tabel.
Nota
Menghapus atau mengomentari alur tambahan sekali (dalam Python, kueri dengan append_flow(once=True), di SQL, kueri dengan INSERT INTO ONCE) dari kode alur tujuan. Untuk detail selengkapnya, lihat Batasan.
Pemindahan selesai. Anda sekarang dapat menjalankan baik jalur sumber maupun tujuan. Alur tujuan memperbarui tabel.
Troubleshooting
Tabel berikut ini menjelaskan kesalahan yang bisa terjadi saat memindahkan tabel antar alur.
| Kesalahan | Description |
|---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
Jalur sumber berada dalam mode publikasi default, dan tujuan menggunakan mode skema LIVE (lama). Ini tidak didukung. Jika sumber menggunakan mode penerbitan default, maka tujuan juga harus. |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
Hanya pemindahan tabel antar proses pipeline yang didukung. Memindahkan tabel streaming dan tampilan materialisasi yang dibuat dengan Databricks SQL tidak didukung. |
DESTINATION_PIPELINE_NOT_FOUND |
pipelines.pipelineId harus berupa alur yang valid.
pipelineId tidak boleh bernilai null. |
| Tabel gagal diperbarui di tujuan setelah pemindahan. | Untuk memitigasi dengan cepat dalam hal ini, pindahkan tabel kembali ke alur sumber dengan mengikuti instruksi yang sama. |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
Alur sumber dan tujuan harus dimiliki oleh pengguna yang melakukan operasi pemindahan. |
TABLE_ALREADY_EXISTS |
Tabel yang tercantum dalam pesan kesalahan sudah ada. Ini dapat terjadi jika tabel pendukung untuk pipeline sudah ada. Dalam hal ini, DROP tabel yang tercantum dalam kesalahan. |
Contoh dengan beberapa tabel dalam alur
Alur dapat berisi lebih dari satu tabel. Anda masih dapat memindahkan satu tabel pada satu waktu antar alur. Dalam skenario ini, ada tiga tabel (table_a, , table_b) table_cyang membaca satu sama lain secara berurutan dalam alur sumber. Kami ingin memindahkan satu tabel, table_b, ke alur lain.
Kode alur sumber awal:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Kami pindah table_b ke alur lain dengan menyalin dan menghapus definisi tabel dari sumber dan memperbarui table_bpipelineId.
Pertama, tunda jadwal apa pun dan tunggu pemutakhiran hingga selesai pada alur sumber dan sasaran. Kemudian ubah alur sumber untuk menghapus kode untuk tabel yang dipindahkan. Kode contoh alur sumber yang diperbarui menjadi:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Buka editor SQL untuk menjalankan ALTER pipelineId perintah.
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
Selanjutnya, buka alur tujuan dan tambahkan definisi table_b. Jika katalog dan skema default sama dalam pengaturan alur, tidak ada perubahan kode yang diperlukan.
Kode jalur target:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
Jika katalog dan skema bawaan berbeda dalam pengaturan alur, Anda harus menambahkan nama lengkap yang memenuhi syarat menggunakan katalog dan skema alur.
Misalnya, kode alur target dapat berupa:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
Jalankan (atau aktifkan kembali jadwal) untuk alur sumber dan target.
Pipeline sekarang terpisah. Kueri untuk table_c dibaca dari table_b (sekarang dalam alur target) dan table_b dibaca dari table_a (dalam alur sumber). Ketika Anda melakukan eksekusi yang dipicu pada pipa saluran sumber table_b, pipa saluran tersebut tidak diperbarui karena tidak lagi dikelola olehnya. Alur sumber memperlakukan table_b sebagai tabel di luar alur. Ini sebanding dengan menentukan pembacaan tampilan materialisasi dari tabel Delta di Katalog Unity yang tidak dikelola oleh alur.
Batasan
Berikut ini adalah batasan untuk memindahkan tabel antar alur.
- Tampilan materialisasi dan tabel streaming yang dibuat dengan Databricks SQL tidak didukung.
- Aliran tambahkan sekali - aliran Python append_flow(once=True) dan aliran SQL INSERT INTO ONCE - tidak didukung. Status pengoperasian mereka tidak dipertahankan, mereka dapat berjalan lagi di pipeline tujuan. Hapus atau beri komentar pada alur 'append once' dari jalur tujuan untuk menghindari menjalankan alur ini lagi.
- Tabel atau tampilan privat tidak didukung.
- Alur sumber dan tujuan harus berupa pipa. Alur null tidak didukung.
- Alur sumber dan tujuan harus berada di ruang kerja yang sama, atau di ruang kerja yang berbeda yang memiliki metastore yang sama.
- Alur sumber dan tujuan harus dimiliki oleh pengguna yang menjalankan operasi pemindahan.
- Jika alur sumber menggunakan mode penerbitan default, alur tujuan juga harus menggunakan mode penerbitan default. Anda tidak dapat memindahkan tabel dari pipa data yang menggunakan mode penerbitan default ke pipa data yang menggunakan skema LIVE (lama). Lihat skema LIVE (versi lama).
- Jika pipa sumber dan tujuan keduanya menggunakan skema LIVE (warisan), maka mereka harus memiliki nilai
catalogdantargetyang sama dalam pengaturan.