Condividi tramite


Copiare e trasformare i dati in Snowflake usando Azure Data Factory o Azure Synapse Analytics

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!

Questo articolo illustra come usare l'attività di copia nelle pipeline di Azure Data Factory e Azure Synapse per copiare dati da e in Snowflake e usare Flusso di dati per trasformare i dati in Snowflake. Per altre informazioni, vedere l'articolo introduttivo per Data Factory o Azure Synapse Analytics.

Importante

Il nuovo connettore Snowflake offre un supporto nativo migliorato per Snowflake. Se si usa il connettore Snowflake legacy nella soluzione, aggiornare il connettore Snowflake prima di 31 ottobre 2024. Per informazioni dettagliate sulla differenza tra la versione legacy e quella più recente, vedere questa sezione.

Funzionalità supportate

Questo connettore Snowflake è supportato per le funzionalità seguenti:

Funzionalità supportate IR
Attività di copia (origine/sink) (1) (2)
Flusso di dati per mapping (origine/sink) (1)
Attività Lookup (1) (2)
Attività script (1) (2)

① Azure Integration Runtime ② Runtime di integrazione self-hosted

Per l'attività di copia, questo connettore Snowflake supporta le funzioni seguenti:

  • Copiare dati da Snowflake che usa il comando COPIA di Snowflake in [posizione] per ottenere prestazioni ottimali.
  • Copiare i dati in Snowflake che sfrutta il comando COPIA di Snowflake in [tabella] per ottenere prestazioni ottimali. Supporta Snowflake in Azure.
  • Se è necessario un proxy per connettersi a Snowflake da un runtime di integrazione self-hosted, è necessario configurare le variabili di ambiente per HTTP_PROXY e HTTPS_PROXY nell'host del runtime di integrazione.

Prerequisiti

Se l'archivio dati si trova all'interno di una rete locale, una rete virtuale di Azure o un cloud privato virtuale di Amazon, è necessario configurare un runtime di integrazione self-hosted per connettersi. Assicurarsi di aggiungere gli indirizzi IP usati dal runtime di integrazione self-hosted all'elenco consentito.

Se l'archivio dati è un servizio dati del cloud gestito, è possibile usare Azure Integration Runtime. Se l'accesso è limitato solo agli indirizzi IP approvati nelle regole del firewall, è possibile aggiungere gli IP di Azure Integration Runtime nell'elenco di quelli consentiti.

L'account Snowflake usato per Source o Sink deve avere l'accesso necessario USAGE per il database e l'accesso in lettura/scrittura allo schema e alle tabelle/viste al suo interno. Inoltre, deve avere anche CREATE STAGE sullo schema per poter creare la fase esterna con l'URI di firma di accesso condiviso.

I valori delle proprietà account seguenti devono essere impostati

Proprietà Descrizione Richiesto Valore predefinito
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION Specifica se richiedere un oggetto di integrazione dell'archiviazione come credenziali cloud durante la creazione di una fase esterna denominata (usando CREA FASE) per accedere a un percorso di archiviazione cloud privato. FALSE FALSE
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION Specifica se richiedere l'uso di una fase esterna denominata che fa riferimento a un oggetto di integrazione dell'archiviazione come credenziali cloud durante il caricamento o lo scaricamento di dati in un percorso di archiviazione cloud privato. FALSE FALSE

Per altre informazioni sui meccanismi di sicurezza di rete e sulle opzioni supportate da Data Factory, vedere strategie di accesso ai dati.

Operazioni preliminari

Per eseguire l'attività di copia con una pipeline, è possibile usare uno degli strumenti o SDK seguenti:

Creare un servizio collegato a Snowflake usando l'interfaccia utente

Usare la procedura seguente per creare un servizio collegato a Snowflake nell'interfaccia utente del portale di Azure.

  1. Passare alla scheda Gestisci nell'area di lavoro di Azure Data Factory o Synapse e selezionare Servizi collegati, quindi fare clic su Nuovo:

  2. Cercare Snowflake e selezionare il connettore Snowflake.

    Screenshot del connettore Snowflake.

  3. Configurare i dettagli del servizio, testare la connessione e creare il nuovo servizio collegato.

    Screenshot della configurazione del servizio collegato per Snowflake.

Dettagli di configurazione del connettore

Le sezioni seguenti forniscono informazioni dettagliate sulle proprietà che definiscono entità specifiche di un connettore Snowflake.

