Freigeben über


Daten in Snowflake mithilfe von Data Factory oder Azure Synapse Analytics kopieren und transformieren

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

In diesem Artikel wird beschrieben, wie Sie Daten mithilfe der Copy-Aktivität in Azure Data Factory- und Azure Synapse-Pipelines aus und in Snowflake kopieren sowie Daten mithilfe des Tools „Datenfluss“ in Snowflake transformieren. Weitere Informationen finden Sie im Einführungsartikel zu Data Factory oder Azure Synapse Analytics.

Wichtig

Der neue Snowflake-Connector bietet verbesserte native Snowflake-Unterstützung. Wenn Sie den Legacy-Snowflake-Connector in Ihrer Lösung verwenden, sollten Sie möglichst bald ein Upgrade Ihres Snowflake-Connectors durchführen. Ausführliche Informationen zum Unterschied zwischen der Legacy- und der neuesten Version finden Sie in diesem Abschnitt .

Unterstützte Funktionen

Für den Snowflake-Connector werden die folgenden Funktionen unterstützt:

Unterstützte Funktionen IR
Kopieraktivität (Quelle/Senke) ① ②
Zuordnungsdatenfluss (Quelle/Senke)
Lookup-Aktivität ① ②
Skriptaktivität ① ②

① Azure Integration Runtime ② Selbstgehostete Integration Runtime

Für die Kopieraktivität unterstützt dieser Snowflake-Connector die folgenden Funktionen:

  • Kopieren von Daten aus Snowflake unter Verwendung des Snowflake-Befehls COPY into [Speicherort], um optimale Leistung zu erzielen.
  • Kopieren von Daten in Snowflake unter Verwendung des Snowflake-Befehls COPY into [Tabelle], um optimale Leistung zu erzielen. Snowflake in Azure wird unterstützt.
  • Wenn für einen Proxy eine Verbindung mit Snowflake über eine selbstgehostete Integration Runtime erforderlich ist, müssen Sie die Umgebungsvariablen HTTP_PROXY und HTTPS_PROXY auf dem Integration Runtime-Host konfigurieren.

Voraussetzungen

Wenn sich Ihr Datenspeicher in einem lokalen Netzwerk, in einem virtuellen Azure-Netzwerk oder in einer virtuellen privaten Amazon-Cloud befindet, müssen Sie eine selbstgehostete Integration Runtime konfigurieren, um eine Verbindung herzustellen. Die von der selbstgehosteten Integration Runtime verwendeten IP-Adressen müssen der Positivliste hinzugefügt werden.

Handelt es sich bei Ihrem Datenspeicher um einen verwalteten Clouddatendienst, können Sie die Azure Integration Runtime verwenden. Ist der Zugriff auf IP-Adressen beschränkt, die in den Firewallregeln genehmigt sind, können Sie Azure Integration Runtime-IP-Adressen zur Positivliste hinzufügen.

Das für „Quelle“ oder „Senke“ verwendete Snowflake-Konto benötigt USAGE-Zugriff auf die Datenbank und Lese-/Schreibzugriff auf das Schema und die zugehörigen Tabellen/Sichten. Außerdem muss es die Berechtigung CREATE STAGE für das Schema haben, um die Phase „Extern“ mit SAS-URI erstellen zu können.

Die folgenden Werte für Kontoeigenschaften müssen festgelegt werden

Eigenschaft Beschreibung Erforderlich Standard
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION Gibt an, ob bei der Erstellung einer benannten externen Phase (mit CREATE STAGE) ein Speicherintegrationsobjekt als Anmeldeinformationen für den Zugriff auf einen privaten Cloudspeicherort benötigt wird. FALSE FALSE
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION Gibt an, ob eine benannte externe Phase, die auf ein Speicherintegrationsobjekt verweist, als Anmeldeinformationen für die Cloud verwendet werden soll, wenn Daten aus einem privaten Cloudspeicherort geladen oder in diesen entladen werden. FALSE FALSE

Weitere Informationen zu den von Data Factory unterstützten Netzwerksicherheitsmechanismen und -optionen finden Sie unter Datenzugriffsstrategien.

Erste Schritte

Sie können eines der folgenden Tools oder SDKs verwenden, um die Kopieraktivität mit einer Pipeline zu verwenden:

Erstellen eines verknüpften Diensts mit Snowflake über die Benutzeroberfläche

Verwenden Sie die folgenden Schritte, um einen verknüpften Dienst mit Snowflake auf der Azure-Portal Benutzeroberfläche zu erstellen.

  1. Navigieren Sie in Ihrem Azure Data Factory- oder Synapse-Arbeitsbereich zu der Registerkarte „Verwalten“, wählen Sie „Verknüpfte Dienste“ aus und klicken Sie dann auf „Neu“:

  2. Suchen Sie nach Snowflake, und wählen Sie dann den Snowflake-Connector aus.

    Ein Screenshot, der den Snowflake-Connector zeigt.

  3. Konfigurieren Sie die Dienstdetails, testen Sie die Verbindung, und erstellen Sie den neuen verknüpften Dienst.

    Ein Screenshot, der die Konfiguration des verknüpften Diensts für Snowflake zeigt.

Details zur Connector-Konfiguration

Die folgenden Abschnitte enthalten Details zu Eigenschaften, die zum Definieren von Entitäten speziell für einen Snowflake-Connector verwendet werden.

