Compartilhar via


Carregar dados em pipelines

Você pode carregar dados de qualquer fonte de dados com suporte do Apache Spark no Azure Databricks usando pipelines. Você pode definir conjuntos de dados (tabelas e visões) em Pipelines Declarativos do Lakeflow Spark para qualquer consulta que retorne um DataFrame do Spark, incluindo DataFrames de streaming e Pandas em DataFrames do Spark. Para tarefas de ingestão de dados, o Databricks recomenda usar tabelas de streaming para a maioria dos casos de uso. As tabelas de streaming são boas para assimilar dados do armazenamento de objetos da nuvem usando o Auto Loader ou de barramentos de mensagens como o Kafka.

Observação

  • Nem todas as fontes de dados têm suporte sql para ingestão. Você pode misturar fontes SQL e Python em pipelines para usar o Python onde for necessário e o SQL para outras operações no mesmo pipeline.
  • Para obter detalhes sobre como trabalhar com bibliotecas não empacotadas no Lakeflow Spark Declarative Pipelines por padrão, consulte Gerenciar dependências do Python para pipelines.
  • Para obter informações gerais sobre a ingestão no Azure Databricks, consulte Conectores Padrão no Lakeflow Connect.

Os exemplos abaixo demonstram alguns padrões comuns.

Carregar de uma tabela existente

Carregue dados de qualquer tabela existente no Azure Databricks. Você pode transformar os dados usando uma consulta ou carregar a tabela para processamento adicional em seu pipeline.

O exemplo a seguir lê dados de uma tabela existente:

Python

@dp.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

Carregar arquivos do armazenamento de objetos da nuvem

O Databricks recomenda o uso do Carregador Automático em pipelines para a maioria das tarefas de ingestão de dados do armazenamento de objetos de nuvem ou de arquivos em um volume do Catálogo do Unity. O Carregador Automático e os pipelines são projetados para carregar dados cada vez maiores de forma incremental e idempotente à medida que chegam ao armazenamento em nuvem.

Veja o que é o Carregador Automático? e carregar dados do armazenamento de objetos.

O exemplo a seguir lê dados do armazenamento em nuvem usando o Carregador Automático:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
    format => "json"
  );

Os exemplos a seguir usam o Carregador Automático para criar conjuntos de dados de arquivos CSV em um volume do Catálogo do Unity:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

Observação

  • Se você usar o Carregador Automático com notificações de arquivo e executar uma atualização completa no seu pipeline ou tabela de streaming, deverá limpar manualmente os recursos. Você pode usar o CloudFilesResourceManager em um notebook para executar a limpeza.
  • Para carregar arquivos com o Carregador Automático em um pipeline habilitado para o Catálogo do Unity, você deve usar localizações externas. Para saber mais sobre como usar o Catálogo do Unity com pipelines, consulte Usar o Catálogo do Unity com pipelines.

Carregar dados de um barramento de mensagens

Você pode configurar pipelines para ingerir dados de barramentos de mensagens. O Databricks recomenda o uso de tabelas de streaming com execução contínua e dimensionamento automático aprimorado para fornecer a ingestão mais eficiente para o carregamento de baixa latência dos barramentos de mensagens. Consulte Otimize a utilização do cluster de Pipelines Declarativos do Lakeflow Spark com Dimensionamento Automático.

Por exemplo, o código a seguir configura uma tabela de streaming para ingerir dados do Kafka usando a função read_kafka :

Python

from pyspark import pipelines as dp

@dp.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

Para ingerir de outras fontes de barramento de mensagens, confira:

Carregar dados dos Hubs de Eventos do Azure

Os Hubs de Eventos do Azure são um serviço de streaming de dados que fornece uma interface compatível com o Apache Kafka. Você pode usar o Conector Kafka de Streaming Estruturado, incluído no ambiente de execução do Lakeflow Spark Declarative Pipelines, para carregar mensagens do serviço de Hubs de Eventos do Azure. Para saber mais sobre como carregar e processar mensagens dos Hubs de Eventos do Azure, consulte Usar os Hubs de Eventos do Azure como uma fonte de dados de pipeline.

Carregar dados de locais externos