Proprietà del servizio collegato

Queste proprietà generiche sono supportate per il servizio collegato Snowflake:

Proprietà Descrizione Richiesto
type La proprietà type deve essere impostata su SnowflakeV2.
accountIdentifier Nome dell'account insieme alla relativa organizzazione. Ad esempio, myorg-account123.
database Database predefinito utilizzato per la sessione dopo la connessione.
warehouse Il warehouse virtuale predefinito usato per la sessione dopo la connessione.
authenticationType Tipo di autenticazione usato per connettersi al servizio Snowflake. I valori consentiti sono: Basic (Default) e KeyPair. Per altre proprietà ed esempi su ogni valore, vedere le sezioni corrispondenti di seguito. No
ruolo Ruolo di sicurezza predefinito usato per la sessione dopo la connessione. No
connectVia Il runtime di integrazione usato per connettersi all'archivio dati. È possibile usare Azure Integration Runtime o un runtime di integrazione self-hosted (se l'archivio dati si trova in una rete privata). Se non specificato, viene usato il runtime di integrazione di Azure predefinito. No

Il connettore Snowflake supporta i tipi di autenticazione seguenti. Per informazioni dettagliate, vedere le sezioni corrispondenti.

Autenticazione di base

Per usare l’autenticazione Basic, oltre alle proprietà generiche descritte nella sezione precedente, specificare le proprietà seguenti:

Proprietà Descrizione Richiesto
utente Nome di accesso per l'utente Snowflake.
password Password per l'utente Snowflake. Contrassegnare questo campo come tipo SecureString per archiviarlo in modo sicuro. È anche possibile fare riferimento a un segreto archiviato in Azure Key Vault.

Esempio:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Password in Azure Key Vault:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Nota

I flussi di dati di mapping supportano solo l'autenticazione di base.

Autenticazione della coppia di chiavi SSH

Per usare l'autenticazione della coppia di chiavi, è necessario configurare e creare un utente di autenticazione della coppia di chiavi in Snowflake facendo riferimento all'autenticazione della coppia di chiavi e alla rotazione della coppia di chiavi. Successivamente, prendere nota della chiave privata e della passphrase (facoltativa), che si usa per definire il servizio collegato.

Oltre alle proprietà generiche descritte nella sezione precedente, specificare le proprietà seguenti:

Proprietà Descrizione Richiesto
utente Nome di accesso per l'utente Snowflake.
privateKey Chiave privata usata per l'autenticazione della coppia di chiavi.

Per assicurarsi che la chiave privata sia valida quando viene inviata ad Azure Data Factory e considerando che il file privateKey include caratteri di nuova riga (\n), è essenziale formattare correttamente il contenuto privateKey nel formato letterale stringa. Questo processo comporta l'aggiunta esplicita di \n a ogni nuova riga.
privateKeyPassphrase Passphrase usata per decrittografare la chiave privata, se crittografata. No