Eigenschaften des verknüpften Diensts

Diese generischen Eigenschaften werden für den mit Snowflake verknüpften Dienst unterstützt:

Eigenschaft Beschreibung Erforderlich
Typ Die „type“-Eigenschaft muss auf SnowflakeV2 festgelegt sein. Ja
accountIdentifier Der Name des Kontos zusammen mit seiner Organisation. Beispiel: meineOrg-Konto123. Ja
database Die Standarddatenbank, die nach dem Herstellen einer Verbindung für die Sitzung verwendet wird. Ja
warehouse Das standardmäßige virtuelle Warehouse, das nach dem Herstellen einer Verbindung für die Sitzung verwendet wird. Ja
authenticationType Typ der Authentifizierung für die Verbindung mit dem Snowflake-Dienst. Zulässige Werte sind: Basic (Standard) und KeyPair. Weitere Informationen zu anderen Eigenschaften und Beispiele finden Sie weiter unten in den jeweiligen Abschnitten. No
role Die Standardsicherheitsrolle, die nach dem Herstellen einer Verbindung für die Sitzung verwendet wird. No
connectVia Die Integration Runtime, die zum Herstellen einer Verbindung mit dem Datenspeicher verwendet wird. Sie können die Azure Integration Runtime oder eine selbstgehostete Integration Runtime verwenden (wenn sich der Datenspeicher in einem privaten Netzwerk befindet). Wenn kein Wert angegeben ist, wird die standardmäßige Azure Integration Runtime verwendet. No

Dieser Snowflake-Connector unterstützt die folgenden Authentifizierungstypen. Weitere Informationen finden Sie in den entsprechenden Abschnitten.

Standardauthentifizierung

Um die Standardauthentifizierung zu verwenden, geben Sie zusätzlich zu den im vorherigen Abschnitt beschriebenen allgemeinen Eigenschaften die folgenden Eigenschaften an:

Eigenschaft Beschreibung Erforderlich
user Anmeldename für Snowflake-Benutzer. Ja
Kennwort Das Kennwort für Snowflake-Benutzer. Markieren Sie dieses Feld als Typ SecureString, um es sicher zu speichern. Sie können auch auf ein Geheimnis verweisen, das in Azure Key Vault gespeichert ist. Ja

Beispiel:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Kennwort in Azure Key Vault:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Hinweis

Der Zuordnungsdatenfluss unterstützt nur die Standardauthentifizierung.

Authentifizierung per Schlüsselpaar

Um die Authentifizierung per Schlüsselpaar zu verwenden, müssen Sie einen Schlüsselpaar-Authentifizierungsbenutzer in Snowflake konfigurieren und erstellen. Informationen dazu finden Sie unter Schlüsselpaarauthentifizierung und Schlüsselpaarrotation. Notieren Sie sich anschließend den privaten Schlüssel und die Passphrase (optional), die Sie zum Definieren des verknüpften Diensts verwenden.

Geben Sie zusätzlich zu den im vorherigen Abschnitt beschriebenen generischen Eigenschaften die folgenden Eigenschaften an:

Eigenschaft Beschreibung Erforderlich
user Anmeldename für Snowflake-Benutzer. Ja
privateKey Der private Schlüssel, der für die Schlüsselpaarauthentifizierung verwendet wird.

Um sicherzustellen, dass der private Schlüssel gültig ist, wenn er an Azure Data Factory gesendet wird, und in Anbetracht der Tatsache, dass die privateKey-Datei Neue-Zeile-Zeichen (\n) enthält, ist es wichtig, den privateKey-Inhalt in seiner Zeichenfolgenliteralform korrekt zu formatieren. Dieser Vorgang umfasst das explizite Hinzufügen von \n zu jeder neuen Zeile.
Ja
privateKeyPassphrase Die Passphrase, die zum Entschlüsseln des privaten Schlüssels verwendet wird, wenn er verschlüsselt ist. No

