Formato delta in Azure Data Factory

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .

Questo articolo illustra come copiare dati da e verso un delta lake archiviato in Azure Data Lake Store Gen2 o Archiviazione BLOB di Azure usando il formato delta. Questo connettore è disponibile come set di dati inline nei flussi di dati di mapping sia come origine che come sink.

Proprietà del flusso di dati per mapping

Questo connettore è disponibile come set di dati inline nei flussi di dati di mapping sia come origine che come sink.

Proprietà di origine

Nella tabella seguente sono elencate le proprietà supportate da un'origine differenziale. È possibile modificare queste proprietà nella scheda Opzioni origine.

Nome Descrizione Richiesto Valori consentiti Proprietà script flusso di dati
Formatta Il formato deve essere delta yes delta format
File system Contenitore/file system del delta lake yes String fileSystem
Percorso della cartella Directory del delta lake yes String folderPath
Tipo di compressione Tipo di compressione della tabella delta no bzip2
gzip
deflate
ZipDeflate
snappy
lz4
compressionType
Compression level Scegliere se la compressione viene completata il più rapidamente possibile o se il file risultante deve essere compresso in modo ottimale. obbligatorio se compressedType è specificato. Optimal oppure Fastest Compressionlevel
Tempo di viaggio Scegliere se eseguire una query su uno snapshot precedente di una tabella delta no Query per timestamp: Timestamp
Query per versione: Integer
timestampAsOf
versionAsOf
Consenti nessun file trovato Se true, non viene generato un errore se non vengono trovati file no true oppure false ignoreNoFilesFound

Importa schema

Delta è disponibile solo come set di dati inline e, per impostazione predefinita, non ha uno schema associato. Per ottenere i metadati della colonna, fare clic sul pulsante Importa schema nella scheda Proiezione . In questo modo è possibile fare riferimento ai nomi delle colonne e ai tipi di dati specificati dal corpus. Per importare lo schema, una sessione di debug del flusso di dati deve essere attiva ed è necessario disporre di un file di definizione di entità CDM esistente a cui puntare.

Esempio di script di origine Delta

source(output(movieId as integer,
            title as string,
            releaseDate as date,
            rated as boolean,
            screenedOn as timestamp,
            ticketPrice as decimal(10,2)
            ),
    store: 'local',
    format: 'delta',
    versionAsOf: 0,
    allowSchemaDrift: false,
    folderPath: $tempPath + '/delta'
  ) ~> movies

Proprietà sink

Nella tabella seguente sono elencate le proprietà supportate da un sink differenziale. È possibile modificare queste proprietà nella scheda Impostazioni.

Nome Descrizione Richiesto Valori consentiti Proprietà script flusso di dati
Formatta Il formato deve essere delta yes delta format
File system Contenitore/file system del delta lake yes String fileSystem
Percorso della cartella Directory del delta lake yes String folderPath
Tipo di compressione Tipo di compressione della tabella delta no bzip2
gzip
deflate
ZipDeflate
snappy
lz4
TarGZip
tar
compressionType
Compression level Scegliere se la compressione viene completata il più rapidamente possibile o se il file risultante deve essere compresso in modo ottimale. obbligatorio se compressedType è specificato. Optimal oppure Fastest Compressionlevel
Vacuum Elimina i file precedenti alla durata specificata che non è più rilevante per la versione corrente della tabella. Quando viene specificato un valore pari o inferiore a 0, l'operazione vacuum non viene eseguita. yes Intero Vuoto
azione Tabella Indica a Azure Data Factory cosa fare con la tabella Delta di destinazione nel sink. È possibile lasciarlo così come sono e accodare nuove righe, sovrascrivere la definizione e i dati della tabella esistenti con nuovi metadati e dati oppure mantenere la struttura di tabella esistente, ma prima troncare tutte le righe, quindi inserire le nuove righe. no Nessuno, Truncate, Overwrite deltaTruncate, sovrascrittura
Metodo di aggiornamento Quando si seleziona "Consenti inserimento" da solo o quando si scrive in una nuova tabella differenziale, la destinazione riceve tutte le righe in ingresso indipendentemente dal set di criteri riga. Se i dati contengono righe di altri criteri di riga, è necessario escluderli usando una trasformazione filtro precedente.

Quando vengono selezionati tutti i metodi Update, viene eseguita un'operazione Merge, in cui le righe vengono inserite/eliminate/aggiornate in base ai criteri di riga impostati utilizzando una trasformazione Alter Row precedente.
yes true oppure false Inseribile
deletable
aggiornabile
Aggiornabile
Scrittura ottimizzata Ottenere una velocità effettiva più elevata per l'operazione di scrittura tramite l'ottimizzazione dello shuffle interno negli executor Spark. Di conseguenza, è possibile notare un minor numero di partizioni e file con dimensioni maggiori no true oppure false optimizedWrite: true
Compatta automatica Al termine di un'operazione di scrittura, Spark eseguirà automaticamente il OPTIMIZE comando per riorganizionare i dati, ottenendo così più partizioni, se necessario, per migliorare le prestazioni di lettura in futuro no true oppure false autoCompact: true