Esempio:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "KeyPair",
            "user": "<username>",
            "privateKey": {
                "type": "SecureString",
                "value": "<privateKey>"
            },
            "privateKeyPassphrase": { 
                "type": "SecureString",
                "value": "<privateKeyPassphrase>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Proprietà del set di dati

Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione dei set di dati, vedere l'articolo Set di dati.

Per il set di dati Snowflake sono supportate le proprietà seguenti.

Proprietà Descrizione Richiesto
type La proprietà type del set di dati deve essere impostata su SnowflakeV2Table.
schema Nome dello schema. Si noti che il nome dello schema fa distinzione tra maiuscole e minuscole. No per l'origine, Sì per il sink
table Nome della tabella/vista. Si noti che il nome della tabella fa distinzione tra maiuscole e minuscole. No per l'origine, Sì per il sink

Esempio:

{
    "name": "SnowflakeV2Dataset",
    "properties": {
        "type": "SnowflakeV2Table",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

Proprietà dell'attività di copia

Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione delle attività, vedere l'articolo sulle pipeline. In questa sezione viene fornito un elenco delle proprietà supportate dall'origine e dal sink Snowflake.

Snowflake come fonte

Il connettore Snowflake usa il comando COPIA di Snowflake in [posizione] per ottenere prestazioni ottimali.

Se l'archivio dati sink e il formato sono supportati in modo nativo dal comando Snowflake COPY, è possibile usare l'attività Copy per copiare direttamente da Snowflake al sink. Per informazioni dettagliate, vedere Copia diretta da Snowflake. In caso contrario, usare la copia a fasi predefinita da Snowflake.

Per copiare dati da Snowflake, nella sezione Origine attività di copia sono supportate le proprietà seguenti.

Proprietà Descrizione Richiesto
type La proprietà Tipo dell'origine dell'attività di copia deve essere impostata su SnowflakeV2Source.
query Specifica la query SQL per leggere i dati da Snowflake. Se i nomi dello schema, della tabella e delle colonne contengono lettere minuscole, virgolette l'identificatore dell'oggetto nella query, ad esempio select * from "schema"."myTable".
L'esecuzione di stored procedure non è supportata.
No
exportSettings Impostazioni avanzate usate per recuperare i dati da Snowflake. È possibile configurare quelli supportati dal comando COPY nel quale il servizio passerà quando si richiama l'istruzione.
In exportSettings:
type Tipo di comando di esportazione, impostato su SnowflakeExportCopyCommand.
storageIntegration Specificare il nome dell'integrazione di archiviazione creata in Snowflake. Per i passaggi preliminari dell'uso dell'integrazione dell'archiviazione, vedere Configurazione di un'integrazione dell'archiviazione Snowflake. No
additionalCopyOptions Opzioni di copia aggiuntive, fornite come dizionario di coppie chiave-valore. Esempi: MAX_FILE_SIZE, OVERWRITE. Per altre informazioni, vedere Opzioni di copia Snowflake. No
additionalFormatOptions Opzioni di formato di file aggiuntive fornite al comando COPY come dizionario di coppie chiave-valore. Esempi: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Per altre informazioni, vedere Opzioni del tipo di formato Snowflake. No

Nota

Assicurarsi di disporre dell'autorizzazione per eseguire il comando seguente e accedere allo schema INFORMATION_SCHEMA e alla tabella COLUMNS.

  • COPY INTO <location>

Copia diretta da Snowflake

Se l'archivio dati sink e il formato soddisfano i criteri descritti in questa sezione, è possibile usare l'attività Copia per copiare direttamente da Snowflake al sink. Il servizio controlla le impostazioni e, se i criteri seguenti non vengono soddisfatti, l'esecuzione dell'attività di copia non riesce:

  • Quando si specifica storageIntegration nell'origine:

    L'archivio dati sink è l'archivio BLOB di Azure a cui si fa riferimento nella fase esterna in Snowflake. Prima di copiare i dati, è necessario completare i passaggi seguenti:

    1. Creare un servizio collegato archiviazione BLOB di Azure per l'archiviazione BLOB di Azure sink con qualsiasi tipo di autenticazione supportato.

    2. Concedere almeno il ruolo collaboratore ai dati dei BLOB di archiviazione all'entità servizio Snowflake nell'archiviazione BLOB di Azure sink controllo di accesso (IAM).

  • Quando non si specifica storageIntegration nell'origine:

    Il servizio collegato sink è Archiviazione BLOB di Azure con autenticazione della firma di accesso condiviso. Se si vuole copiare direttamente i dati in Azure Data Lake Storage Gen2 nel formato supportato seguente, è possibile creare un servizio collegato di Archiviazione BLOB di Azure con l'autenticazione SAS per l'account Azure Data Lake Storage Gen2, per evitare di usare la copia di staging da Snowflake.

  • Il formato di dati sink è Parquet, testo delimitatoo JSON con le configurazioni seguenti:

    • Per il formato Parquet , il codec di compressione è Nessuno, Snappy o Lzo.
    • Per il formato testo delimitato:
      • rowDelimiter è \r\n, o qualsiasi singolo carattere.
      • compression può essere nessuna compressione, gzip, bzip2 o deflate.
      • encodingName è impostato sul valore predefinito o su utf-8.
      • quoteChar è virgolette doppie, virgolette singole o stringa vuota (senza virgolette).
    • Per il formato JSON, la copia diretta supporta solo il caso in cui il risultato della tabella Snowflake o della query di origine abbia solo una singola colonna e il tipo di dati di questa colonna sia VARIANT, OBJECT o ARRAY.
      • compression può essere nessuna compressione, gzip, bzip2 o deflate.
      • encodingName è impostato sul valore predefinito o su utf-8.
      • filePattern nel sink dell'attività di copia viene lasciato come predefinito o impostato su setOfObjects.
  • Nell'origine dell'attività di copia additionalColumns non è specificato.

  • Il mapping delle colonne non è specificato.

Esempio:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",
                "query": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Copia di staging da Snowflake

Quando l'archivio dati sink o il formato non è compatibile in modo nativo con il comando Snowflake COPIA, come indicato nell'ultima sezione, abilitare la copia a fasi predefinita usando un'istanza di archiviazione BLOB di Azure temporanea. La funzionalità copia di staging assicura inoltre una migliore velocità effettiva. Il servizio esporta i dati da Snowflake nell'archiviazione di staging, quindi copia i dati nel sink e infine pulisce i dati temporanei dall'archiviazione di staging. Per informazioni dettagliate sulla copia dei dati tramite una gestione temporanea, vedere Copia di staging.

Per usare questa funzionalità, creare un servizio collegato di archiviazione BLOB di Azure che fa riferimento all'account di archiviazione di Azure come staging provvisorio. Specificare quindi le proprietà enableStaging e stagingSettings nell'attività Copia.

  • Quando si specifica storageIntegration nell'origine, l'archiviazione BLOB di Azure temporanea di staging deve essere quella a cui si fa riferimento nella fase esterna in Snowflake. Assicurarsi di creare un servizio collegato archiviazione BLOB di Azure con qualsiasi autenticazione supportata e concedere almeno un ruolo collaboratore ai dati dei BLOB di archiviazione all'entità servizio Snowflake nell'entità servizio di gestione temporanea di Archiviazione BLOB di Azure controllo di accesso (IAM).

  • Quando non si specifica storageIntegration nell'origine, il servizio collegato Archiviazione BLOB di Azure di staging deve usare l'autenticazione della firma di accesso condiviso, come richiesto dal comando Snowflake COPY. Assicurarsi di concedere l'autorizzazione di accesso appropriata a Snowflake nell'archiviazione BLOB di Azure di staging. Per altre informazioni, vedere questo articolo.

Esempio:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",               
                "query": "SELECT * FROM MyTable",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"                    
                }
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Quando si esegue una copia di staging da Snowflake, è fondamentale impostare il comportamento di copia sink su Unisci file. Questa impostazione garantisce che tutti i file partizionati vengano gestiti e uniti correttamente, impedendo che si verifichi un problema dovuto alla sola copia dell'ultimo file partizionato.

Configurazione di esempio

{
    "type": "Copy",
    "source": {
        "type": "SnowflakeSource",
        "query": "SELECT * FROM my_table"
    },
    "sink": {
        "type": "AzureBlobStorage",
        "copyBehavior": "MergeFiles"
    }
}

Nota

Se non si imposta il comportamento di copia sink su Unisci file, è possibile che venga copiato solo l'ultimo file partizionato.

Snowflake come sink

Il connettore Snowflake usa il comando COPIA in [tabella] di Snowflake per ottenere prestazioni ottimali. Supporta la scrittura di dati in Snowflake in Azure.

Se l'archivio dati e il formato di origine sono supportati in modo nativo dal comando COPIA di Snowflake, è possibile usare l'attività Copia per copiare direttamente dall'origine a Snowflake. Per informazioni dettagliate, vedere Copia diretta in Snowflake. In caso contrario, usare la copia a fasi predefinita in Snowflake.

Per copiare i dati in Snowflake, nella sezione sink dell’attività di copia sono supportate le proprietà seguenti.

Proprietà Descrizione Richiesto
type Proprietà type del sink dell'attività di copia, impostata su SnowflakeV2Sink.
preCopyScript Specificare una query SQL per l'attività di copia da eseguire prima di scrivere i dati in Snowflake in ogni esecuzione. Usare questa proprietà per pulire i dati precaricati. No
importSettings Impostazioni avanzate usate per scrivere dati in Snowflake. È possibile configurare quelli supportati dal comando COPY nel quale il servizio passerà quando si richiama l'istruzione.
In importSettings:
type Tipo di comando di importazione, impostato su SnowflakeImportCopyCommand.
storageIntegration Specificare il nome dell'integrazione di archiviazione creata in Snowflake. Per i passaggi preliminari dell'uso dell'integrazione dell'archiviazione, vedere Configurazione di un'integrazione dell'archiviazione Snowflake. No
additionalCopyOptions Opzioni di copia aggiuntive, fornite come dizionario di coppie chiave-valore. Esempi: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Per altre informazioni, vedere Opzioni di copia Snowflake. No
additionalFormatOptions Opzioni di formato di file aggiuntive fornite al comando COPIA, fornite come dizionario di coppie chiave-valore. Esempi: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Per altre informazioni, vedere Opzioni del tipo di formato Snowflake. No

Nota

Assicurarsi di disporre dell'autorizzazione per eseguire il comando seguente e accedere allo schema INFORMATION_SCHEMA e alla tabella COLUMNS.

  • SELECT CURRENT_REGION()
  • COPY INTO <table>
  • SHOW REGIONS
  • CREATE OR REPLACE STAGE
  • DROP STAGE

Copia diretta in Snowflake

Se l'archivio dati di origine e il formato soddisfano i criteri descritti in questa sezione, è possibile usare l'attività Copia per copiare direttamente dall'origine a Snowflake. Il servizio controlla le impostazioni e, se i criteri seguenti non vengono soddisfatti, l'esecuzione dell'attività di copia non riesce:

  • Quando si specifica storageIntegration nel sink:

    L'archivio dati di origine è l'archivio BLOB di Azure a cui si fa riferimento nella fase esterna in Snowflake. Prima di copiare i dati, è necessario completare i passaggi seguenti:

    1. Creare un servizio collegato archiviazione BLOB di Azure per l'archiviazione BLOB di Azure di origine con qualsiasi tipo di autenticazione supportato.

    2. Concedere almeno il ruolo lettore di dati BLOB di archiviazione all'entità servizio Snowflake nell'archivio BLOB di Azure di origine controllo di accesso (IAM).

  • Quando non si specifica storageIntegration nel sink:

    Il servizio collegato di origine è Archiviazione BLOB di Azure con autenticazione della firma di accesso condiviso. Se si vuole copiare direttamente i dati da Azure Data Lake Storage Gen2 nel formato supportato seguente, è possibile creare un servizio collegato di Archiviazione BLOB di Azure con l'autenticazione di firma di accesso condiviso per l'account Azure Data Lake Storage Gen2, per evitare di usare la copia di staging in Snowflake.

  • Il formato dei dati di origine è Parquet, ORC o Testo delimitato, con le configurazioni seguenti:

    • Per il formato Parquet, il codec di compressione è Nessuno o Snappy.

    • Per il formato testo delimitato:

      • rowDelimiter è \r\n, o qualsiasi singolo carattere. Se il delimitatore di riga non è "\r\n", firstRowAsHeader deve essere falsee skipLineCount non è specificato.
      • compression può essere nessuna compressione, gzip, bzip2 o deflate.
      • encodingName viene lasciato come predefinito o impostato su "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2 " ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8855", "ISO-885 9-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".
      • quoteChar è virgolette doppie, virgolette singole o stringa vuota (senza virgolette).
    • Per il formato JSON, la copia diretta supporta solo il caso in cui la tabella Snowflake sink ha solo una colonna e il tipo di dati di questa colonna è VARIANT, OBJECT o ARRAY.

      • compression può essere nessuna compressione, gzip, bzip2 o deflate.
      • encodingName è impostato sul valore predefinito o su utf-8.
      • Il mapping delle colonne non è specificato.
  • Nell'origine dell'attività Copia:

    • additionalColumns non è specificato.
    • Se l'origine è una cartella, recursive è impostata su true.
    • prefix, modifiedDateTimeStart, modifiedDateTimeEnd e enablePartitionDiscovery non sono specificati.

Esempio:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE"
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            }
        }
    }
]

