Share via


Legge i dati condivisi con la condivisione differenziale aperta (per i destinatari)

Questo articolo descrive come leggere i dati condivisi con l'utente usando il protocollo di condivisione open sharing delta. Nella condivisione aperta si usa un file di credenziali condiviso con un membro del team dal provider di dati per ottenere l'accesso in lettura sicuro ai dati condivisi. L'accesso persiste finché la credenziale è valida e il provider continua a condividere i dati. I provider gestiscono la scadenza e la rotazione delle credenziali. Aggiornamenti ai dati sono disponibili quasi in tempo reale. È possibile leggere e creare copie dei dati condivisi, ma non è possibile modificare i dati di origine.

Nota

Se i dati sono stati condivisi con l'utente che usa la condivisione databricks-to-Databricks Delta, non è necessario un file di credenziali per accedere ai dati e questo articolo non si applica all'utente. Per istruzioni, vedere Leggere i dati condivisi con Databricks-to-Databricks Delta Sharing (per i destinatari).

Le sezioni seguenti descrivono come usare Azure Databricks, Apache Spark, pandas e Power BI per accedere e leggere i dati condivisi usando il file di credenziali. Per un elenco completo dei connettori di condivisione Delta e informazioni su come usarli, vedere la documentazione open source di Condivisione delta. Se si verificano problemi di accesso ai dati condivisi, contattare il provider di dati.

Nota

Le integrazioni dei partner sono, se non diversamente indicate, fornite da terze parti e l'utente deve disporre di un account con il provider appropriato per l'uso dei propri prodotti e servizi. Anche se il contenuto verrà tenuto aggiornato il più possibile per Databricks, Microsoft non si assume responsabilità relativamente alle integrazioni dei partner o all'accuratezza del contenuto delle loro pagine. Contattare i provider appropriati per informazioni sulle integrazioni.

Operazioni preliminari

Un membro del team deve scaricare il file delle credenziali condiviso dal provider di dati. Vedere Ottenere l'accesso nel modello di condivisione aperta.

Devono usare un canale sicuro per condividere il file o il percorso del file con l'utente.

Azure Databricks: Leggere i dati condivisi usando connettori di condivisione aperti

Questa sezione descrive come usare un connettore open sharing per accedere ai dati condivisi usando un notebook nell'area di lavoro di Azure Databricks. L'utente o un altro membro del team archivia il file delle credenziali in DBFS, quindi lo si usa per eseguire l'autenticazione all'account Azure Databricks del provider di dati e leggere i dati condivisi dal provider di dati.

Nota

Se il provider di dati usa la condivisione da Databricks a Databricks e non ha condiviso un file di credenziali con l'utente, è necessario accedere ai dati usando Unity Catalog. Per istruzioni, vedere Leggere i dati condivisi con Databricks-to-Databricks Delta Sharing (per i destinatari).

In questo esempio si crea un notebook con più celle che è possibile eseguire in modo indipendente. È invece possibile aggiungere i comandi del notebook alla stessa cella ed eseguirli in sequenza.

Passaggio 1: Archiviare il file delle credenziali in DBFS (istruzioni python)

In questo passaggio si usa un notebook Python in Azure Databricks per archiviare il file delle credenziali in modo che gli utenti del team possano accedere ai dati condivisi.

Passare al passaggio successivo se l'utente o un utente del team ha già archiviato il file delle credenziali in DBFS.

  1. In un editor di testo aprire il file delle credenziali.

  2. Nell'area di lavoro di Azure Databricks fare clic su Nuovo > notebook.

    • Immetti un nome.
    • Impostare la lingua predefinita per il notebook su Python.
    • Selezionare un cluster da collegare al notebook.
    • Fai clic su Crea.

    Il notebook viene aperto nell'editor di notebook.

  3. Per usare Python o pandas per accedere ai dati condivisi, installare il connettore Python di condivisione differenziale. Nell'editor del notebook incollare il comando seguente:

    %sh pip install delta-sharing
    
  4. Eseguire la cella.

    Se delta-sharing non è già installata, la libreria Python viene installata nel cluster.

  5. In una nuova cella incollare il comando seguente, che carica il contenuto del file delle credenziali in una cartella in DBFS. Sostituire le variabili come indicato di seguito:

    • <dbfs-path>: percorso della cartella in cui si vuole salvare il file delle credenziali

    • <credential-file-contents>: contenuto del file di credenziali. Non si tratta di un percorso del file, ma del contenuto copiato del file.

      Il file di credenziali contiene JSON che definisce tre campi: shareCredentialsVersion, endpointe bearerToken.

      %scala
      dbutils.fs.put("<dbfs-path>/config.share","""
      <credential-file-contents>
      """)
      
  6. Eseguire la cella.

    Dopo aver caricato il file delle credenziali, è possibile eliminare questa cella. Tutti gli utenti dell'area di lavoro possono leggere il file delle credenziali da DBFS e il file delle credenziali è disponibile in DBFS in tutti i cluster e nei warehouse SQL dell'area di lavoro. Per eliminare la cella, fare clic su x nel menu Azioni cella azioni cella all'estrema destra.

