Compartilhar via


Padrões comuns de carregamento de dados

O Carregador Automático simplifica uma série de tarefas comuns de ingestão de dados. Esta referência rápida fornece exemplos para vários padrões populares.

Ingerir dados do armazenamento de objetos de nuvem como variante

O Carregador Automático pode carregar todos os dados das fontes de arquivo com suporte como uma única VARIANT coluna em uma tabela de destino. Como VARIANT é flexível para alterações de esquema e tipo e mantém a confidencialidade de maiúsculas e minúsculas e NULL valores presentes na fonte de dados, esse padrão é robusto para a maioria dos cenários de ingestão. Para obter detalhes, consulte Ingestão de dados do armazenamento de objetos de nuvem como variante.

Filtrar diretórios ou arquivos usando padrões glob

Padrões glob podem ser usados para filtrar diretórios e arquivos quando fornecidos no caminho.

Padrão Descrição
? Corresponde a qualquer caractere único
* Corresponde a zero ou mais caracteres
[abc] Corresponde a um único caractere do conjunto de caracteres {a,b,c}.
[a-z] Corresponde a um único caractere do intervalo de caracteres {a…z}.
[^a] Corresponde a um único caractere que não é do conjunto de caracteres ou do intervalo {a}. Observe que o caractere ^ deve ocorrer imediatamente à direita do colchete de abertura.
{ab,cd} Corresponde a uma cadeia de caracteres do conjunto de cadeias de caracteres {ab, cd}.
{ab,c{de, fh}} Corresponde a uma cadeia de caracteres do conjunto de cadeias de caracteres {ab, cde, cfh}.

Use o path para fornecer padrões de prefixo, por exemplo:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala (linguagem de programação)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Importante

Você precisa usar a opção pathGlobFilter para fornecer explicitamente os padrões de sufixo. O path fornece apenas um filtro de prefixo.

Por exemplo, se quiser analisar apenas arquivos png dentro de um diretório que contém arquivos com sufixos diferentes, você poderá:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala (linguagem de programação)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Observação

O comportamento padrão de globbing do Carregador Automático é diferente do comportamento padrão de outras fontes de arquivo do Spark. Adicione .option("cloudFiles.useStrictGlobber", "true") à leitura para usar o globbing que corresponde ao comportamento padrão do Spark em relação às fontes de arquivo. Confira a tabela a seguir para saber mais sobre globbing:

Padrão Caminho do arquivo Globber padrão Globber estrito
/a/b /a/b/c/file.txt Sim Sim
/a/b /a/b_dir/c/file.txt Não Não
/a/b /a/b.txt Não Não
/a/b/ /a/b.txt Não Não
/a/*/c/ /a/b/c/file.txt Sim Sim
/a/*/c/ /a/b/c/d/file.txt Sim Sim
/a/*/c/ /a/b/x/y/c/file.txt Sim Não
/a/*/c /a/b/c_file.txt Sim Não
/a/*/c/ /a/b/c_file.txt Sim Não
/a/*/c/ /a/*/cookie/file.txt Sim Não
/a/b* /a/b.txt Sim Sim
/a/b* /a/b/file.txt Sim Sim
/a/{0.txt,1.txt} /a/0.txt Sim Sim
/a/*/{0.txt,1.txt} /a/0.txt Não Não
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Sim Sim

Habilitar ETL fácil

Uma maneira fácil de colocar seus dados no Delta Lake sem perder dados é usar o padrão a seguir e habilite a inferência de esquema com o Carregador Automático. O Databricks recomenda executar o código a seguir em um Azure Databricks para que ele reinicie automaticamente o fluxo quando o esquema dos dados de origem for alterado. Por padrão, o esquema é inferido como tipos de cadeia de caracteres, qualquer erro de análise (não deve haver nenhum se tudo permanecer como uma cadeia de caracteres) irá para _rescued_data e quaisquer novas colunas falharão no fluxo e evoluirão o esquema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala (linguagem de programação)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Evitar perda de dados em dados bem estruturados

Quando você conhece seu esquema, mas deseja saber sempre que receber dados inesperados, o Databricks recomenda usar o rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala (linguagem de programação)

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Se você quiser que seu fluxo pare o processamento se um novo campo for introduzido que não corresponda ao esquema, você poderá adicionar:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Habilitar pipelines de dados semiestruturados flexíveis

Quando você estiver recebendo dados de um fornecedor que introduz novas colunas às informações que eles fornecem, talvez você não esteja ciente exatamente quando eles o fizerem ou talvez você não tenha a largura de banda para atualizar o pipeline de dados. Agora você pode aproveitar a evolução do esquema para reiniciar o fluxo e permitir que o Carregador Automático atualize o esquema inferido automaticamente. Você também pode aproveitar schemaHints para alguns dos campos "sem esquema" que o fornecedor pode estar fornecendo.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala (linguagem de programação)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformar dados JSON aninhados

Como o Carregador Automático infere as colunas JSON de nível superior como cadeias de caracteres, você pode ficar com objetos JSON aninhados que exigirão transformações posteriores. Você pode usar as APIs de acesso a dados semiestruturados para transformar posteriormente o conteúdo JSON complexo.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala (linguagem de programação)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inferir dados JSON aninhados

No caso de dados aninhados, é possível usar a opção cloudFiles.inferColumnTypes para inferir a estrutura aninhada deles e outros tipos de coluna.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala (linguagem de programação)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Carregar arquivos CSV sem cabeçalhos

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala (linguagem de programação)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Impor um esquema a arquivos CSV com cabeçalhos

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala (linguagem de programação)

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingerir dados binários ou de imagem no Delta Lake para ML

Após os dados serem armazenados no Delta Lake, é possível executar a inferência distribuída nos dados. Confira Executar a inferência distribuída usando uma UDF do Pandas.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala (linguagem de programação)

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Sintaxe do Auto Loader para Pipelines Declarativos do Lakeflow Spark

O Lakeflow Spark Declarative Pipelines fornece uma sintaxe python ligeiramente modificada para o Carregador Automático e adiciona suporte a SQL para Carregador Automático.

Os exemplos a seguir usam o Carregador automático para criar conjuntos de dados dos arquivos CSV e JSON:

Python

@dp.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dp.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  format => "json")

Você pode usar opções de formato com suporte para o Carregador Automático. As opções para read_files são pares chave-valor. Para obter detalhes sobre formatos e opções com suporte, consulte Opções.

Por exemplo:

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

O seguinte exemplo lê dados dos arquivos CSV delimitados por tabulação com um cabeçalho:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv",
  delimiter => "\t",
  header => "true"
)

Você pode usar o schema para especificar o formato manualmente; você precisa especificar o schema para os formatos que não dão suporte à inferência de esquema:

Python

@dp.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
  format => "parquet",
  schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)

Observação

O Lakeflow Spark Declarative Pipelines configura e gerencia automaticamente os diretórios de esquema e ponto de verificação ao usar o Carregador Automático para ler arquivos. No entanto, se você configurar manualmente qualquer um desses diretórios, a execução de uma atualização completa não afetará o conteúdo dos diretórios configurados. O Databricks recomenda usar os diretórios configurados automaticamente para evitar efeitos colaterais inesperados durante o processamento.