Copia di staging in Snowflake

Quando l'archivio dati o il formato di origine non sono compatibili in modo nativo con il comando Snowflake COPY, come indicato nell'ultima sezione, abilitare la copia a fasi predefinita usando un'istanza di archiviazione Blob di Azure temporanea. La funzionalità copia di staging assicura inoltre una migliore velocità effettiva. Il servizio converte automaticamente i dati in modo da soddisfare i requisiti di formato dei dati di Snowflake. Richiama quindi il comando COPIA per caricare i dati in Snowflake. Infine, pulisce i dati temporanei dall'archiviazione BLOB. Per informazioni dettagliate sulla copia dei dati tramite una gestione temporanea, vedere Copia di staging.

Per usare questa funzionalità, creare un servizio collegato di archiviazione BLOB di Azure che fa riferimento all'account di archiviazione di Azure come staging provvisorio. Specificare quindi le proprietà enableStaging e stagingSettings nell'attività Copia.

  • Quando si specifica storageIntegration nel sink, l'archiviazione BLOB di Azure temporanea di staging deve essere quella a cui si fa riferimento nella fase esterna in Snowflake. Assicurarsi di creare un servizio collegato archiviazione BLOB di Azure con qualsiasi autenticazione supportata e concedere almeno un ruolo lettore ai dati dei BLOB di archiviazione all'entità servizio Snowflake nell'entità servizio di gestione temporanea di Archiviazione BLOB di Azure controllo di accesso (IAM).

  • Quando non si specifica storageIntegration nel sink, il servizio collegato Archiviazione BLOB di Azure di staging deve usare l'autenticazione della firma di accesso condiviso come richiesto dal comando Snowflake COPIA.