Esempio di script sink delta

Lo script del flusso di dati associato è:

moviesAltered sink(
          input(movieId as integer,
                title as string
            ),
           mapColumn(
                movieId,
                title
            ),
           insertable: true,
           updateable: true,
           deletable: true,
           upsertable: false,
           keys: ['movieId'],
            store: 'local',
           format: 'delta',
           vacuum: 180,
           folderPath: $tempPath + '/delta'
           ) ~> movieDB

Sink differenziale con eliminazione delle partizioni

Con questa opzione in Update method above (ad esempio update/upsert/delete), è possibile limitare il numero di partizioni ispezionate. Solo le partizioni che soddisfano questa condizione vengono recuperate dall'archivio di destinazione. È possibile specificare un set fisso di valori che una colonna di partizione può richiedere.

Screenshot of partition pruning options are available to limit the inspection.

Esempio di script sink delta con eliminazione della partizione

Di seguito è riportato uno script di esempio.

DerivedColumn1 sink( 
      input(movieId as integer,
            title as string
           ), 
      allowSchemaDrift: true,
      validateSchema: false,
      format: 'delta',
      container: 'deltaContainer',
      folderPath: 'deltaPath',
      mergeSchema: false,
      autoCompact: false,
      optimizedWrite: false,
      vacuum: 0,
      deletable:false,
      insertable:true,
      updateable:true,
      upsertable:false,
      keys:['movieId'],
      pruneCondition:['part_col' -> ([5, 8])],
      skipDuplicateMapInputs: true,
      skipDuplicateMapOutputs: true) ~> sink2
 

Delta leggerà solo 2 partizioni in cui part_col == 5 e 8 dall'archivio delta di destinazione anziché da tutte le partizioni. part_col è una colonna in base alla quale i dati differenziali di destinazione sono partizionati. Non è necessario che sia presente nei dati di origine.

Opzioni di ottimizzazione sink delta

Nella scheda Impostazioni sono disponibili altre tre opzioni per ottimizzare la trasformazione del sink differenziale.

  • Quando l'opzione Merge schema è abilitata, consente l'evoluzione dello schema, ovvero tutte le colonne presenti nel flusso in ingresso corrente ma non nella tabella Delta di destinazione vengono aggiunte automaticamente al relativo schema. Questa opzione è supportata in tutti i metodi di aggiornamento.

  • Quando la compattazione automatica è abilitata, dopo una singola scrittura, la trasformazione controlla se i file possono essere ulteriormente compattati ed esegue un processo OPTIMIZE rapido (con dimensioni di file da 128 MB invece di 1 GB) per compattare ulteriormente i file per le partizioni con il maggior numero di file di piccole dimensioni. La compattazione automatica consente di unire un numero elevato di file di piccole dimensioni in un numero minore di file di grandi dimensioni. La compattazione automatica viene avviata solo quando sono presenti almeno 50 file. Una volta eseguita un'operazione di compattazione, crea una nuova versione della tabella e scrive un nuovo file contenente i dati di diversi file precedenti in un formato compresso compatto.

  • Quando l'opzione Ottimizza scrittura è abilitata, la trasformazione sink ottimizza in modo dinamico le dimensioni delle partizioni in base ai dati effettivi tentando di scrivere file di 128 MB per ogni partizione di tabella. Si tratta di una dimensione approssimativa e può variare a seconda delle caratteristiche del set di dati. Le scritture ottimizzate migliorano l'efficienza complessiva delle scritture e delle letture successive. Organizza le partizioni in modo che le prestazioni delle letture successive migliorino.

Suggerimento

Il processo di scrittura ottimizzato rallenterà il processo ETL complessivo perché il sink eseguirà il comando Spark Delta Lake Optimize dopo l'elaborazione dei dati. È consigliabile usare temporaneamente la scrittura ottimizzata. Ad esempio, se si dispone di una pipeline di dati oraria, eseguire un flusso di dati con ottimizzazione scrittura giornaliera.

Limitazioni note

Quando si scrive in un sink differenziale, esiste una limitazione nota in cui il numero di righe scritte non verrà visualizzato nell'output di monitoraggio.

  • Creare una trasformazione di origine nel flusso di dati di mapping.
  • Creare una trasformazione sink nel flusso di dati di mapping.
  • Creare una trasformazione alter row per contrassegnare le righe come inserimento, aggiornamento, upsert o eliminazione.