Leitura de dados compartilhados usando o compartilhamento aberto do Compartilhamento Delta (para destinatários)

Este artigo descreve como ler dados que foram compartilhados com você usando o protocolo de compartilhamento aberto do Compartilhamento Delta. No compartilhamento aberto, você usa um arquivo de credencial que foi compartilhado com um membro da sua equipe pelo provedor de dados para obter acesso de leitura seguro aos dados compartilhados. O acesso persiste enquanto a credencial for válida e o provedor continuar compartilhando os dados. Os provedores gerenciam a expiração e a rotação das credenciais. As atualizações dos dados ficam disponíveis quase em tempo real. Você pode ler e fazer cópias dos dados compartilhados, mas não pode modificar os dados de origem.

Observação

Se os dados tiverem sido compartilhados com você usando o Compartilhamento Delta do Databricks para Databricks, você não precisará de um arquivo de credencial para acessar dados e este artigo não se aplicará a você. Para obter instruções, consulte Leitura de dados compartilhados utilizando o Compartilhamento Delta do Databricks para o Databricks (para destinatários).

As seções a seguir descrevem como usar o Azure Databricks, o Apache Spark, o Pandas e o Power BI para acessar e ler dados compartilhados usando o arquivo de credenciais. Para obter uma lista completa de conectores e informações do Compartilhamento Delta sobre como usá-los, confira a documentação de código aberto do Compartilhamento Delta. Caso tenha problemas para acessar os dados compartilhados, entre em contato com o provedor de dados.

Observação

As integrações de parceiros são, a menos que indicado de outra forma, fornecidas por terceiros, e é preciso ter uma conta com o provedor apropriado para usar os respectivos produtos ou serviços. Embora o Databricks faça o melhor para manter esse conteúdo atualizado, não fazemos nenhuma declaração em relação às integrações nem à precisão do conteúdo nas páginas de integração do parceiro. Fale com os provedores apropriados sobre as integrações.

Antes de começar

Um membro da sua equipe deve baixar o arquivo de credencial compartilhado pelo provedor de dados. Confira Obter acesso no modelo de compartilhamento aberto.

Eles devem usar um canal seguro para compartilhar esse arquivo ou local de arquivo com você.

Azure Databricks: ler dados compartilhados usando conectores de compartilhamento aberto

Esta seção descreve como usar um conector de compartilhamento aberto para acessar dados compartilhados usando um notebook no workspace do Azure Databricks. Você ou outro membro de sua equipe armazena o arquivo de credencial no DBFS e, em seguida, usa-o para autenticar a conta Azure Databricks do provedor de dados e ler os dados que o provedor de dados compartilhou com você.

Observação

Se o provedor de dados estiver usando o compartilhamento do Databricks para Databricks e não compartilhou um arquivo de credencial com você, será necessário acessar os dados usando o Unity Catalog. Para obter instruções, consulte Leitura de dados compartilhados utilizando o Compartilhamento Delta do Databricks para o Databricks (para destinatários).

Neste exemplo, você criará um notebook com várias células que pode ser executado de maneira independente. Em vez disso, você pode adicionar os comandos do notebook à mesma célula e executá-los em sequência.

Etapa 1: Armazenar o arquivo de credencial no DBFS (instruções do Python)

Nesta etapa, você usará um notebook Python no Azure Databricks para armazenar o arquivo de credencial para que os usuários da sua equipe possam acessar dados compartilhados.

Pule para a próxima etapa se você ou alguém da sua equipe já tiver armazenado o arquivo de credencial no DBFS.

  1. Em um editor de texto, abra o arquivo de credenciais.

  2. No workspace do Azure Databricks, clique em Novo > Notebook.

    • Insira um nome.
    • Defina a linguagem padrão para o notebook como Python.
    • Selecione um cluster para anexá-lo ao notebook.
    • Clique em Criar.

    O notebook será aberto no editor de notebook.

  3. Para usar o Python ou o Pandas para acessar os dados compartilhados, instale o conector do Python do compartilhamento delta. No editor de notebook, cole o seguinte comando:

    %sh pip install delta-sharing
    
  4. Execute a célula.

    A biblioteca delta-sharing do Python será instalada no cluster se ainda não estiver instalada.

  5. Em uma nova célula, cole o comando a seguir, que carrega o conteúdo do arquivo de credenciais em uma pasta no DBFS. Substitua as variáveis da seguinte forma:

    • <dbfs-path>: o caminho para a pasta em que deseja salvar o arquivo de credenciais

    • <credential-file-contents>: o conteúdo do arquivo de credenciais. Este não é um caminho para o arquivo, mas o conteúdo copiado do arquivo.

      O arquivo de credenciais contém um JSON que define três campos: shareCredentialsVersion, endpoint e bearerToken.

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. Execute a célula.

    Depois que o arquivo de credenciais for carregado, você poderá excluir essa célula. Todos os usuários do workspace podem ler o arquivo de credenciais do DBFS e ele fica disponível em todos os clusters e SQL warehouses no DBFS no seu workspace. Para excluir a célula, clique no x no menu de ações de célula Ações de célula mais à direita.