Esempio:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Proprietà del flusso di dati per mapping

Quando si trasformano i dati nel flusso di dati di mapping, è possibile leggere e scrivere nelle tabelle in Snowflake. Per altre informazioni, vedere la trasformazione origine e la trasformazione sink nei flussi di dati per mapping. È possibile scegliere di usare un set di dati Snowflake o un set di dati inline come tipo di origine e sink.

Trasformazione origine

Nella tabella seguente sono elencate le proprietà supportate dall'origine di Snowflake. È possibile modificare queste proprietà nella scheda Opzioni origine. Il connettore usa trasferimento dati interno di Snowflake.

Nome Descrizione Richiesto Valori consentiti Proprietà script del flusso di dati
Tabella Se si seleziona Tabella come input, il flusso di dati recupera tutti i dati dalla tabella specificata nel set di dati Snowflake o nelle opzioni di origine quando si usa il set di dati inline. No String (solo per set di dati inline)
tableName
schemaName
Query Se si seleziona Query come input, immettere una query per recuperare i dati da Snowflake. Questa impostazione esegue l'override di qualsiasi tabella scelta nel set di dati.
Se i nomi dello schema, della tabella e delle colonne contengono lettere minuscole, virgolette l'identificatore dell'oggetto nella query, ad esempio select * from "schema"."myTable".
No String query
Abilitare l'estrazione incrementale (anteprima) Usare questa opzione per indicare ad ADF di elaborare solo le righe modificate dall'ultima esecuzione della pipeline. No Booleano enableCdc
Colonna incrementale Quando si usa la funzionalità di estrazione incrementale, è necessario scegliere la colonna data/ora/numerica da usare come limite nella tabella di origine. No String waterMarkColumn
Abilitare Rilevamento modifiche di Snowflake (anteprima) Questa opzione consente a Azure Data Factory di sfruttare la tecnologia Snowflake Change data capture per elaborare solo i dati differenziali dall'esecuzione precedente della pipeline. Questa opzione carica automaticamente i dati differenziali con operazioni di inserimento, aggiornamento ed eliminazione di righe senza richiedere alcuna colonna incrementale. No Booleano enableNativeCdc
Modifiche nette Quando si usa il rilevamento delle modifiche Snowflake, è possibile usare questa opzione per ottenere le righe modificate deduplicate o le modifiche complete. Le righe modificate deduplicate mostreranno solo le versioni più recenti delle righe modificate da un determinato punto nel tempo, mentre le modifiche complete mostreranno tutte le versioni di ogni riga modificata, incluse quelle eliminate o aggiornate. Ad esempio, se si aggiorna una riga, verrà visualizzata una versione di eliminazione e una versione di inserimento in modifiche complete, ma solo la versione di inserimento nelle righe modificate deduplicate. A seconda del caso d'uso, è possibile scegliere l'opzione più adatta alle proprie esigenze. L'opzione predefinita è false, il che significa modifiche complete. No Booleano netChanges
Includere colonne di sistema Quando si usa il rilevamento delle modifiche Snowflake, è possibile usare l'opzione systemColumns per controllare se le colonne del flusso di metadati fornite da Snowflake sono incluse o escluse nell'output del rilevamento delle modifiche. Per impostazione predefinita, systemColumns è impostato su true, ovvero le colonne del flusso di metadati sono incluse. È possibile impostare systemColumns su false se si desidera escluderli. No Booleano systemColumns
Iniziare a leggere dall'inizio L'impostazione di questa opzione con l'estrazione incrementale e il rilevamento delle modifiche indicherà ad Azure Data Factory di leggere tutte le righe alla prima esecuzione di una pipeline con estrazione incrementale attivata. No Booleano skipInitialLoad

