Partilhar via


Carregar dados com Tabelas Delta Live

Pode carregar dados de qualquer origem de dados suportada pelo Apache Spark no Azure Databricks com Tabelas Delta Live. Pode definir conjuntos de dados (tabelas e vistas) em Tabelas Dinâmicas Delta em qualquer consulta que devolva um DataFrame do Spark, incluindo dataFrames de transmissão em fluxo e Pandas para DataFrames do Spark. Para tarefas de ingestão de dados, o Databricks recomenda a utilização de tabelas de transmissão em fluxo para a maioria dos casos de utilização. As tabelas de transmissão em fluxo são boas para ingerir dados do armazenamento de objetos na cloud com o Carregador Automático ou a partir de barramentos de mensagens, como o Kafka. Os exemplos abaixo demonstram alguns padrões comuns.

Importante

Nem todas as fontes de dados têm suporte a SQL. Pode misturar blocos de notas SQL e Python num pipeline de Tabelas Dinâmicas Delta para utilizar o SQL para todas as operações além da ingestão.

Para obter detalhes sobre como trabalhar com bibliotecas não empacotadas no Delta Live Tables por padrão, consulte Manage Python dependencies for Delta Live Tables pipelines.

Carregar ficheiros a partir do armazenamento de objetos na cloud

A Databricks recomenda o uso do Auto Loader com Delta Live Tables para a maioria das tarefas de ingestão de dados do armazenamento de objetos na nuvem. O Auto Loader e o Delta Live Tables são projetados para carregar dados cada vez maiores de forma incremental e idempotente, à medida que chegam ao armazenamento em nuvem. Os exemplos a seguir usam o Auto Loader para criar conjuntos de dados a partir de arquivos CSV e JSON:

Nota

Para carregar ficheiros com o Carregador Automático num pipeline ativado para o Catálogo do Unity, deve usar localizações externas. Para saber mais sobre como utilizar o Catálogo unity com Tabelas Delta Live, veja Utilizar o Catálogo Unity com os pipelines do Delta Live Tables.

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Consulte O que é o Auto Loader? e a sintaxe SQL do Auto Loader.

Aviso

Se você usar o Auto Loader com notificações de arquivo e executar uma atualização completa para seu pipeline ou tabela de streaming, deverá limpar manualmente seus recursos. Você pode usar o CloudFilesResourceManager em um bloco de anotações para executar a limpeza.

Carregar dados de um barramento de mensagens

Você pode configurar pipelines Delta Live Tables para ingerir dados de barramentos de mensagens com tabelas de streaming. O Databricks recomenda a combinação de tabelas de streaming com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para carregamento de baixa latência a partir de barramentos de mensagens. Consulte Otimizar a utilização de cluster de pipelines Delta Live Tables com dimensionamento automático aprimorado.

Por exemplo, o código a seguir configura uma tabela de streaming para ingerir dados do Kafka:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

Você pode anotar operações downstream em SQL puro para executar transformações de streaming nesses dados, como no exemplo a seguir:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

Para obter um exemplo de como trabalhar com Hubs de Eventos, consulte Usar Hubs de Eventos do Azure como uma fonte de dados Delta Live Tables.

Consulte Configurar fontes de dados de streaming.

Carregar dados de sistemas externos

O Delta Live Tables dá suporte ao carregamento de dados de qualquer fonte de dados suportada pelo Azure Databricks. Consulte Conectar-se a fontes de dados. Você também pode carregar dados externos usando a Lakehouse Federation para fontes de dados suportadas. Como a Lakehouse Federation requer o Databricks Runtime 13.3 LTS ou superior, para usar a Lakehouse Federation seu pipeline deve ser configurado para usar o canal de visualização.

Algumas fontes de dados não têm suporte equivalente em SQL. Se você não puder usar a Lakehouse Federation com uma dessas fontes de dados, poderá usar um bloco de anotações Python autônomo para ingerir dados da fonte. Este bloco de anotações pode ser adicionado como uma biblioteca de origem com blocos de anotações SQL para criar um pipeline Delta Live Tables. O exemplo a seguir declara uma exibição materializada para acessar o estado atual dos dados em uma tabela remota do PostgreSQL:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Carregar conjuntos de dados pequenos ou estáticos a partir do armazenamento de objetos na cloud

Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento do Apache Spark. O Delta Live Tables suporta todos os formatos de ficheiro suportados pelo Apache Spark no Azure Databricks. Para obter uma lista completa, consulte Opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar tabelas Delta Live Tables:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

Nota

A SELECT * FROM format.`path`; construção SQL é comum a todos os ambientes SQL no Azure Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL com Delta Live Tables.

Acesse credenciais de armazenamento com segurança com segredos em um pipeline

Você pode usar segredos do Azure Databricks para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo em seu pipeline, use uma propriedade Spark na configuração do cluster de configurações de pipeline. Consulte Definir suas configurações de computação.

O exemplo a seguir usa um segredo para armazenar uma chave de acesso necessária para ler dados de entrada de uma conta de armazenamento do Azure Data Lake Storage Gen2 (ADLS Gen2) usando o Auto Loader. Você pode usar esse mesmo método para configurar qualquer segredo exigido pelo seu pipeline, por exemplo, chaves da AWS para acessar o S3 ou a senha para um metastore do Apache Hive.

Para saber mais sobre como trabalhar com o Azure Data Lake Storage Gen2, consulte Conectar-se ao Azure Data Lake Storage Gen2 e Blob Storage.

Nota

Você deve adicionar o prefixo spark.hadoop. à spark_conf chave de configuração que define o valor secreto.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> com o nome da conta de armazenamento ADLS Gen2.
  • <scope-name> com o nome do escopo secreto do Azure Databricks.
  • <secret-name> com o nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> com o nome do contêiner da conta de armazenamento do Azure que armazena os dados de entrada.
  • <storage-account-name> com o nome da conta de armazenamento ADLS Gen2.
  • <path-to-input-dataset> com o caminho para o conjunto de dados de entrada.

Carregar dados dos Hubs de Eventos do Azure

Os Hubs de Eventos do Azure são um serviço de streaming de dados que fornece uma interface compatível com Apache Kafka. Você pode usar o conector Kafka de Streaming Estruturado, incluído no tempo de execução do Delta Live Tables, para carregar mensagens dos Hubs de Eventos do Azure. Para saber mais sobre como carregar e processar mensagens dos Hubs de Eventos do Azure, consulte Usar Hubs de Eventos do Azure como uma fonte de dados Delta Live Tables.