Passaggio 2: Usare un notebook per elencare e leggere le tabelle condivise

In questo passaggio vengono elencate le tabelle nella condivisione o il set di tabelle e partizioni condivise ed è possibile eseguire query su una tabella.

  1. Usando Python, elencare le tabelle nella condivisione.

    In una nuova cella incollare il comando seguente. Sostituire <dbfs-path> con il percorso creato nel passaggio 1: Archiviare il file delle credenziali in DBFS (istruzioni Python).

    Quando viene eseguito il codice, Python legge il file delle credenziali da DBFS nel cluster. Accedere ai dati archiviati in DBFS nel percorso /dbfs/.

    import delta_sharing
    
    client = delta_sharing.SharingClient(f"/dbfs/<dbfs-path>/config.share")
    
    client.list_all_tables()
    
  2. Eseguire la cella.

    Il risultato è una matrice di tabelle, insieme ai metadati per ogni tabella. L'output seguente mostra due tabelle:

    Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]
    

    Se l'output è vuoto o non contiene le tabelle previste, contattare il provider di dati.

  3. Eseguire una query su una tabella condivisa.

    • Uso di Scala:

      In una nuova cella incollare il comando seguente. Quando viene eseguito il codice, il file delle credenziali viene letto da DBFS tramite JVM.

      Sostituire le variabili come indicato di seguito:

      • <profile-path>: percorso DBFS del file di credenziali. Ad esempio: /<dbfs-path>/config.share.
      • <share-name>: valore di share= per la tabella.
      • <schema-name>: valore di schema= per la tabella.
      • <table-name>: valore di name= per la tabella.
      %scala
          spark.read.format("deltaSharing")
          .load("<profile-path>#<share-name>.<schema-name>.<table-name>").limit(10);
      

      Eseguire la cella. Ogni volta che si carica la tabella condivisa, i dati aggiornati dall'origine sono visualizzati.

    • Uso di SQL:

      Per eseguire query sui dati usando SQL, creare una tabella locale nell'area di lavoro dalla tabella condivisa, quindi eseguire una query sulla tabella locale. I dati condivisi non vengono archiviati o memorizzati nella cache nella tabella locale. Ogni volta che si esegue una query sulla tabella locale, viene visualizzato lo stato corrente dei dati condivisi.

      In una nuova cella incollare il comando seguente.

      Sostituire le variabili come indicato di seguito:

      • <local-table-name>: nome della tabella locale.
      • <profile-path>: percorso del file delle credenziali.
      • <share-name>: valore di share= per la tabella.
      • <schema-name>: valore di schema= per la tabella.
      • <table-name>: valore di name= per la tabella.
      %sql
      DROP TABLE IF EXISTS table_name;
      
      CREATE TABLE <local-table-name> USING deltaSharing LOCATION "<profile-path>#<share-name>.<schema-name>.<table-name>";
      
      SELECT * FROM <local-table-name> LIMIT 10;
      

      Quando si esegue il comando, i dati condivisi vengono sottoposti direttamente a query. Come test, viene eseguita una query sulla tabella e vengono restituiti i primi 10 risultati.

    Se l'output è vuoto o non contiene i dati previsti, contattare il provider di dati.

Apache Spark: leggere i dati condivisi

Seguire questa procedura per accedere ai dati condivisi usando Spark 3.x o versione successiva.

Queste istruzioni presuppongono che l'utente abbia accesso al file di credenziali condiviso dal provider di dati. Vedere Ottenere l'accesso nel modello di condivisione aperta.

Installare i connettori Python e Spark per la condivisione Delta

Per accedere ai metadati correlati ai dati condivisi, ad esempio l'elenco di tabelle condivise con l'utente, eseguire le operazioni seguenti. Questo esempio usa Python.

  1. Installare il connettore Python di condivisione differenziale:

    pip install delta-sharing
    
  2. Installare il connettore Apache Spark.

Elencare le tabelle condivise con Spark