Esempi di script di origine Snowflake

Quando si usa il set di dati Snowflake come tipo di origine, lo script del flusso di dati associato è:

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

Se si usa un set di dati inline, lo script del flusso di dati associato è:

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

Rilevamento modifiche native

Azure Data Factory supporta ora una funzionalità nativa in Snowflake nota come rilevamento delle modifiche, che comporta il rilevamento delle modifiche sotto forma di log. Questa funzionalità di Snowflake consente di tenere traccia delle modifiche apportate ai dati nel tempo, rendendola utile per il caricamento e il controllo incrementali dei dati. Per utilizzare questa funzionalità, quando si abilita Change Data Capture e si seleziona Rilevamento modifiche di Snowflake, viene creato un oggetto Stream per la tabella di origine che abilita il rilevamento delle modifiche nella tabella snowflake di origine. Successivamente, viene usata la clausola MODIFICHE nella query per recuperare solo i dati nuovi o aggiornati dalla tabella di origine. È anche consigliabile pianificare la pipeline in modo che le modifiche vengano utilizzate entro l'intervallo di tempo di conservazione dei dati impostato per la tabella di origine snowflake. In caso contrario, l'utente potrebbe vedere un comportamento incoerente nelle modifiche acquisite.

Trasformazione sink

Nella tabella seguente sono elencate le proprietà supportate dal sink di Snowflake. È possibile modificare queste proprietà nella scheda Impostazioni. Quando si usa il set di dati inline, verranno visualizzate impostazioni aggiuntive, che corrispondono alle proprietà descritte nella sezione proprietà del set di dati. Il connettore usa il trasferimento interno dei dati di Snowflake.