Beispiel:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "KeyPair",
            "user": "<username>",
            "privateKey": {
                "type": "SecureString",
                "value": "<privateKey>"
            },
            "privateKeyPassphrase": { 
                "type": "SecureString",
                "value": "<privateKeyPassphrase>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Dataset-Eigenschaften

Eine vollständige Liste mit den Abschnitten und Eigenschaften, die zum Definieren von Datasets zur Verfügung stehen, finden Sie im Artikel zu Datasets.

Folgende Eigenschaften werden für das Snowflake-Dataset unterstützt.

Eigenschaft Beschreibung Erforderlich
Typ Die „type“-Eigenschaft des Datasets muss auf SnowflakeV2Table festgelegt werden. Ja
schema Name des Schemas. Beachten Sie, das beim Schemanamen die Groß- und Kleinschreibung beachtet wird. Quelle: Nein, Senke: Ja
table Name der Tabelle/Ansicht. Beachten Sie, dann beim Tabellennamen wird die Groß- und Kleinschreibung beachtet wird. Quelle: Nein, Senke: Ja

Beispiel:

{
    "name": "SnowflakeV2Dataset",
    "properties": {
        "type": "SnowflakeV2Table",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

Eigenschaften der Kopieraktivität

Eine vollständige Liste mit den Abschnitten und Eigenschaften zum Definieren von Aktivitäten finden Sie im Artikel Pipelines. Dieser Abschnitt enthält eine Liste der Eigenschaften, die von der Snowflake-Quelle und -Senke unterstützt werden.

Snowflake als Quelle

Der Snowflake-Connector verwendet den Snowflake-Befehl COPY into [Speicherort], um optimale Leistung zu erzielen.

Wenn der Senkendatenspeicher und das Format vom Snowflake-Befehl „COPY“ nativ unterstützt werden, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus Snowflake in die Senke durchführen. Details finden Sie unter Direktes Kopieren aus Snowflake. Verwenden Sie andernfalls das integrierte gestaffelte Kopieren aus Snowflake.

Beim Kopieren von Daten aus Snowflake werden die folgenden Eigenschaften im Abschnitt source der Kopieraktivität unterstützt.

Eigenschaft Beschreibung Erforderlich
Typ Die „type“-Eigenschaft der Quelle der Copy-Aktivität muss auf SnowflakeV2Source festgelegt werden. Ja
Abfrage Gibt die SQL-Abfrage an, mit der Daten aus Snowflake gelesen werden. Wenn der Name des Schemas, der Tabelle und Spalten Kleinbuchstaben enthält, geben Sie den Objektbezeichner in der Abfrage an, z. B. select * from "schema"."myTable".
Die Ausführung der gespeicherten Prozedur wird nicht unterstützt.
No
exportSettings Erweiterte Einstellungen, die zum Abrufen von Daten aus Snowflake verwendet werden. Sie können die vom Befehl „COPY into“ unterstützten Einstellungen konfigurieren, die der Dienst beim Aufrufen der Anweisung durchläuft. Ja
Unter exportSettings:
type Der Typ des Exportbefehls, festgelegt auf SnowflakeExportCopyCommand. Ja
storageIntegration Gibt den Namen der Speicherintegration an, die Sie in Ihrer Snowflake-Instanz erstellt haben. Die Schritte, die Sie vor der Verwendung der Speicherintegration ausführen müssen, finden Sie unter Konfigurieren einer Snowflake-Speicherintegration. No
additionalCopyOptions Zusätzliche Kopieroptionen, die als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: MAX_FILE_SIZE, OVERWRITE. Weitere Informationen finden Sie unter Snowflake Copy Options (Snowflake-Kopieroptionen). Nein
additionalFormatOptions Zusätzliche Dateiformatoptionen, die im Befehl „COPY“ als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Weitere Informationen finden Sie unter Snowflake Format Type Options (Snowflake-Formattypoptionen). Nein

Hinweis

Stellen Sie sicher, dass Sie die Berechtigung haben, den folgenden Befehl auszuführen sowie auf das Schema INFORMATION_SCHEMA und die Tabelle COLUMNS zugreifen zu können.

  • COPY INTO <location>

Direktes Kopieren aus Snowflake

Wenn der Senkendatenspeicher und das Format die in diesem Abschnitt beschriebenen Kriterien erfüllen, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus Snowflake in die Senke durchführen. Der Dienst überprüft die Einstellungen und gibt bei der Copy-Aktivitätsausführung einen Fehler aus, wenn die folgenden Kriterien nicht erfüllt werden:

  • Wenn Sie storageIntegration in der Quelle angeben:

    Der Senkendatenspeicher ist die Azure Blob Storage-Instanz, auf die Sie in der externen Stage in Snowflake verwiesen haben. Sie müssen die folgenden Schritte ausführen, bevor Sie Daten kopieren:

    1. Erstellen Sie einen mit Azure Blob Storage verknüpften Dienst für die Senkeninstanz von Azure Blob Storage, der über alle unterstützten Authentifizierungstypen verfügt.

    2. Gewähren Sie dem Snowflake-Dienstprinzipal in der Zugriffssteuerung (IAM) der Senkeninstanz von Azure Blob Storage mindestens die Rolle Mitwirkender an Storage-Blobdaten.

  • Wenn Sie storageIntegration nicht in der Quelle angeben:

    Der verknüpfte Senkendienst ist Azure Blob Storage mit SAS-Authentifizierung. Wenn Sie Daten im folgenden unterstützten Format direkt zu Azure Data Lake Storage Gen2 kopieren möchten, können Sie eine verknüpfte Azure Blob Storage-Instanz mit SAS-Authentifizierung für Ihr Azure Data Lake Storage Gen2-Konto erstellen. Damit vermeiden Sie gestagte Kopiervorgänge aus Snowflake.

  • Das Senkendatenformat lautet Parquet, Durch Trennzeichen getrennter Text oder JSON mit den folgenden Konfigurationen:

    • Beim Format Parquet wird einer der Komprimierungscodecs None, Snappy oder Lzo verwendet.
    • Beim Format Durch Trennzeichen getrennter Text:
      • rowDelimiter ist \r\n oder ein beliebiges einzelnes Zeichen.
      • compression kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.
      • encodingName wird als Standardwert übernommen oder ist auf utf-8 festgelegt.
      • quoteChar ist als doppeltes Anführungszeichen, einfaches Anführungszeichen oder als leere Zeichenfolge (kein Anführungszeichen) festgelegt.
    • Für das JSON-Format unterstützt direktes Kopieren nur den Fall, dass die Snowflake-Quelltabelle oder das Abfrageergebnis nur eine einzelne Spalte hat und der Datentyp dieser Spalte VARIANT, OBJECT oder ARRAY ist.
      • compression kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.
      • encodingName wird als Standardwert übernommen oder ist auf utf-8 festgelegt.
      • filePattern in der Senke der Kopieraktivität wird auf dem Standardwert belassen oder auf setOfObjects festgelegt.
  • In der Quelle der Copy-Aktivität ist additionalColumns nicht angegeben.

  • Die Spaltenzuordnung ist nicht angegeben.

Beispiel:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",
                "query": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Gestaffeltes Kopieren aus Snowflake

Wenn der Senkendatenspeicher oder das Format nicht nativ mit dem Snowflake-Befehl „COPY“ kompatibel ist (wie im letzten Abschnitt beschrieben), aktivieren Sie mithilfe einer Azure Blob Storage-Zwischeninstanz das integrierte gestaffelte Kopieren. Das Feature für gestaffeltes Kopieren bietet Ihnen auch einen höheren Durchsatz. Der Dienst exportiert Daten aus Snowflake in den Stagingspeicher, kopiert dann die Daten in die Senke und bereinigt schließlich die temporären Daten aus dem Stagingspeicher. Ausführliche Informationen zum Kopieren von Daten mithilfe von Staging finden Sie unter Gestaffeltes Kopieren.

Um diese Funktion verwenden zu können, erstellen Sie einen mit Azure Blob Storage verknüpften Dienst, der auf das Azure Storage-Konto als Stagingzwischenspeicher verweist. Geben Sie dann die Eigenschaften enableStaging und stagingSettings in der Kopieraktivität an.

  • Wenn Sie storageIntegration in der Quelle angeben, sollte das Zwischenstaginginstanz von Azure Blob Storage die sein, auf die Sie in der externen Stage in Snowflake verwiesen haben. Achten Sie darauf, einen mit Azure Blob Storage verknüpften Dienst dafür zu erstellen, der über alle unterstützten Authentifizierungen verfügt, und gewähren Sie dem Snowflake-Dienstprinzipal in der Zugriffssteuerung (IAM) der Azure Blob Storage-Staginginstanz mindestens die Rolle Mitwirkender an Storage-Blobdaten.

  • Wenn Sie storageIntegration nicht in der Quelle angeben, muss der mit Azure Blob Storage verknüpfte Stagingdienst die SAS-Authentifizierung verwenden, die für den Snowflake-Befehl „COPY“ erforderlich ist. Erteilen Sie Snowflake die erforderlichen Zugriffsberechtigungen in Azure Blob Storage-Staginginstanz. Weitere Informationen hierzu finden Sie in diesem Artikel.

Beispiel:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",               
                "query": "SELECT * FROM MyTable",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"                    
                }
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Beim Ausführen einer mehrstufigen Kopie von Snowflake ist es wichtig, das Verhalten der Senkenkopie auf Dateien zusammenführen festzulegen. Diese Einstellung stellt sicher, dass alle partitionierten Dateien ordnungsgemäß verarbeitet und zusammengeführt werden und verhindern, dass nur die letzte partitionierte Datei kopiert wird.

Beispielkonfiguration

{
    "type": "Copy",
    "source": {
        "type": "SnowflakeSource",
        "query": "SELECT * FROM my_table"
    },
    "sink": {
        "type": "AzureBlobStorage",
        "copyBehavior": "MergeFiles"
    }
}

Hinweis

Wird das Verhalten der Senkenkopie nicht auf Dateien zusammenführen festgelegt, kann dies dazu führen, dass nur die letzte partitionierte Datei kopiert wird.

Snowflake als Senke

Der Snowflake-Connector verwendet den Snowflake-Befehl COPY into [Tabelle], um optimale Leistung zu erzielen. Er unterstützt das Schreiben von Daten in Snowflake in Azure.

Wenn der Quelldatenspeicher und das Format vom Snowflake-Befehl „COPY“ nativ unterstützt werden, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus der Quelle in Snowflake durchführen. Details finden Sie unter Direktes Kopieren in Snowflake. Verwenden Sie andernfalls das integrierte gestaffelte Kopieren in Snowflake.

Beim Kopieren von Daten in Snowflake werden die folgenden Eigenschaften im Abschnitt sink der Kopieraktivität unterstützt.

Eigenschaft Beschreibung Erforderlich
Typ Die „type“-Eigenschaft der Senke der Copy-Aktivität, festgelegt auf SnowflakeV2Sink. Ja
preCopyScript Geben Sie eine SQL-Abfrage an, die bei jeder Ausführung von der Kopieraktivität ausgeführt werden soll, bevor Daten in Snowflake geschrieben werden. Sie können diese Eigenschaft nutzen, um vorab geladene Daten zu bereinigen. Nein
importSettings Erweiterte Einstellungen, die zum Schreiben von Daten in Snowflake verwendet werden. Sie können die vom Befehl „COPY into“ unterstützten Einstellungen konfigurieren, die der Dienst beim Aufrufen der Anweisung durchläuft. Ja
Unter importSettings:
type Der Typ des Importbefehls, festgelegt auf SnowflakeImportCopyCommand. Ja
storageIntegration Gibt den Namen der Speicherintegration an, die Sie in Ihrer Snowflake-Instanz erstellt haben. Die Schritte, die Sie vor der Verwendung der Speicherintegration ausführen müssen, finden Sie unter Konfigurieren einer Snowflake-Speicherintegration. No
additionalCopyOptions Zusätzliche Kopieroptionen, die als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Weitere Informationen finden Sie unter Snowflake Copy Options (Snowflake-Kopieroptionen). Nein
additionalFormatOptions Zusätzliche Dateiformatoptionen, die im Befehl „COPY“ als Wörterbuch mit Schlüssel-Wert-Paaren bereitgestellt werden. Beispiele: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Weitere Informationen finden Sie unter Snowflake Format Type Options (Snowflake-Formattypoptionen). Nein

Hinweis

Stellen Sie sicher, dass Sie die Berechtigung haben, den folgenden Befehl auszuführen sowie auf das Schema INFORMATION_SCHEMA und die Tabelle COLUMNS zugreifen zu können.

  • SELECT CURRENT_REGION()
  • COPY INTO <table>
  • SHOW REGIONS
  • CREATE OR REPLACE STAGE
  • DROP STAGE

Direktes Kopieren in Snowflake

Wenn der Quelldatenspeicher und das Format die in diesem Abschnitt beschriebenen Kriterien erfüllen, können Sie mit der Kopieraktivität Kopiervorgänge direkt aus der Quelle in Snowflake durchführen. Der Dienst überprüft die Einstellungen und gibt bei der Copy-Aktivitätsausführung einen Fehler aus, wenn die folgenden Kriterien nicht erfüllt werden:

  • Wenn Sie storageIntegration in der Senke angeben:

    Der Quelldatenspeicher ist die Azure Blob Storage-Instanz, auf die Sie in der externen Stage in Snowflake verwiesen haben. Sie müssen die folgenden Schritte ausführen, bevor Sie Daten kopieren:

    1. Erstellen Sie einen mit Azure Blob Storage verknüpften Dienst für die Azure Blob Storage-Quellinstanz, der über alle unterstützten Authentifizierungstypen verfügt.

    2. Gewähren Sie dem Snowflake-Dienstprinzipal mindestens die Rolle Storage-Blobdatenleser in der Zugriffssteuerung (IAM) der Azure Blob Storage-Quellinstanz.

  • Wenn Sie storageIntegration nicht in der Senke angeben:

    Der verknüpfte Quelldienst ist Azure Blob Storage mit SAS-Authentifizierung. Wenn Sie Daten im folgenden unterstützten Format direkt aus Azure Data Lake Storage Gen2 kopieren möchten, können Sie eine verknüpfte Azure Blob Storage-Instanz mit SAS-Authentifizierung für Ihr Azure Data Lake Storage Gen2-Konto erstellen. Damit vermeiden Sie gestagte Kopiervorgänge zu Snowflake.

  • Das Quelldatenformat lautet Parquet, Durch Trennzeichen getrennter Text oder JSON mit den folgenden Konfigurationen:

    • Beim Format Parquet wird einer der Komprimierungscodecs None oder Snappy verwendet.

    • Beim Format Durch Trennzeichen getrennter Text:

      • rowDelimiter ist \r\n oder ein beliebiges einzelnes Zeichen. Wenn das Zeilentrennzeichen nicht „\r\n“ ist, muss firstRowAsHeader auf false festgelegt sein. skipLineCount ist nicht angegeben.
      • compression kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.
      • encodingName wird als Standardwert übernommen oder ist auf „UTF-8“, „UTF-16“, „UTF-16BE“, „UTF-32“, „UTF-32BE“, „BIG5“, „EUC-JP“, „EUC-KR“, „GB18030“, „ISO-2022-JP“, „ISO-2022-KR“, „ISO-8859-1“, „ISO-8859-2“, „ISO-8859-5“, „ISO-8859-6“, „ISO-8859-7“, „ISO-8859-8“, „ISO-8859-9“, „WINDOWS-1250“, „WINDOWS-1251“, „WINDOWS-1252“, „WINDOWS-1253“, „WINDOWS-1254“ oder „WINDOWS-1255“ festgelegt.
      • quoteChar ist als doppeltes Anführungszeichen, einfaches Anführungszeichen oder als leere Zeichenfolge (kein Anführungszeichen) festgelegt.
    • Für das JSON-Format unterstützt direktes Kopieren nur den Fall, dass die Snowflake-Senkentabelle nur eine einzelne Spalte hat und der Datentyp dieser Spalte VARIANT, OBJECT oder ARRAY ist.

      • compression kann auf keine Komprimierung oder auf gzip, bzip2 oder deflate festgelegt sein.
      • encodingName wird als Standardwert übernommen oder ist auf utf-8 festgelegt.
      • Die Spaltenzuordnung ist nicht angegeben.
  • In der Quelle der Kopieraktivität:

    • additionalColumns ist nicht angegeben.
    • Wenn es sich bei der Quelle um einen Ordner handelt, wird recursive auf TRUE festgelegt.
    • prefix, modifiedDateTimeStart, modifiedDateTimeEnd und enablePartitionDiscovery wurden nicht angegeben.

Beispiel:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE"
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            }
        }
    }
]

