Compartilhar via


Mover tabelas entre pipelines

Este artigo descreve como transferir as tabelas de transmissão e visões materializadas entre pipelines. Após a movimentação, o pipeline para o qual você move o fluxo é o que atualiza a tabela, em vez do pipeline original. Isso é útil em muitos cenários, incluindo:

  • Divida um pipeline grande em pipelines menores.
  • Mesclar vários pipelines em um único pipeline maior.
  • Altere a frequência de atualização para algumas tabelas em um pipeline.
  • Mova tabelas de um pipeline que usa o modo de publicação herdado para o modo de publicação padrão. Para obter detalhes sobre o modo de publicação herdado, consulte Modo de publicação herdado para pipelines. Para ver como você pode migrar o modo de publicação para um pipeline inteiro de uma só vez, consulte Habilitar o modo de publicação padrão em um pipeline.
  • Mova tabelas entre pipelines em diferentes workspaces.

Requirements

Abaixo estão os requisitos para mover uma tabela entre pipelines.

  • Você deve usar o Databricks Runtime 16.3 ou superior ao executar o comando ALTER ..., e o Databricks Runtime 17.2 para mover a tabela entre espaços de trabalho.

  • Os pipelines de origem e de destino devem ser assim:

    • Propriedade da conta de usuário ou da entidade de serviço do Azure Databricks que executa a operação
    • Em workspaces que compartilham um metastore. Para verificar o metastore, consulte current_metastore a função.
  • O pipeline de destino deve usar o modo de publicação padrão. Isso permite que você publique tabelas em vários catálogos e esquemas.

    Como alternativa, ambos os pipelines devem usar o modo de publicação herdado e ambos devem ter o mesmo catálogo e valor de destino nas configurações. Para obter informações sobre o modo de publicação herdado, consulte o esquema LIVE (herdado).

Observação

Esse recurso não dá suporte à movimentação de um pipeline usando o modo de publicação padrão para um pipeline usando o modo de publicação herdado.

Como mover uma tabela entre pipelines

As instruções a seguir descrevem como mover uma tabela de transmissão ou uma exibição materializada de um pipeline para outro.

  1. Pare o pipeline de origem, caso esteja em execução. Aguarde que pare completamente.

  2. Remova a definição da tabela do código do pipeline de origem e armazene-a em algum lugar para referência futura.

    Inclua todas as consultas de suporte ou código necessários para que o pipeline seja executado corretamente.

  3. Em um notebook ou um editor de SQL, execute o seguinte comando SQL para reatribuir a tabela do pipeline de origem para o pipeline de destino:

    ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
      SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
    

    Observe que o comando SQL deve ser executado no workspace do pipeline de origem.

    O comando usa ALTER MATERIALIZED VIEW e ALTER STREAMING TABLE para as visualizações materializadas gerenciadas do Catálogo do Unity e tabelas de streaming, respectivamente. Para executar a mesma ação em uma tabela metastore do Hive, use ALTER TABLE.

    Por exemplo, se você quiser mover uma tabela de streaming nomeada sales para um pipeline com a ID abcd1234-ef56-ab78-cd90-1234efab5678, execute o seguinte comando:

    ALTER STREAMING TABLE sales
      SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
    

    Observação

    O pipelineId deve ser um identificador de pipeline válido. O null valor não é permitido.

  4. Adicione a definição da tabela ao código do pipeline de destino.

    Observação

    Se o catálogo ou o esquema de destino diferem entre a origem e o destino, copiar a consulta exatamente pode não funcionar. Tabelas parcialmente qualificadas na definição podem ser resolvidas de forma diferente. Talvez seja necessário atualizar a definição durante a movimentação para qualificar totalmente os nomes de tabela.

    Observação

    Remova ou comente qualquer fluxo de anexação única (em Python, consultas com append_flow(once=True), no SQL, consultas com INSERT INTO ONCE) do código do pipeline de destino. Para obter mais detalhes, consulte Limitações.

A movimentação foi concluída. Agora você pode executar tanto o pipeline de origem quanto o de destino. O pipeline de destino atualizará a tabela.

Resolução de problemas

A tabela seguinte descreve erros que podem ocorrer ao mover uma tabela entre pipelines.