Etapa 2: Usar um notebook para listar e ler tabelas compartilhadas

Nesta etapa, você lista as tabelas no compartilhamento ou o conjunto de tabelas e partições compartilhadas e consulta uma tabela.

  1. Usando o Python, liste as tabelas do compartilhamento.

    Em uma nova célula, cole o comando a seguir. Substitua <dbfs-path> pelo caminho que foi criado na Etapa 1: armazene o arquivo de credencial no DBFS (instruções do Python).

    Quando o código é executado, o Python lê o arquivo de credenciais do DBFS no cluster. Acesse os dados armazenados no DBFS no caminho /dbfs/.

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. Execute a célula.

    O resultado é uma matriz de tabelas, com metadados para cada tabela. A seguinte saída mostra duas tabelas:

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    Se a saída estiver vazia ou não tiver as tabelas esperadas, entre em contato com o provedor de dados.

  3. Consultar uma tabela compartilhada.

    • Usando Scala:

      Em uma nova célula, cole o comando a seguir. Quando o código é executado, o arquivo de credenciais é lido do DBFS por meio da JVM.

      Substitua as variáveis da seguinte forma:

      • <profile-path>: o caminho do DBFS do arquivo de credenciais. Por exemplo, /<dbfs-path>/config.share.
      • <share-name>: o valor de share= na tabela.
      • <schema-name>: o valor de schema= na tabela.
      • <table-name>: o valor de name= na tabela.
      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      Execute a célula. Sempre que você carregar a tabela compartilhada, verá dados atualizados da origem.

    • Como usar o SQL:

      Para consultar os dados usando o SQL, você cria uma tabela local no workspace da tabela compartilhada e consultar a tabela local. Os dados compartilhados não são armazenados nem armazenados em cache na tabela local. Sempre que você consultar a tabela local, verá o estado atual dos dados compartilhados.

      Em uma nova célula, cole o comando a seguir.

      Substitua as variáveis da seguinte forma:

      • <local-table-name>: o nome da tabela local.
      • <profile-path>: o local do arquivo de credenciais.
      • <share-name>: o valor de share= na tabela.
      • <schema-name>: o valor de schema= na tabela.
      • <table-name>: o valor de name= na tabela.
      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      Quando você executa o comando, os dados compartilhados são consultados diretamente. Como um teste, a tabela é consultada, e os dez primeiros resultados são retornados.

    Se a saída estiver vazia ou não contiver os dados esperados, entre em contato com o provedor de dados.

Apache Spark: ler dados compartilhados

Siga estas etapas para acessar dados compartilhados usando o Spark 3.x ou superior.

Essas instruções pressupõem que você tenha acesso ao arquivo de credencial que foi compartilhado pelo provedor de dados. Confira Obter acesso no modelo de compartilhamento aberto.

Instalar os conectores Delta Sharing Python e Spark

Para acessar metadados relacionados aos dados compartilhados, como a lista de tabelas compartilhadas com você, faça o seguinte. Este exemplo usa o Python.

  1. Instale o conector do Python de compartilhamento delta:

    pip install delta-sharing
    
  2. Instale o conector do Apache Spark.

Listar tabelas compartilhadas usando o Spark

Liste as tabelas do compartilhamento. No exemplo a seguir, substitua <profile-path> pelo local do arquivo de credenciais.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

O resultado é uma matriz de tabelas, com metadados para cada tabela. A seguinte saída mostra duas tabelas:

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

Se a saída estiver vazia ou não tiver as tabelas esperadas, entre em contato com o provedor de dados.

Acessar dados compartilhados usando Spark

Execute o seguinte, substituindo estas variáveis:

  • <profile-path>: o local do arquivo de credenciais.
  • <share-name>: o valor de share= na tabela.
  • <schema-name>: o valor de schema= na tabela.
  • <table-name>: o valor de name= na tabela.
  • <version-as-of>: opcional. A versão da tabela para carregar os dados. Só funciona se o provedor de dados compartilhar o histórico da tabela. Requer o delta-sharing-spark 0.5.0 ou superior.
  • <timestamp-as-of>: opcional. Carregue os dados na versão anterior ou no carimbo de data/hora fornecido. Só funciona se o provedor de dados compartilhar o histórico da tabela. Requer o delta-sharing-spark 0.6.0 ou superior.

Python

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Scala