Gestaffeltes Kopieren in Snowflake

Wenn der Quelldatenspeicher oder das Format nicht nativ mit dem Snowflake-Befehl „COPY“ kompatibel ist (wie im letzten Abschnitt beschrieben), aktivieren Sie mithilfe einer Azure Blob Storage-Zwischeninstanz das integrierte gestaffelte Kopieren. Das Feature für gestaffeltes Kopieren bietet Ihnen auch einen höheren Durchsatz. Der Dienst konvertiert die Daten automatisch, damit das Datenformat den Anforderungen von Snowflake entspricht. Dann wird der Befehl „COPY“ aufgerufen, um die Daten in Snowflake zu laden. Abschließend werden Sie die temporären Daten in Blob Storage bereinigt. Ausführliche Informationen zum Kopieren von Daten mithilfe von Staging finden Sie unter Gestaffeltes Kopieren.

Um diese Funktion verwenden zu können, erstellen Sie einen mit Azure Blob Storage verknüpften Dienst, der auf das Azure Storage-Konto als Stagingzwischenspeicher verweist. Geben Sie dann die Eigenschaften enableStaging und stagingSettings in der Kopieraktivität an.

  • Wenn Sie storageIntegration in der Senke angeben, sollte das Zwischenstaginginstanz von Azure Blob Storage die sein, auf die Sie in der externen Stage in Snowflake verwiesen haben. Achten Sie darauf, einen mit Azure Blob Storage verknüpften Dienst dafür zu erstellen, der über alle unterstützten Authentifizierungen verfügt, und gewähren Sie dem Snowflake-Dienstprinzipal in der Zugriffssteuerung (IAM) der Azure Blob Storage-Staginginstanz mindestens die Rolle Storage-Blobdatenleser.

  • Wenn Sie storageIntegration nicht in der Senke angeben, muss der mit Azure Blob Storage verknüpfte Stagingdienst die SAS-Authentifizierung verwenden, die für den Snowflake-Befehl „COPY“ erforderlich ist.