Elencare le tabelle nella condivisione. Nell'esempio seguente sostituire <profile-path> con il percorso del file di credenziali.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

Il risultato è una matrice di tabelle, insieme ai metadati per ogni tabella. L'output seguente mostra due tabelle:

Out[10]: [Table(name='example_table', share='example_share_0', schema='default'), Table(name='other_example_table', share='example_share_0', schema='default')]

Se l'output è vuoto o non contiene le tabelle previste, contattare il provider di dati.

Accedere ai dati condivisi con Spark

Eseguire le operazioni seguenti, sostituendo queste variabili:

  • <profile-path>: percorso del file delle credenziali.
  • <share-name>: valore di share= per la tabella.
  • <schema-name>: valore di schema= per la tabella.
  • <table-name>: valore di name= per la tabella.
  • <version-as-of>:Opzionale. Versione della tabella per caricare i dati. Funziona solo se il provider di dati condivide la cronologia della tabella. Richiede delta-sharing-spark 0.5.0 o versione successiva.
  • <timestamp-as-of>:Opzionale. Caricare i dati nella versione precedente o in corrispondenza del timestamp specificato. Funziona solo se il provider di dati condivide la cronologia della tabella. Richiede delta-sharing-spark 0.6.0 o versione successiva.

Python

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", version=<version-as-of>)