Execute o seguinte, substituindo estas variáveis:

  • <profile-path>: o local do arquivo de credenciais.
  • <share-name>: o valor de share= na tabela.
  • <schema-name>: o valor de schema= na tabela.
  • <table-name>: o valor de name= na tabela.
  • <version-as-of>: opcional. A versão da tabela para carregar os dados. Só funciona se o provedor de dados compartilhar o histórico da tabela. Requer o delta-sharing-spark 0.5.0 ou superior.
  • <timestamp-as-of>: opcional. Carregue os dados na versão anterior ou no carimbo de data/hora fornecido. Só funciona se o provedor de dados compartilhar o histórico da tabela. Requer o delta-sharing-spark 0.6.0 ou superior.
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

Acesse o feed de dados de alterações compartilhado usando o Spark

Se o histórico da tabelas tiver sido compartilhado com você e o feed de dados de alterações (CDF) estiver habilitado na tabela de origem, você poderá acessar o feed de dados de alterações executando o seguinte, substituindo essas variáveis. Requer o delta-sharing-spark 0.5.0 ou superior.

Um e somente um parâmetro inicial deve ser fornecido.

  • <profile-path>: o local do arquivo de credenciais.
  • <share-name>: o valor de share= na tabela.
  • <schema-name>: o valor de schema= na tabela.
  • <table-name>: o valor de name= na tabela.
  • <starting-version>: opcional. A versão inicial da consulta, inclusiva. Especifique como um Longo.
  • <ending-version>: opcional. A versão final da consulta, inclusiva. Se a versão final não for fornecida, a API usará a versão mais recente da tabela.
  • <starting-timestamp>: opcional. O carimbo de data inicial da consulta, isso é convertido em uma versão criada maior ou igual a esse carimbo de data. Esta é uma cadeia de caracteres no formato yyyy-mm-dd hh:mm:ss[.fffffffff].
  • <ending-timestamp>: opcional. O carimbo de data final da consulta, isso é convertido em uma versão criada menor ou igual a esse carimbo de data. Esta é uma cadeia de caracteres no formato yyyy-mm-dd hh:mm:ss[.fffffffff]

Python

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Scala

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Se a saída estiver vazia ou não contiver os dados esperados, entre em contato com o provedor de dados.

Acessar uma tabela compartilhada usando o Spark Structured Streaming

Se o histórico de tabelas for compartilhado com você, será possível transmitir a leitura dos dados compartilhados. Requer o delta-sharing-spark 0.6.0 ou superior.

Opções com suporte:

  • ignoreDeletes: ignorar transações que excluem dados.
  • ignoreChanges: reprocessar as atualizações se os arquivos foram reescritos na tabela de origem devido a uma operação de alteração de dados, como UPDATE, MERGE INTO, DELETE(dentro de partições) ou OVERWRITE. Linhas inalteradas ainda podem ser emitidas. Portanto, seus consumidores downstream devem ser capazes de lidar com duplicatas. As exclusões não são propagadas downstream. ignoreChanges incorpora ignoreDeletes. Portanto, se você usar ignoreChanges, o fluxo não será interrompido por exclusões ou atualizações na tabela de origem.
  • startingVersion: a versão da tabela compartilhada a ser iniciada. Todas as alterações de tabela que começam desta versão (inclusive) serão lidas pela fonte de streaming.
  • startingTimestamp: O carimbo de data/hora do qual começar. Todas as alterações de tabela confirmadas no carimbo de data/hora (inclusive) ou após este serão lidas pela fonte de streaming. Exemplo: "2023-01-01 00:00:00.0".
  • maxFilesPerTrigger: o número de novos arquivos a serem considerados em cada microlote.
  • maxBytesPerTrigger: a quantidade de dados processada em cada microlote. Essa opção define um "máximo flexível", o que significa que um lote processa aproximadamente essa quantidade de dados e pode processar mais do que o limite para fazer com que a consulta de streaming avance em casos em que a menor unidade de entrada é maior que esse limite.
  • readChangeFeed: o stream lê o feed de dados de alterações da tabela compartilhada.

Opções sem suporte:

  • Trigger.availableNow

Exemplos de consultas de Streaming Estruturado

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Consulte também Streaming no Azure Databricks.

Ler tabelas com vetores de exclusão ou mapeamento de colunas ativados

Importante

Esse recurso está em uma versão prévia.

Os vetores de exclusão são um recurso de otimização de armazenamento que seu provedor pode habilitar em tabelas Delta compartilhadas. Confira O que são vetores de exclusão?

O Azure Databricks também dá suporte ao mapeamento de colunas para tabelas Delta. Confira Renomear e remover colunas usando o mapeamento de colunas do Delta Lake.