Beispiel:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Eigenschaften von Mapping Data Flow

Beim Transformieren von Daten im Zuordnungsdatenfluss können Sie in Snowflake Tabellen lesen und in diese schreiben. Weitere Informationen finden Sie unter Quellentransformation und Senkentransformation in Zuordnungsdatenflüssen. Sie können ein Snowflake-Dataset oder Inlinedataset als Quell- und Senkentyp verwenden.

Quellentransformation

In der folgenden Tabelle sind die von einer Snowflake-Quelle unterstützten Eigenschaften aufgeführt. Sie können diese Eigenschaften auf der Registerkarte Quelloptionen bearbeiten. Der Connector verwendet die interne Datenübertragung von Snowflake.

Name BESCHREIBUNG Erforderlich Zulässige Werte Datenflussskript-Eigenschaft
Tabelle Wenn Sie „Tabelle“ als Eingabe auswählen, ruft der Datenfluss alle Daten aus der Tabelle ab, die im Snowflake-Dataset oder bei Verwendung des Inlinedatasets in den Quelloptionen angegeben ist. Nein String (nur für Inlinedataset)
tableName
schemaName
Abfrage Wenn Sie „Abfrage“ als Eingabe auswählen, geben Sie eine Abfrage zum Abrufen von Daten aus Snowflake ein. Diese Einstellung überschreibt jede Tabelle, die Sie im Dataset ausgewählt haben.
Wenn der Name des Schemas, der Tabelle und Spalten Kleinbuchstaben enthält, geben Sie den Objektbezeichner in der Abfrage an, z. B. select * from "schema"."myTable".
Nein String Abfrage
Inkrementelle Extrahierung aktivieren (Vorschau) Verwenden Sie diese Option, um ADF mitzuteilen, dass nur Zeilen verarbeitet werden sollen, die seit der letzten Ausführung der Pipeline geändert wurden. Nein Boolean enableCdc
Inkrementelle Spalte Wenn Sie das Feature für das inkrementelle Extrahieren verwenden, müssen Sie die Datums-/Uhrzeit-/numerische Spalte auswählen, die Sie als Grenzwert in der Quelltabelle verwenden möchten. Nein String waterMarkColumn
Aktivieren der Snowflake-Änderungsnachverfolgung (Vorschau) Mit dieser Option kann ADF die Snowflake-Change Data Capture-Technologie nutzen, um nur die Deltadaten seit der vorherigen Pipelineausführung zu verarbeiten. Mit dieser Option werden die Deltadaten automatisch mit Vorgängen für Zeileneinfügung, Zeilenaktualisierung und Zeilenlöschung geladen, ohne dass eine inkrementelle Spalte erforderlich ist. Nein Boolean enableNativeCdc
Netto-Änderungen Wenn Sie die Snowflake-Änderungsnachverfolgung verwenden, können Sie diese Option verwenden, um deduplizierte geänderte Zeilen oder umfassende Änderungen abzurufen. Deduplizierte geänderte Zeilen zeigen nur die neuesten Versionen der Zeilen an, die seit einem bestimmten Zeitpunkt geändert wurden, während umfassende Änderungen alle Versionen jeder geänderten Zeile anzeigen, einschließlich der Zeilen, die gelöscht oder aktualisiert wurden. Wenn Sie z. B. eine Zeile aktualisieren, wird eine Löschversion und eine Einfügeversion in den umfassenden Änderungen angezeigt, aber nur die Einfügeversion in deduplizierten geänderten Zeilen. Je nach Anwendungsfall können Sie die Option auswählen, die Ihren Bedürfnissen entspricht. Die Standardoption ist FALSCH, was umfassende Änderungen bedeutet. Nein Boolean netChanges
Systemspalten einschließen Bei Verwendung der Snowflake-Änderungsnachverfolgung können Sie mithilfe der Option „systemColumns“ steuern, ob die von Snowflake bereitgestellten Metadatenstromspalten in die Ausgabe der Änderungsnachverfolgung einbezogen oder ausgeschlossen werden. Standardmäßig ist „systemColumns“ auf WAHR festgelegt, was bedeutet, dass die Metadatenstromspalten eingeschlossen sind. Sie können „systemColumns“ auf FALSCH festlegen, wenn Sie diese ausschließen möchten. Nein Boolean systemColumns
Lesen von Anfang an beginnen Wenn Sie diese Option mit dem inkrementellen Extrahieren und der Änderungsnachverfolgung festlegen, wird ADF angewiesen, alle Zeilen bei der ersten Ausführung einer Pipeline mit aktiviertem inkrementellem Extrahieren zu lesen. Nein Boolean skipInitialLoad