O Lakeflow Spark Declarative Pipelines dá suporte ao carregamento de dados de qualquer fonte de dados com suporte do Azure Databricks. Consulte Conectar-se a fontes de dados e serviços externos. Você também pode carregar dados externos usando Lakehouse Federation para fontes de dados com suporte. Como a Lakehouse Federation requer o Databricks Runtime 13.3 LTS ou superior para usar o Lakehouse Federation, seu pipeline deve ser configurado para usar o canal de visualização.

Algumas fontes de dados não têm suporte equivalente no SQL. Se você não puder usar a Federação Lakehouse com uma dessas fontes de dados, poderá usar o Python para importar dados da origem. Você pode adicionar arquivos de origem Python e SQL ao mesmo pipeline. O exemplo a seguir declara uma exibição materializada para acessar o estado atual dos dados em uma tabela remota do PostgreSQL:

import dp

@dp.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Carregar conjuntos de dados pequenos ou estáticos do armazenamento de objetos de nuvem

Você pode carregar conjuntos de dados pequenos ou estáticos usando a sintaxe de carregamento do Apache Spark. O Lakeflow Spark Declarative Pipelines dá suporte a todos os formatos de arquivo compatíveis com o Apache Spark no Azure Databricks. Para obter uma lista completa, consulte as opções de formato de dados.

Os exemplos a seguir demonstram o carregamento de JSON para criar uma tabela:

Python

@dp.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

Observação

A função SQL read_files é comum a todos os ambientes SQL no Azure Databricks. É o padrão recomendado para acesso direto a arquivos usando SQL em pipelines. Para obter mais informações, consulte Opções.

Carregar dados de uma fonte de dados personalizada do Python

As fontes de dados personalizadas do Python permitem carregar dados em formatos personalizados. Você pode escrever código para ler e gravar em uma fonte de dados externa específica ou aproveitar o código Python existente em seus sistemas existentes para ler dados de seus próprios sistemas internos. Para obter mais detalhes sobre como desenvolver fontes de dados do Python, consulte fontes de dados personalizadas do PySpark.

Para usar uma fonte de dados personalizada do Python para carregar dados em um pipeline, registre-a com um nome de formato, como my_custom_datasource, em seguida, leia a partir dele:

from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
    return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
    return spark.readStream.format("my_custom_datasource").load()

Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem

Observação

  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option(). Você não pode usar esse sinalizador em uma função dp.read_stream().
  • Você não pode usar a skipChangeCommits flag quando a tabela de streaming de origem é definida como destino de uma função create_auto_cdc_flow().

Por padrão, as tabelas de streaming exigem fontes somente de acréscimo. Quando uma tabela de streaming usa outra tabela de streaming como origem e a tabela de streaming de origem requer atualizações ou exclusões, por exemplo, processamento GDPR "direito de ser esquecido", o sinalizador skipChangeCommits pode ser definido ao ler a tabela de streaming de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

@dp.table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

Acessar as credenciais de armazenamento com segurança com os segredos em um pipeline

Você pode usar segredos do Azure Databricks para armazenar credenciais, como chaves de acesso ou senhas. Para configurar o segredo nop seu pipeline, use uma propriedade Spark na configuração do cluster de configurações do pipeline. Consulte Configuração clássica de computação para pipelines.

O exemplo a seguir usa um segredo para armazenar uma chave de acesso necessária para ler dados de entrada de uma conta de armazenamento do ADLS (Azure Data Lake Storage) usando o Carregador Automático. Você pode usar esse mesmo método para configurar um segredo exigido pelo pipeline, por exemplo, chaves AWS para acessar o S3 ou a senha para um metastore do Hive do Apache.

Para saber mais sobre como trabalhar com o Azure Data Lake Storage, consulte Connect to Azure Data Lake Storage and Blob Storage.

Observação

Você deve adicionar o prefixo spark.hadoop. à chave de configuração spark_conf que define o valor secreto.

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

Substitua

  • <storage-account-name> com o nome da conta de armazenamento do ADLS.
  • <scope-name> com o nome do escopo do segredo do Azure Databricks.
  • <secret-name> com o nome da chave que contém a chave de acesso da conta de armazenamento do Azure.
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Substitua

  • <container-name> com o nome do contêiner da conta de armazenamento do Azure que armazena os dados de entrada.
  • <storage-account-name> com o nome da conta de armazenamento do ADLS.
  • <path-to-input-dataset> com o caminho do conjunto de dados de entrada.