Partilhar via


Padrões comuns de carregamento de dados

O Auto Loader simplifica uma série de tarefas comuns de ingestão de dados. Esta referência rápida fornece exemplos de vários padrões populares.

Filtrando diretórios ou ficheiros usando padrões de glob

Os padrões de Glob podem ser usados para filtrar diretórios e arquivos quando fornecidos no caminho.

Padrão Descrição
? Corresponde a qualquer caractere
* Corresponde a zero ou mais caracteres
[abc] Corresponde a um único caractere do conjunto de caracteres {a,b,c}.
[a-z] Corresponde a um único caractere do intervalo de caracteres {a... z}.
[^a] Corresponde a um único caractere que não é do conjunto de caracteres ou intervalo {a}. Observe que o ^ carácter deve aparecer imediatamente à direita do colchete de abertura.
{ab,cd} Corresponde a uma string do conjunto de strings {ab, cd}.
{ab,c{de, fh}} Corresponde a uma string do conjunto de strings {ab, cde, cfh}.

Use o path para fornecer padrões de prefixo, por exemplo:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

linguagem de programação Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

Importante

Você precisa usar a opção pathGlobFilter para fornecer explicitamente padrões de sufixo. O path fornece apenas um filtro de prefixo.

Por exemplo, se você quiser analisar apenas png arquivos em um diretório que contém arquivos com sufixos diferentes, você pode fazer:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

linguagem de programação Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

Nota

O comportamento de globbing padrão do Auto Loader é diferente do comportamento padrão de outras fontes de arquivos do Spark. Adicione .option("cloudFiles.useStrictGlobber", "true") à sua leitura para usar o globbing que corresponde ao comportamento padrão do Spark em relação às fontes de arquivo. Consulte a tabela a seguir para obter mais informações sobre globbing:

Padrão Caminho do ficheiro Globber padrão Globber rigoroso
/a/b /a/b/c/file.txt Sim Sim
/a/b /a/b_dir/c/file.txt Não Não
/a/b /a/b.txt Não Não
/a/b/ /a/b.txt Não Não
/a/*/c/ /a/b/c/file.txt Sim Sim
/a/*/c/ /a/b/c/d/file.txt Sim Sim
/a/*/c/ /a/b/x/y/c/file.txt Sim Não
/a/*/c /a/b/c_file.txt Sim Não
/a/*/c/ /a/b/c_file.txt Sim Não
/a/*/c/ /a/*/cookie/file.txt Sim Não
/a/b* /a/b.txt Sim Sim
/a/b* /a/b/file.txt Sim Sim
/a/{0.txt,1.txt} /a/0.txt Sim Sim
/a/*/{0.txt,1.txt} /a/0.txt Não Não
/a/b/[cde-h]/i/ /a/b/c/i/file.txt Sim Sim

Habilite ETL fácil

Uma maneira fácil de colocar seus dados no Delta Lake sem perder nenhum dado é usar o seguinte padrão e habilitar a inferência de esquema com o Auto Loader. O Databricks recomenda executar o código a seguir em um trabalho do Azure Databricks para que ele reinicie automaticamente seu fluxo quando o esquema dos dados de origem for alterado. Por padrão, o esquema é inferido como tipos de cadeia de caracteres, quaisquer erros de análise (não deve haver nenhum se tudo permanecer como uma cadeia de caracteres) irão para _rescued_data, e quaisquer novas colunas falharão no fluxo e evoluirão o esquema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

linguagem de programação Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Evite a perda de dados em dados bem estruturados

Quando você conhece seu esquema, mas quer saber sempre que recebe dados inesperados, o Databricks recomenda o uso do rescuedDataColumn.

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

linguagem de programação Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Se quiser que o fluxo pare de processar se for introduzido um novo campo que não corresponda ao seu esquema, você pode adicionar:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Habilite pipelines de dados semiestruturados flexíveis

Quando você está recebendo dados de um fornecedor que introduz novas colunas para as informações que eles fornecem, você pode não estar ciente exatamente de quando eles fazem isso, ou você pode não ter a largura de banda para atualizar seu pipeline de dados. Agora você pode aproveitar a evolução do esquema para reiniciar o fluxo e permitir que o Auto Loader atualize o esquema inferido automaticamente. Você também pode utilizar o schemaHints para alguns dos campos "sem esquema" que o fornecedor possivelmente está a disponibilizar.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

linguagem de programação Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Transformar dados JSON aninhados

Como o Auto Loader infere as colunas JSON de nível superior como cadeias de caracteres, você pode ficar com objetos JSON aninhados que exigem transformações adicionais. Você pode usar as APIs de acesso a dados semiestruturados para transformar ainda mais o conteúdo JSON complexo.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

linguagem de programação Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

Inferir dados JSON aninhados

Quando tiveres dados aninhados, poderás usar a opção cloudFiles.inferColumnTypes para inferir a estrutura aninhada dos dados e outros tipos de coluna.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

linguagem de programação Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

Carregar arquivos CSV sem cabeçalhos

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

linguagem de programação Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Impor um esquema em arquivos CSV com cabeçalhos

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

linguagem de programação Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

Ingerir imagem ou dados binários em Delta Lake para ML

Depois que os dados são armazenados no Delta Lake, você pode executar inferência distribuída nos dados. Consulte Executar inferência distribuída usando pandas UDF.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

linguagem de programação Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

Sintaxe do Auto Loader para Lakeflow Declarative Pipelines

Lakeflow Declarative Pipelines fornece sintaxe Python ligeiramente modificada para Auto Loader e adiciona suporte SQL para Auto Loader.

Os exemplos a seguir usam o Auto Loader para criar conjuntos de dados a partir de arquivos CSV e JSON:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  format => "json")

Você pode usar as opções de formato suportadas para o Auto Loader. As opções para read_files são pares chave-valor. Para obter detalhes sobre os formatos e opções suportados, consulte Opções .

Por exemplo:

CREATE OR REFRESH STREAMING TABLE my_table
AS SELECT *
  FROM STREAM read_files(
    "/Volumes/my_volume/path/to/files/*",
    option-key => option-value,
    ...
  )

O exemplo a seguir lê dados de arquivos CSV delimitados por tabulações com um cabeçalho:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv",
  delimiter => "\t",
  header => "true"
)

Você pode usar o schema para especificar o formato manualmente, você deve especificar o schema para formatos que não suportam inferência de esquema:

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
  format => "parquet",
  schema => "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING"
)

Nota

O Lakeflow Declarative Pipelines configura e gerencia automaticamente o esquema e os diretórios de pontos de verificação ao usar o Auto Loader para ler arquivos. No entanto, se você configurar manualmente qualquer um desses diretórios, a execução de uma atualização completa não afetará o conteúdo dos diretórios configurados. O Databricks recomenda o uso dos diretórios configurados automaticamente para evitar efeitos colaterais inesperados durante o processamento.