Beispiele für Snowflake-Quellskripts

Wenn Sie das Snowflake-Dataset als Quelltyp verwenden, sieht das zugehörige Datenflussskript wie folgt aus:

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

Wenn Sie ein Inlinedataset verwenden, sieht das zugehörige Datenflussskript wie folgt aus:

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

Native Änderungsnachverfolgung

Azure Data Factory unterstützt jetzt ein natives Feature in Snowflake, das als Änderungsnachverfolgung bekannt ist, was das Nachverfolgen von Änderungen in Form von Protokollen umfasst. Dieses Feature von Snowflake ermöglicht es uns, die Änderungen der Daten im Zeitverlauf zu verfolgen, was für das inkrementelle Laden von Daten und für Überprüfungszwecke nützlich ist. Wenn Sie dieses Feature verwenden möchten, erstellen wir beim Aktivieren von Change Data Capture und Auswählen der Snowflake-Änderungsnachverfolgung ein Stream-Objekt für die Quelltabelle, das die Änderungsnachverfolgung in der Snowflake-Tabelle ermöglicht. Anschließend verwenden wir die CHANGES-Klausel in unserer Abfrage, um nur die neuen oder aktualisierten Daten aus der Quelltabelle abzurufen. Außerdem wird empfohlen, die Pipeline so zu planen, dass Änderungen innerhalb des Intervalls der Datenaufbewahrungszeit für die Snowflake-Quelltabelle verarbeitet werden, sonst können Benutzer inkonsistentes Verhalten bei erfassten Änderungen sehen.