Nome Descrizione Richiesto Valori consentiti Proprietà script del flusso di dati
Metodo di aggiornamento Specificare le operazioni consentite nella destinazione di Snowflake.
Per operazioni di aggiornamento, upsert o eliminazione di righe, è necessaria una trasformazione Altera riga perché i tag siano applicati alle righe per queste azioni.
true oppure false deletable
insertable
updateable
upsertable
Colonne chiave Per le operazioni di aggiornamento, upsert ed eliminazione è necessario impostare una o più colonne chiave per determinare quale riga modificare. No Matrice keys
azione Tabella determina se ricreare o rimuovere tutte le righe dalla tabella di destinazione prima della scrittura.
- Nessuno: non verrà eseguita alcuna azione sulla tabella.
- Ricrea: la tabella verrà eliminata e ricreata. Questa opzione è obbligatoria se si crea una nuova tabella in modo dinamico.
- Tronca: verranno rimosse tutte le righe della tabella di destinazione.
No true oppure false recreate
truncate

Esempi di script sink Snowflake

Quando si usa il set di dati Snowflake come tipo di sink, lo script del flusso di dati associato è:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Se si usa un set di dati inline, lo script del flusso di dati associato è:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Ottimizzazione pushdown query

Impostando il livello di registrazione della pipeline su Nessuno, si esclude la trasmissione delle metriche di trasformazione intermedia, impedendo potenziali ostacoli alle ottimizzazioni spark e abilitando l'ottimizzazione pushdown delle query fornita da Snowflake. Questa ottimizzazione pushdown consente miglioramenti sostanziali delle prestazioni per tabelle Snowflake di grandi dimensioni con set di dati estesi.

Nota

Non sono supportate tabelle temporanee in Snowflake, perché sono locali per la sessione o per l'utente che li crea, rendendole inaccessibili ad altre sessioni e soggette a sovrascrivere come tabelle regolari da Snowflake. Anche se Snowflake offre tabelle temporanee come alternativa, accessibili a livello globale, richiedono l'eliminazione manuale, contraddicendo l'obiettivo principale dell'uso delle tabelle temp, evitando qualsiasi operazione di eliminazione nello schema di origine.

Proprietà dell'attività Lookup

Per altre informazioni sulle proprietà, vedere Attività di ricerca.

Aggiornare il servizio collegato Snowflake

Per aggiornare il servizio collegato Snowflake, è possibile eseguire un aggiornamento side-by-side o un aggiornamento sul posto.