Se o seu provedor compartilhou uma tabela com vetores de exclusão ou mapeamento de colunas habilitados, você poderá ler a tabela usando a computação em execução delta-sharing-spark 3.1 ou superior. Se você estiver usando clusters do Databricks, poderá executar leituras em lote usando um cluster que está executando o Databricks Runtime 14.1 ou superior. As consultas de streaming e CDF exigem o Databricks Runtime 14.2 ou superior.

Você pode executar consultas em lote como estão, pois elas podem resolver responseFormat automaticamente com base nos recursos de tabela da tabela compartilhada.

Para ler um CDF (feed de dados de alteração) ou executar consultas de streaming em tabelas compartilhadas com vetores de exclusão ou mapeamento de colunas habilitado, você deve definir a opção adicional responseFormat=delta.

Os exemplos a seguir mostram consultas em lote, de streaming e CDF:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: ler dados compartilhados

Siga estas etapas para acessar os dados compartilhados no Pandas 0.25.3 ou superior.

Essas instruções pressupõem que você tenha acesso ao arquivo de credencial que foi compartilhado pelo provedor de dados. Confira Obter acesso no modelo de compartilhamento aberto.

Instale o conector do Python de compartilhamento delta

Para acessar metadados relacionados aos dados compartilhados, como a lista de tabelas compartilhadas com você, instale o conector do Python do compartilhamento delta.

pip install delta-sharing

Listar tabelas compartilhadas usando o Pandas

Para listar as tabelas no compartilhamento, execute o seguinte, substituindo <profile-path>/config.share pelo local do arquivo de credencial.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

Se a saída estiver vazia ou não tiver as tabelas esperadas, entre em contato com o provedor de dados.

Acessar dados compartilhados usando Pandas

Para acessar dados compartilhados no Pandas usando o Python, execute o seguinte, substituindo as variáveis da seguinte maneira:

  • <profile-path>: o local do arquivo de credenciais.
  • <share-name>: o valor de share= na tabela.
  • <schema-name>: o valor de schema= na tabela.
  • <table-name>: o valor de name= na tabela.
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

Acessar um feed de dados de alterações compartilhados usando o Pandas

Para acessar o feed de dados alterados para uma tabela compartilhada em pandas usando Python, execute o seguinte, substituindo as variáveis ​​da seguinte maneira. Um feed de dados de alterações pode não estar disponível, dependendo se o provedor de dados compartilhou ou não o feed de dados de alterações para a tabela.

  • <starting-version>: opcional. A versão inicial da consulta, inclusiva.
  • <ending-version>: opcional. A versão final da consulta, inclusiva.
  • <starting-timestamp>: opcional. O carimbo de data/hora inicial da consulta. Isso é convertido em uma versão criada maior ou igual a esse carimbo de data.
  • <ending-timestamp>: opcional. O carimbo de data/hora final da consulta. Isso é convertido em uma versão criada anterior ou igual a esse carimbo de data.
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

Se a saída estiver vazia ou não contiver os dados esperados, entre em contato com o provedor de dados.

Power BI: ler dados compartilhados

O conector do Compartilhamento Delta do Power BI permite a você descobrir, analisar e visualizar conjuntos de dados compartilhados com eles por meio do protocolo aberto do Compartilhamento Delta.

Requisitos

Conectar-se ao Databricks

Para se conectar ao Azure Databricks usando o conector de Compartilhamento Delta, faça o seguinte:

  1. Abra o arquivo de credencial compartilhada com um editor de texto para recuperar a URL do ponto de extremidade e o token.
  2. Abra o Power BI Desktop.
  3. No menu Obter Dados, procure Compartilhamento Delta.
  4. Selecione o conector e clique em Conectar.
  5. Insira a URL do ponto de extremidade que você copiou do arquivo de credenciais para o campo URL do Servidor do Compartilhamento Delta.
  6. Opcionalmente, na guia Opções Avançadas, defina um Limite de Linhas para o número máximo de linhas que pode ser baixado. Isso é definido como um milhão de linhas por padrão.
  7. Clique em OK.
  8. Em Autenticação, copie o token que você recuperou do arquivo de credenciais para Token de Portador.
  9. Clique em Conectar.

Limitações do conector do Compartilhamento Delta do Power BI

O Conector de Compartilhamento Delta do Power BI tem as seguintes limitações:

  • Os dados carregados pelo conector precisam caber na memória do computador. Para garantir isso, o conector limita o número de linhas importadas ao Limite de Linhas definido na guia Opções Avançadas no Power BI Desktop.

Solicitar uma nova credencial

Se a URL de ativação da credencial ou a credencial baixada for perdida, corrompida ou comprometida, ou se a credencial expirar sem que o provedor envie uma nova, entre em contato com o provedor para solicitar uma nova credencial.