Senkentransformation

In der folgenden Tabelle sind die von einer Snowflake-Senke unterstützten Eigenschaften aufgeführt. Sie können diese Eigenschaften auf der Registerkarte Einstellungen bearbeiten. Bei Verwendung eines Inlinedatasets werden zusätzliche Einstellungen angezeigt. Diese entsprechen den Eigenschaften, die im Abschnitt zu den Dataseteigenschaften beschrieben sind. Der Connector verwendet die interne Datenübertragung von Snowflake.

Name BESCHREIBUNG Erforderlich Zulässige Werte Datenflussskript-Eigenschaft
Updatemethode Geben Sie an, welche Vorgänge für das Snowflake-Ziel zulässig sind.
Um Aktualisierungs-, Upsert- oder Löschaktionen auf Zeilen anzuwenden, muss eine Zeilenänderungstransformation zum Kennzeichnen von Zeilen für diese Aktionen erfolgen.
Ja true oder false deletable
insertable
updateable
upsertable
Schlüsselspalten Für Update-, Upsert- und Löschvorgänge muss mindestens eine Schlüsselspalte festgelegt werden, um die Zeile zu bestimmen, die geändert werden soll. Nein Array keys
Aktion table Bestimmt, ob die Zieltabelle vor dem Schreiben neu erstellt werden soll oder alle Zeilen aus der Zieltabelle entfernt werden sollen.
- Keine: Es wird keine Aktion an der Tabelle vorgenommen.
- Neu erstellen: Die Tabelle wird gelöscht und neu erstellt. Erforderlich, wenn eine neue Tabelle dynamisch erstellt wird.
- Abschneiden: Alle Zeilen werden aus der Zieltabelle entfernt.
Nein true oder false Neu erstellen
truncate

Beispiele für Snowflake-Senkenskripts

Wenn Sie das Snowflake-Dataset als Senkentyp verwenden, sieht das zugehörige Datenflussskript wie folgt aus:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Wenn Sie ein Inlinedataset verwenden, sieht das zugehörige Datenflussskript wie folgt aus:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Abfrage-Pushdown-Optimierung

Durch Festlegen des Protokolliergrads der Pipeline auf „Keine“ schließen wir die Übertragung von Zwischenmetriken für die Transformation aus, wodurch potenzielle Hindernisse für Spark-Optimierungen vermieden und die von Snowflake bereitgestellte Abfrage-Pushdown-Optimierung ermöglicht werden. Diese Pushdown-Optimierung ermöglicht erhebliche Leistungsverbesserungen für große Snowflake-Tabellen mit umfangreichen Datasets.