Aggiornamento affiancato

Per eseguire un aggiornamento side-by-side, completare la procedura seguente:

  1. Creare un nuovo servizio collegato Snowflake e configurarlo facendo riferimento alle proprietà del servizio collegato.
  2. Creare un set di dati basato sul servizio collegato Snowflake appena creato.
  3. Sostituire il nuovo servizio collegato e il set di dati con quelli esistenti nelle pipeline che hanno come destinazione gli oggetti legacy.

Aggiornamento sul posto

Per eseguire un aggiornamento sul posto, è necessario modificare il payload del servizio collegato esistente.

  1. Aggiornare il tipo da "Snowflake" a "SnowflakeV2".

  2. Modificare il payload del servizio collegato dal formato legacy al nuovo modello. È possibile compilare ogni campo dall'interfaccia utente dopo aver modificato il tipo indicato in precedenza oppure aggiornare il payload direttamente tramite l'editor JSON. Per le proprietà di connessione supportate, vedere la sezione Proprietà del servizio collegato in questo articolo. Gli esempi seguenti illustrano le differenze nel payload per i connettori Snowflake legacy e nuovi:

    Payload JSON del connettore Snowflake legacy:

    { 
    

    "name": "Snowflake1", "type": "Microsoft.DataFactory/factory/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC>", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }


**New Snowflake connector JSON payload:**
```json
{ 
       "name": "Snowflake2", 
       "type": "Microsoft.DataFactory/factories/linkedservices", 
    "properties": { 
           "parameters": { 
             "schema": { 
                   "type": "string", 
                   "defaultValue": "PUBLIC" 
               } 
           }, 
        "annotations": [], 
           "type": "SnowflakeV2", 
           "typeProperties": { 
               "authenticationType": "Basic", 
               "accountIdentifier": "<FAKE_Account", 
               "user": "FAKE_USER", 
               "database": "FAKE_DB", 
            "warehouse": "FAKE_DW", 
               "encryptedCredential": "<placeholder>" 
           }, 
           "connectVia": { 
               "referenceName": "AutoResolveIntegrationRuntime", 
               "type": "IntegrationRuntimeReference" 
        } 
       } 
} 
  1. Aggiornare il set di dati per usare il nuovo servizio collegato. È possibile creare un nuovo set di dati in base al servizio collegato appena creato oppure aggiornare la proprietà type di un set di dati esistente da SnowflakeTable a SnowflakeV2Table.

Differenze tra Snowflake e Snowflake (legacy)

Il connettore Snowflake offre nuove funzionalità ed è compatibile con la maggior parte delle funzionalità del connettore Snowflake (legacy). La tabella seguente illustra le differenze di funzionalità tra Snowflake e Snowflake (legacy).

Snowflake Snowflake (legacy)
Supportare l'autenticazione della coppia di chiavi e di base. Supportare l'autenticazione di base.
I parametri script non sono attualmente supportati nell'attività Script. In alternativa, usare espressioni dinamiche per i parametri di script. Per altre informazioni, vedere Espressioni e funzioni in Azure Data Factory e Azure Synapse Analytics. Supportare i parametri di script nell'attività Script.
Supportare BigDecimal nell'attività Lookup. Il tipo NUMERO, come definito in Snowflake, verrà visualizzato come stringa nell'attività Lookup. BigDecimal non è supportato nell'attività Lookup.
La proprietà legacy connectionstring è deprecata a favore dei parametri obbligatori Account, Warehouse, Database, Schema, e Role Nel connettore Snowflake legacy, la proprietà connectionstring è stata usata per stabilire una connessione.

Per determinare la versione del connettore Snowflake usato nel servizio collegato Snowflake esistente, controllare la proprietà type. La versione legacy è identificata da "type": "Snowflake", mentre la versione V2 più recente è identificata da "type": "SnowflakeV2".

La versione V2 offre differenti miglioramenti rispetto alla versione legacy, tra cui:

Scalabilità automatica: regola automaticamente le risorse in base al carico del traffico.

Operazione con più zone di disponibilità: offre resilienza operando tra più zone di disponibilità.

Supporto IP statico: migliora la sicurezza consentendo l'uso di indirizzi IP statici.

Per un elenco degli archivi dati supportati come origini e sink dall'attività Copy, vedere Archivi dati e formati supportati.