Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Este artigo descreve como mover tabelas de streaming e visões materializadas entre pipelines. Após a mudança, o pipeline para o qual você move o fluxo atualiza a tabela, em vez do pipeline original. Isso é útil em muitos cenários, incluindo:
- Divida um gasoduto grande em outros menores.
- Mescle vários pipelines em um único maior.
- Altere a frequência de atualização de algumas tabelas em um pipeline.
- Mova tabelas de um pipeline que usa o modo de publicação herdado para o modo de publicação padrão. Para obter detalhes sobre o modo de publicação herdado, consulte Modo de publicação herdado para pipelines. Para ver como você pode migrar o modo de publicação para um pipeline inteiro de uma só vez, consulte Habilitar o modo de publicação padrão em um pipeline.
- Mova tabelas entre pipelines em espaços de trabalho diferentes.
Requerimentos
A seguir estão os requisitos para movimentar uma tabela entre diferentes pipelines.
Você deve usar o Databricks Runtime 16.3 ou superior ao executar o comando
ALTER ..., e o Databricks Runtime 17.2 para mover tabelas entre espaços de trabalho.Os pipelines de origem e de destino devem ser:
- De propriedade da conta de usuário do Azure Databricks ou da entidade de serviço que executa a operação
- Em espaços de trabalho que compartilham um metastore. Para verificar o metastore, consulte a função
current_metastore.
O pipeline de destino deve utilizar o modo padrão de publicação. Isso permite publicar tabelas em vários catálogos e esquemas.
Como alternativa, ambos os pipelines devem usar o modo de publicação herdado e ambos devem ter o mesmo catálogo e valor de destino nas configurações. Para obter informações sobre o modo de publicação herdado, consulte Esquema LIVE (legado).
Observação
Esse recurso não oferece suporte à movimentação de um pipeline usando o modo de publicação padrão para um pipeline usando o modo de publicação herdado.
Mover uma tabela entre pipelines
As instruções a seguir descrevem como mover uma tabela de streaming ou uma exibição materializada de um pipeline para outro.
Pare a execução do pipeline de origem se ele estiver em execução. Espere até que pare completamente.
Remova a definição da tabela do código do pipeline de origem e armazene-a em algum lugar para referência futura.
Inclua quaisquer consultas de suporte ou código necessários para que o pipeline seja executado corretamente.
Em um bloco de anotações ou um editor SQL, execute o seguinte comando SQL para reatribuir a tabela do pipeline de origem ao pipeline de destino:
ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name> SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");Observe que o comando SQL deve ser executado a partir do espaço de trabalho do pipeline de origem.
O comando usa
ALTER MATERIALIZED VIEWeALTER STREAMING TABLEpara as visualizações materializadas geridas e tabelas de streaming do Unity Catalog, respetivamente. Para executar a mesma ação em uma tabela de metastore do Hive, useALTER TABLE.Por exemplo, se quiser mover uma tabela de streaming nomeada
salespara um pipeline com a IDabcd1234-ef56-ab78-cd90-1234efab5678, execute o seguinte comando:ALTER STREAMING TABLE sales SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");Observação
O
pipelineIddeve ser um identificador de pipeline válido. Onullvalor não é permitido.Adicione a definição da tabela ao código do pipeline de destino.
Observação
Se o esquema de catálogo ou destino diferir entre a origem e o destino, copiar a consulta exatamente pode não funcionar. As tabelas parcialmente qualificadas na definição podem ser resolvidas de forma diferente. Pode ser necessário atualizar a definição durante a transição para especificar completamente os nomes das tabelas.
Observação
Remova ou comente qualquer fluxo de acréscimo único (em Python, consultas com append_flow(once=True), em SQL, consultas com INSERT INTO ONCE) do código do pipeline do destino. Para obter mais detalhes, consulte Limitações.
A mudança está concluída. Pode agora executar tanto os pipelines de origem como de destino. O pipeline de destino atualiza a tabela.
Solução de problemas
Tabela a seguir descreve erros que podem acontecer ao mover uma tabela entre as pipelines.
| Erro | Description |
|---|---|
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE |
O pipeline de origem está no modo de publicação padrão e o destino usa o modo de esquema LIVE (legado). Isso não é suportado. Se a origem usa o modo de publicação padrão, o destino também deve. |
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE |
Somente a movimentação de tabelas entre pipelines é suportada. Não há suporte para mover tabelas de streaming e exibições materializadas criadas com Databricks SQL. |
DESTINATION_PIPELINE_NOT_FOUND |
O pipelines.pipelineId deve ser um pipeline válido. O pipelineId não pode ser nulo. |
| A tabela não é atualizada no destino após a mudança. | Para atenuar rapidamente nesse caso, mova a tabela de volta para o pipeline de origem seguindo as mesmas instruções. |
PIPELINE_PERMISSION_DENIED_NOT_OWNER |
Os pipelines de origem e de destino devem pertencer ao usuário que executa a operação de movimentação. |
TABLE_ALREADY_EXISTS |
A tabela listada na mensagem de erro já existe. Isso pode acontecer se já existir uma tabela de suporte para o pipeline. Neste caso, DROP a tabela listada no erro. |
Exemplo com várias tabelas num pipeline
Os pipelines podem conter mais de uma tabela. Você ainda pode mover uma tabela de cada vez entre pipelines. Nesse cenário, há três tabelas (table_a, table_b, table_c) que leem umas das outras sequencialmente no pipeline de origem. Queremos mover uma tabela, table_b, para outro pipeline.
Código inicial de pipeline de origem:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Table to be moved to new pipeline:
@dp.table
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Transferimos table_b para outro pipeline, copiando e removendo a definição de tabela da origem e atualizando o pipelineId de table_b.
Primeiro, pause todos os agendamentos e aguarde a conclusão das atualizações nos pipelines de origem e alvo. Em seguida, modifique o pipeline de origem para remover o código da tabela que está sendo movida. O código de exemplo da pipeline de origem atualizado passa a ser:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table
def table_a():
return spark.read.table("source_table")
# Removed, to be in new pipeline:
# @dp.table
# def table_b():
# return (
# spark.read.table("table_a")
# .select(col("column1"), col("column2"))
# )
@dp.table
def table_c():
return (
spark.read.table("table_b")
.groupBy(col("column1"))
.agg(sum("column2").alias("sum_column2"))
)
Vá para o editor SQL para executar o ALTER pipelineId comando.
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
Em seguida, vá para o pipeline de destino e adicione a definição de table_b. Se o catálogo e o esquema padrão forem os mesmos nas configurações de pipeline, nenhuma alteração de código será necessária.
O código do pipeline de destino:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="table_b")
def table_b():
return (
spark.read.table("table_a")
.select(col("column1"), col("column2"))
)
Se o catálogo e o esquema padrão diferirem nas configurações do pipeline, você deverá adicionar o nome totalmente qualificado usando o catálogo e o esquema do pipeline.
Por exemplo, o código do pipeline de destino pode ser:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
return (
spark.read.table("source_catalog.source_schema.table_a")
.select(col("column1"), col("column2"))
)
Executar (ou ative novamente programações) para os pipelines tanto de origem quanto de destino.
Os gasodutos estão agora desarticulados. A consulta para table_c lê de table_b (agora no pipeline de destino) e table_b lê de table_a (no pipeline de origem). Quando se faz uma execução acionada no pipeline de origem, table_b não é atualizado porque não é mais gerido pelo pipeline de origem. O pipeline de origem trata table_b como uma tabela externa ao pipeline. Isso é comparável à definição de uma leitura de exibição materializada de uma tabela Delta no Unity Catalog que não é gerenciada pelo pipeline.
Limitações
As seguintes são as limitações para mover tabelas entre diferentes pipelines.
- Não há suporte para exibições materializadas e tabelas de streaming criadas com Databricks SQL.
- Os fluxos append once - fluxos Python append_flow(once=True) e fluxos SQL INSERT INTO ONCE - não são suportados. Seu status de execução não é preservado, eles podem ser executados novamente no pipeline de destino. Remova ou comente o append once flows do pipeline de destino para evitar a execução desses fluxos novamente.
- Não há suporte para tabelas ou exibições privadas.
- As origens e destinos devem ser pipelines. Não há suporte para pipelines nulos.
- Os pipelines de origem e de destino devem estar no mesmo espaço de trabalho ou em espaços de trabalho diferentes que compartilham o mesmo metastore.
- Os pipelines de origem e de destino devem pertencer ao usuário que executa a operação de transferência.
- Se o pipeline de origem usar o modo de publicação padrão, o pipeline de destino também deverá estar usando o modo de publicação padrão. Não é possível mover uma tabela de um pipeline usando o modo de publicação padrão para um pipeline que usa o esquema LIVE (legado). Ver LIVE schema (legacy).
- Se os pipelines de origem e destino estiverem a usar o esquema LIVE (legado), então devem ter os mesmos valores de
catalogetargetnas definições.