Hinweis

Temporäre Tabellen in Snowflake werden nicht unterstützt, da sie lokal für die Sitzungen oder Benutzer sind, die sie erstellen, wodurch sie für andere Sitzungen unzugänglich sind und anfällig dafür macht, von Snowflake als reguläre Tabellen überschrieben zu werden. Während Snowflake transiente Tabellen als Alternative bietet, die global zugänglich sind, erfordern sie ein manuelles Löschen, was unserem primären Ziel der Verwendung von temporäre Tabellen widerspricht, das darin besteht, jegliche Löschvorgänge im Quellschema zu vermeiden.

Eigenschaften der Lookup-Aktivität

Weitere Informationen zu den Eigenschaften finden Sie unter Lookup-Aktivität in Azure Data Factory.

Upgraden des Snowflake-Connectors

Um den verknüpften Snowflake-Dienst zu upgraden, können Sie ein paralleles Upgrade oder ein direktes Upgrade durchführen.

Paralleles Upgrade

Führen Sie die folgenden Schritte aus, um ein paralleles Upgrade durchzuführen:

  1. Erstellen Sie einen neuen verknüpften Snowflake-Dienst, und konfigurieren Sie ihn anhand der Eigenschaften des verknüpften Diensts.
  2. Erstellen Sie ein Dataset basierend auf dem neu erstellten verknüpften Snowflake-Dienst.
  3. Ersetzen Sie den neuen verknüpften Dienst und das Dataset durch die vorhandenen in den Pipelines, die auf die Legacyobjekte verweisen.

Direktes Upgrade

Um ein direktes Upgrade durchzuführen, müssen Sie die Nutzdaten des vorhandenen verknüpften Diensts bearbeiten und das Dataset aktualisieren, damit der neue verknüpfte Dienst verwendet wird.

  1. Aktualisieren Sie den Typ von Snowflake in SnowflakeV2.

  2. Ändern Sie die Nutzdaten des verknüpften Diensts aus dem Legacyformat in das neue Muster. Sie können jedes Feld entweder über die Benutzeroberfläche ausfüllen, nachdem Sie den oben genannten Typ geändert haben, oder die Nutzdaten direkt über den JSON-Editor aktualisieren. Im Abschnitt Eigenschaften des verknüpften Diensts in diesem Artikel finden Sie die unterstützten Verbindungseigenschaften. Die folgenden Beispiele zeigen die Unterschiede bei den Nutzdaten für die Legacy- und die neuen verknüpften Snowflake-Dienste:

    JSON-Nutzdaten des verknüpften Snowflake-Legacydiensts:

      {
         "name": "Snowflake1",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "annotations": [],
             "type": "Snowflake",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC",
                 "encryptedCredential": "<your_encrypted_credential_value>"
             },
             "connectVia": {
                 "referenceName": "AzureIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    

    JSON-Nutzdaten des neuen verknüpften Snowflake-Diensts:

     {
         "name": "Snowflake2",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "parameters": {
                 "schema": {
                     "type": "string",
                     "defaultValue": "PUBLIC"
                 }
             },
             "annotations": [],
             "type": "SnowflakeV2",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "accountIdentifier": "<FAKE_Account>",
                 "user": "FAKE_USER",
                 "database": "FAKE_DB",
                 "warehouse": "FAKE_DW",
                 "encryptedCredential": "<placeholder>"
             },
             "connectVia": {
                 "referenceName": "AutoResolveIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    
  3. Aktualisieren Sie das Dataset, um den neuen verknüpften Dienst zu verwenden. Sie können entweder ein neues Dataset basierend auf dem neu erstellten verknüpften Dienst erstellen oder die type-Eigenschaft eines vorhandenen Datasets von SnowflakeTable in SnowflakeV2Table ändern.

Unterschiede zwischen Snowflake und Snowflake (Legacy)

Der Snowflake-Connector bietet neue Funktionen und ist mit den meisten Features des Snowflake-Connector (Legacy) kompatibel. Die folgende Tabelle zeigt die Funktionsunterschiede zwischen Snowflake und Snowflake (Legacy).

Snowflake Snowflake (Legacy)
Unterstützen die Standardauthentifizierung und die Authentifizierung über ein Schlüsselpaar. Unterstützen die Standardauthentifizierung.
Skriptparameter werden derzeit in Skripten nicht unterstützt. Verwenden Sie alternativ dynamische Ausdrücke für Skriptparameter. Weitere Informationen finden Sie unter Ausdrücke und Funktionen in Azure Data Factory und Azure Synapse Analytics. Unterstützen Skriptparameter in Skripten.
Unterstützen BigDecimal in Lookups. Der in Snowflake definierte Typ NUMBER wird als Zeichenfolge in Lookups angezeigt. BigDecimal wird in Lookups nicht unterstützt.
Legacy-Eigenschaft connectionstring ist veraltet und wird durch die erforderlichen Parameter Konto, Lager, Datenbank, Schema und Rolle ersetzt Im älteren Snowflake-Verbinder wurde die connectionstring-Eigenschaft verwendet, um eine Verbindung herzustellen.

Eine Liste der Datenspeicher, die als Quellen und Senken für die Copy-Aktivität unterstützt werden, finden Sie unter Unterstützte Datenspeicher und Formate.