spark.read.format("deltaSharing")\
.option("versionAsOf", <version-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

delta_sharing.load_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>", timestamp=<timestamp-as-of>)

spark.read.format("deltaSharing")\
.option("timestampAsOf", <timestamp-as-of>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")\
.limit(10))

Scala

Eseguire le operazioni seguenti, sostituendo queste variabili:

  • <profile-path>: percorso del file delle credenziali.
  • <share-name>: valore di share= per la tabella.
  • <schema-name>: valore di schema= per la tabella.
  • <table-name>: valore di name= per la tabella.
  • <version-as-of>:Opzionale. Versione della tabella per caricare i dati. Funziona solo se il provider di dati condivide la cronologia della tabella. Richiede delta-sharing-spark 0.5.0 o versione successiva.
  • <timestamp-as-of>:Opzionale. Caricare i dati nella versione precedente o in corrispondenza del timestamp specificato. Funziona solo se il provider di dati condivide la cronologia della tabella. Richiede delta-sharing-spark 0.6.0 o versione successiva.
spark.read.format("deltaSharing")
.option("versionAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

spark.read.format("deltaSharing")
.option("timestampAsOf", <version-as-of>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
.limit(10)

Accedere al feed di dati delle modifiche condiviso con Spark

Se la cronologia delle tabelle è stata condivisa con l'utente e il feed di dati delle modifiche (CDF) è abilitato nella tabella di origine, è possibile accedere al feed di dati delle modifiche eseguendo quanto segue, sostituendo queste variabili. Richiede delta-sharing-spark 0.5.0 o versione successiva.

È necessario specificare uno e un solo parametro iniziale.

  • <profile-path>: percorso del file delle credenziali.
  • <share-name>: valore di share= per la tabella.
  • <schema-name>: valore di schema= per la tabella.
  • <table-name>: valore di name= per la tabella.
  • <starting-version>:Opzionale. Versione iniziale della query, inclusiva. Specificare come long.
  • <ending-version>:Opzionale. Versione finale della query, inclusiva. Se la versione finale non viene specificata, l'API usa la versione più recente della tabella.
  • <starting-timestamp>:Opzionale. Il timestamp iniziale della query, che viene convertito in una versione creata successivamente o uguale a questo timestamp. Specificare come stringa nel formato yyyy-mm-dd hh:mm:ss[.fffffffff].
  • <ending-timestamp>:Opzionale. Timestamp finale della query, convertito in una versione creata in precedenza o uguale a questo timestamp. Specificare come stringa nel formato yyyy-mm-dd hh:mm:ss[.fffffffff]

Python

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<ending-version>)

delta_sharing.load_table_changes_as_spark(f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("statingVersion", <starting-version>)\
.option("endingVersion", <ending-version>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")\
.option("startingTimestamp", <starting-timestamp>)\
.option("endingTimestamp", <ending-timestamp>)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Scala

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("statingVersion", <starting-version>)
.option("endingVersion", <ending-version>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

spark.read.format("deltaSharing").option("readChangeFeed", "true")
.option("startingTimestamp", <starting-timestamp>)
.option("endingTimestamp", <ending-timestamp>)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Se l'output è vuoto o non contiene i dati previsti, contattare il provider di dati.

Accedere a una tabella condivisa usando Spark Structured Streaming

Se la cronologia delle tabelle viene condivisa con l'utente, è possibile trasmettere in streaming i dati condivisi. Richiede delta-sharing-spark 0.6.0 o versione successiva.

Opzioni supportate:

  • ignoreDeletes: ignora le transazioni che eliminano i dati.
  • ignoreChanges: consente di rielaborare gli aggiornamenti se i file sono stati riscritti nella tabella di origine a causa di un'operazione di modifica dei dati, ad UPDATEesempio , MERGE INTODELETE (all'interno di partizioni) o OVERWRITE. Le righe non modificate possono comunque essere generate. Pertanto, i consumer downstream devono essere in grado di gestire i duplicati. Le eliminazioni non vengono propagate downstream. ignoreChanges include ignoreDeletes. Pertanto, se si usa ignoreChanges, il flusso non verrà interrotto dalle eliminazioni o dagli aggiornamenti alla tabella di origine.
  • startingVersion: versione della tabella condivisa da cui iniziare. Tutte le modifiche alla tabella a partire da questa versione (incluse) verranno lette dall'origine di streaming.
  • startingTimestamp: timestamp da cui iniziare. Tutte le modifiche apportate alla tabella di cui è stato eseguito il commit o dopo il timestamp (inclusivo) verranno lette dall'origine di streaming. Esempio: "2023-01-01 00:00:00.0".
  • maxFilesPerTrigger: numero di nuovi file da considerare in ogni micro batch.
  • maxBytesPerTrigger: quantità di dati elaborati in ogni micro batch. Questa opzione imposta un valore "soft max", ovvero un batch elabora approssimativamente questa quantità di dati e può elaborare più del limite per far avanzare la query di streaming nei casi in cui l'unità di input più piccola è maggiore di questo limite.
  • readChangeFeed: flusso legge il feed di dati delle modifiche della tabella condivisa.

Opzioni non supportate:

  • Trigger.availableNow

Query structured streaming di esempio

Scala
spark.readStream.format("deltaSharing")
.option("startingVersion", 0)
.option("ignoreChanges", true)
.option("maxFilesPerTrigger", 10)
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")
Python
spark.readStream.format("deltaSharing")\
.option("startingVersion", 0)\
.option("ignoreDeletes", true)\
.option("maxBytesPerTrigger", 10000)\
.load("<profile-path>#<share-name>.<schema-name>.<table-name>")

Vedere anche Streaming in Azure Databricks.

Leggere le tabelle con vettori di eliminazione o mapping di colonne abilitati

Importante

Questa funzionalità è disponibile in anteprima pubblica.

I vettori di eliminazione sono una funzionalità di ottimizzazione dell'archiviazione che il provider può abilitare nelle tabelle Delta condivise. Vedere Che cosa sono i vettori di eliminazione?.

Azure Databricks supporta anche il mapping delle colonne per le tabelle Delta. Vedere Rinominare ed eliminare colonne con mapping di colonne Delta Lake.

Se il provider ha condiviso una tabella con vettori di eliminazione o mapping di colonne abilitati, è possibile leggere la tabella usando il calcolo in esecuzione delta-sharing-spark 3.1 o versione successiva. Se si usano cluster Databricks, è possibile eseguire operazioni di lettura batch usando un cluster che esegue Databricks Runtime 14.1 o versione successiva. CDF e query di streaming richiedono Databricks Runtime 14.2 o versione successiva.

È possibile eseguire query batch così com'è, perché possono essere risolte responseFormat automaticamente in base alle funzionalità di tabella della tabella condivisa.

Per leggere un feed di dati delle modifiche (CDF) o per eseguire query di streaming su tabelle condivise con vettori di eliminazione o mapping di colonne abilitati, è necessario impostare l'opzione responseFormat=deltaaggiuntiva .

Gli esempi seguenti illustrano le query batch, CDF e streaming:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
        .builder()
        .appName("...")
        .master("...")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()

val tablePath = "<profile-file-path>#<share-name>.<schema-name>.<table-name>"

// Batch query
spark.read.format("deltaSharing").load(tablePath)

// CDF query
spark.read.format("deltaSharing")
  .option("readChangeFeed", "true")
  .option("responseFormat", "delta")
  .option("startingVersion", 1)
  .load(tablePath)

// Streaming query
spark.readStream.format("deltaSharing").option("responseFormat", "delta").load(tablePath)

Pandas: legge i dati condivisi

Seguire questa procedura per accedere ai dati condivisi in pandas 0.25.3 o versione successiva.

Queste istruzioni presuppongono che l'utente abbia accesso al file di credenziali condiviso dal provider di dati. Vedere Ottenere l'accesso nel modello di condivisione aperta.

Installare il connettore Python di condivisione Delta

Per accedere ai metadati correlati ai dati condivisi, ad esempio l'elenco di tabelle condivise con l'utente, è necessario installare il connettore Python di condivisione differenziale.

pip install delta-sharing

Elencare le tabelle condivise con pandas

Per elencare le tabelle nella condivisione, eseguire le operazioni seguenti, sostituendo <profile-path>/config.share con il percorso del file delle credenziali.

import delta_sharing

client = delta_sharing.SharingClient(f"<profile-path>/config.share")

client.list_all_tables()

Se l'output è vuoto o non contiene le tabelle previste, contattare il provider di dati.

Accedere ai dati condivisi con pandas

Per accedere ai dati condivisi in Pandas usando Python, eseguire le operazioni seguenti, sostituendo le variabili come indicato di seguito:

  • <profile-path>: percorso del file delle credenziali.
  • <share-name>: valore di share= per la tabella.
  • <schema-name>: valore di schema= per la tabella.
  • <table-name>: valore di name= per la tabella.
import delta_sharing
delta_sharing.load_as_pandas(f"<profile-path>#<share-name>.<schema-name>.<table-name>")

Accedere a un feed di dati delle modifiche condiviso usando pandas

Per accedere al feed di dati delle modifiche per una tabella condivisa in pandas usando Python, eseguire le operazioni seguenti, sostituendo le variabili come indicato di seguito. Un feed di dati delle modifiche potrebbe non essere disponibile, a seconda che il provider di dati abbia condiviso o meno il feed di dati delle modifiche per la tabella.

  • <starting-version>:Opzionale. Versione iniziale della query, inclusiva.
  • <ending-version>:Opzionale. Versione finale della query, inclusiva.
  • <starting-timestamp>:Opzionale. Timestamp iniziale della query. Questa operazione viene convertita in una versione creata successivamente o uguale a questo timestamp.
  • <ending-timestamp>:Opzionale. Timestamp finale della query. Questa operazione viene convertita in una versione creata in precedenza o uguale a questo timestamp.
import delta_sharing
delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_version=<starting-version>,
  ending_version=<starting-version>)

delta_sharing.load_table_changes_as_pandas(
  f"<profile-path>#<share-name>.<schema-name>.<table-name>",
  starting_timestamp=<starting-timestamp>,
  ending_timestamp=<ending-timestamp>)

Se l'output è vuoto o non contiene i dati previsti, contattare il provider di dati.

Power BI: Leggere i dati condivisi

Il connettore di condivisione Delta di Power BI consente di individuare, analizzare e visualizzare i set di dati condivisi con l'utente tramite il protocollo aperto di condivisione Delta.

Requisiti

  • Power BI Desktop 2.99.621.0 o versione successiva.
  • Accesso al file di credenziali condiviso dal provider di dati. Vedere Ottenere l'accesso nel modello di condivisione aperta.

Connessione a Databricks

Per connettersi ad Azure Databricks usando il connettore di condivisione delta, eseguire le operazioni seguenti:

  1. Aprire il file delle credenziali condivise con un editor di testo per recuperare l'URL dell'endpoint e il token.
  2. Apri Power BI Desktop.
  3. Nel menu Recupera dati cercare Condivisione delta.
  4. Selezionare il connettore e fare clic su Connessione.
  5. Immettere l'URL dell'endpoint copiato dal file delle credenziali nel campo URL server di condivisione delta.
  6. Facoltativamente, nella scheda Opzioni avanzate impostare un limite di righe per il numero massimo di righe che è possibile scaricare. Questa opzione è impostata su 1 milione di righe per impostazione predefinita.
  7. Fare clic su OK.
  8. Per Autenticazione copiare il token recuperato dal file delle credenziali in Bearer Token.
  9. Fare clic su Connetti.

Limitazioni del connettore di condivisione Delta di Power BI

Il Connessione or di condivisione Delta di Power BI presenta le limitazioni seguenti:

  • I dati caricati dal connettore devono rientrare nella memoria del computer. A tale scopo, il connettore limita il numero di righe importate al limite di righe impostato nella scheda Opzioni avanzate in Power BI Desktop.

Richiedere una nuova credenziale

Se l'URL di attivazione delle credenziali o le credenziali scaricate vengono perse, danneggiate o compromesse oppure le credenziali scadono senza che il provider invii una nuova credenziale, contattare il provider per richiedere una nuova credenziale.