Erro Description
DESTINATION_PIPELINE_NOT_IN_DIRECT_PUBLISHING_MODE O pipeline de origem fica no modo padrão de publicação e o destino usa o modo de esquema LIVE (legado). Isso não tem suporte. Se a origem usar o modo de publicação padrão, o destino também deverá ser usado.
PIPELINE_TYPE_NOT_WORKSPACE_PIPELINE_TYPE Há suporte apenas para mover tabelas entre pipelines. Não há suporte para mover tabelas de streaming e exibições materializadas criadas com o Databricks SQL.
DESTINATION_PIPELINE_NOT_FOUND O pipeline pipelines.pipelineId precisa ser válido. O pipelineId não pode ser nulo.
A tabela falha ao atualizar no destino após a movimentação. Para mitigar rapidamente nesse caso, mova a tabela de volta ao pipeline de origem seguindo as mesmas instruções.
PIPELINE_PERMISSION_DENIED_NOT_OWNER Os pipelines de origem e de destino devem pertencer ao usuário que está realizando a operação de movimentação.
TABLE_ALREADY_EXISTS A tabela listada na mensagem de erro já existe. Isso pode acontecer se já existir uma tabela de backup para o pipeline. Nesse caso, DROP a tabela é a listada pelo erro.

Exemplo com várias tabelas em um pipeline

Os pipelines podem conter mais de uma tabela. Você ainda pode mover uma tabela de cada vez entre pipelines. Nesse cenário, há três tabelas (table_a, table_b, table_c) que são lidas umas das outras sequencialmente no pipeline de origem. Queremos mover uma tabela table_b para outro pipeline.

Código de pipeline de origem inicial:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to new pipeline:
@dp.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Movemos table_b para outro pipeline copiando e removendo a definição da tabela da origem e atualizando o pipelineId de table_b.

Primeiro, pause os agendamentos e aguarde a conclusão das atualizações nos pipelines de origem e de destino. Em seguida, modifique o pipeline de origem para remover o código da tabela que está sendo movida. O exemplo de código atualizado do pipeline de origem é:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in new pipeline:
# @dp.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dp.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )

Vá para o editor do SQL para executar o ALTER pipelineId comando.

ALTER MATERIALIZED VIEW table_b
  SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");

Em seguida, vá para o pipeline de destino e adicione a definição de table_b. Se o catálogo e o esquema padrão forem os mesmos nas configurações de pipeline, nenhuma alteração de código será necessária.

O código do pipeline de destino:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

Se o catálogo e o esquema padrão forem diferentes nas configurações de pipeline, você deverá adicionar o nome totalmente qualificado usando o catálogo e o esquema do pipeline.

Por exemplo, o código do pipeline de destino pode ser:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )

Execute (ou reative os agendamentos) para ambos os pipelines de origem e de destino.

Os pipelines agora estão desconectados. A consulta para table_c lê de table_b (agora no pipeline de destino) e table_b lê de table_a (no pipeline de origem). Quando você executa uma execução ativada no pipeline de origem, o table_b não é atualizado porque já não está sob gestão do pipeline de origem. O pipeline de origem trata table_b como uma tabela externa ao pipeline. Isso é comparável à definição de uma leitura de exibição materializada de uma tabela Delta no Catálogo do Unity que não é gerenciada pelo pipeline.

Limitações

Veja abaixo as limitações para mover tabelas entre pipelines.

  • Não há suporte para exibições materializadas e tabelas de streaming criadas com o Databricks SQL.
  • Os fluxos de anexação única – Python append_flow(once=True) e SQL INSERT INTO ONCE – não são suportados. Os status de execução deles não são preservados, eles podem ser executados novamente no pipeline de destino. Remova ou comente os fluxos de anexação única do pipeline de destino para evitar executar esses fluxos novamente.
  • Não há suporte para tabelas ou exibições privadas.
  • Os fluxos de origem e de destino devem ser pipelines. Não há suporte para os pipelines nulos.
  • Os pipelines de origem e de destino devem estar no mesmo workspace ou em workspaces diferentes que compartilham o mesmo metastore.
  • Os pipelines de origem e de destino devem pertencer ao usuário que está executando a operação de movimentação.
  • Se o pipeline de origem usar o modo de publicação padrão, o pipeline de destino também deverá estar usando o modo de publicação padrão. Você não pode mover uma tabela de um pipeline usando o modo de publicação padrão para um pipeline que usa o esquema LIVE (herdado). Confira Esquema LIVE (herdado).
  • Se os pipelines de origem e de destino estiverem usando o esquema LIVE (herdado), eles deverão ter o mesmo catalog e target valores nas configurações.