Gegevens kopiëren en transformeren in Snowflake met behulp van Azure Data Factory of Azure Synapse Analytics
VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics
Tip
Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .
In dit artikel wordt beschreven hoe u de Copy-activiteit in Azure Data Factory- en Azure Synapse-pijplijnen gebruikt om gegevens van en naar Snowflake te kopiëren en Gegevensstroom te gebruiken om gegevens in Snowflake te transformeren. Zie het inleidende artikel voor Data Factory of Azure Synapse Analytics voor meer informatie.
Belangrijk
De nieuwe Snowflake-connector biedt verbeterde systeemeigen Snowflake-ondersteuning. Als u de verouderde Snowflake-connector in uw oplossing gebruikt, wordt u aangeraden om uw Snowflake-connector zo snel mogelijk bij te werken. Raadpleeg deze sectie voor meer informatie over het verschil tussen de verouderde en de nieuwste versie.
Deze Snowflake-connector wordt ondersteund voor de volgende mogelijkheden:
Ondersteunde mogelijkheden | IR |
---|---|
Copy-activiteit (bron/sink) | (1) (2) |
Toewijzingsgegevensstroom (bron/sink) | (1) |
Activiteit Lookup | (1) (2) |
Scriptactiviteit | (1) (2) |
(1) Azure Integration Runtime (2) Zelf-hostende Integration Runtime
Voor de Copy-activiteit ondersteunt deze Snowflake-connector de volgende functies:
- Kopieer gegevens uit Snowflake die de COPY van Snowflake gebruikt naar de opdracht [locatie] om de beste prestaties te bereiken.
- Kopieer gegevens naar Snowflake die gebruikmaken van de COPY van Snowflake naar de opdracht [table] om de beste prestaties te bereiken. Het biedt ondersteuning voor Snowflake in Azure.
- Als een proxy is vereist om vanuit een zelf-hostende Integration Runtime verbinding te maken met Snowflake, moet u de omgevingsvariabelen configureren voor HTTP_PROXY en HTTPS_PROXY op de Integration Runtime-host.
Als uw gegevensarchief zich in een on-premises netwerk, een virtueel Azure-netwerk of een virtuele particuliere cloud van Amazon bevindt, moet u een zelf-hostende Integration Runtime configureren om er verbinding mee te maken. Zorg ervoor dat u de IP-adressen toevoegt die de zelf-hostende Integration Runtime gebruikt voor de lijst met toegestane adressen.
Als uw gegevensarchief een beheerde cloudgegevensservice is, kunt u De Azure Integration Runtime gebruiken. Als de toegang is beperkt tot IP-adressen die zijn goedgekeurd in de firewallregels, kunt u IP-adressen van Azure Integration Runtime toevoegen aan de lijst met toegestane adressen.
Het Snowflake-account dat wordt gebruikt voor bron of sink, moet de benodigde USAGE
toegang hebben tot de database en lees-/schrijftoegang voor het schema en de tabellen/weergaven eronder. Daarnaast moet het ook CREATE STAGE
in het schema staan om de externe fase te kunnen maken met sas-URI.
De volgende waarden voor accounteigenschappen moeten worden ingesteld
Eigenschappen | Beschrijving | Vereist | Standaardinstelling |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Hiermee geeft u op of een opslagintegratieobject als cloudreferenties moet worden vereist bij het maken van een benoemde externe fase (met CREATE STAGE) voor toegang tot een opslaglocatie in de privécloud. | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Hiermee geeft u op of u een benoemde externe fase wilt gebruiken die verwijst naar een opslagintegratieobject als cloudreferenties bij het laden van gegevens uit of het verwijderen van gegevens naar een opslaglocatie in de privécloud. | FALSE | FALSE |
Zie Strategieën voor gegevenstoegang voor meer informatie over de netwerkbeveiligingsmechanismen en -opties die door Data Factory worden ondersteund.
Als u de kopieeractiviteit wilt uitvoeren met een pijplijn, kunt u een van de volgende hulpprogramma's of SDK's gebruiken:
- Het hulpprogramma voor het kopiëren van gegevens
- Azure Portal
- De .NET-SDK
- De Python-SDK
- Azure PowerShell
- De REST API
- Een Azure Resource Manager-sjabloon
Gebruik de volgende stappen om een gekoppelde service te maken voor Snowflake in de gebruikersinterface van Azure Portal.
Blader naar het tabblad Beheren in uw Azure Data Factory- of Synapse-werkruimte en selecteer Gekoppelde services en klik vervolgens op Nieuw:
Zoek naar Snowflake en selecteer de Snowflake-connector.
Configureer de servicedetails, test de verbinding en maak de nieuwe gekoppelde service.
De volgende secties bevatten details over eigenschappen waarmee entiteiten worden gedefinieerd die specifiek zijn voor een Snowflake-connector.
Deze algemene eigenschappen worden ondersteund voor de gekoppelde Snowflake-service:
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De eigenschap type moet worden ingesteld op SnowflakeV2. | Ja |
accountIdentifier | De naam van het account samen met de organisatie. Bijvoorbeeld myorg-account123. | Ja |
database | De standaarddatabase die wordt gebruikt voor de sessie nadat u verbinding hebt gemaakt. | Ja |
warehouse | Het standaard virtuele magazijn dat wordt gebruikt voor de sessie nadat u verbinding hebt gemaakt. | Ja |
authenticationType | Type verificatie dat wordt gebruikt om verbinding te maken met de Snowflake-service. Toegestane waarden zijn: Basic (Standaard) en KeyPair. Raadpleeg de bijbehorende secties hieronder over respectievelijk meer eigenschappen en voorbeelden. | Nee |
role | De standaardbeveiligingsrol die wordt gebruikt voor de sessie nadat u verbinding hebt gemaakt. | Nee |
host | De hostnaam van het Snowflake-account. Voorbeeld: contoso.snowflakecomputing.com . .cn wordt ook ondersteund. |
Nee |
connectVia | De integratieruntime die wordt gebruikt om verbinding te maken met het gegevensarchief. U kunt de Azure Integration Runtime of een zelf-hostende Integration Runtime gebruiken (als uw gegevensarchief zich in een privénetwerk bevindt). Als dit niet is opgegeven, wordt de standaard Azure Integration Runtime gebruikt. | Nee |
Deze Snowflake-connector ondersteunt de volgende verificatietypen. Zie de bijbehorende secties voor meer informatie.
Als u basisverificatie wilt gebruiken, geeft u naast de algemene eigenschappen die in de vorige sectie worden beschreven, de volgende eigenschappen op:
Eigenschappen | Beschrijving | Vereist |
---|---|---|
Gebruiker | Aanmeldingsnaam voor de Snowflake-gebruiker. | Ja |
password | Het wachtwoord voor de Snowflake-gebruiker. Markeer dit veld als een SecureString-type om het veilig op te slaan. U kunt ook verwijzen naar een geheim dat is opgeslagen in Azure Key Vault. | Ja |
Voorbeeld:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Wachtwoord in Azure Key Vault:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Notitie
Toewijzingen Gegevensstroom ondersteunt alleen basisverificatie.
Als u sleutelpaarverificatie wilt gebruiken, moet u een verificatiegebruiker voor sleutelpaar configureren en maken in Snowflake door te verwijzen naar sleutelpaarverificatie en sleutelpaarrotatie. Noteer daarna de persoonlijke sleutel en de wachtwoordzin (optioneel), die u gebruikt om de gekoppelde service te definiëren.
Geef naast de algemene eigenschappen die in de vorige sectie worden beschreven, de volgende eigenschappen op:
Eigenschappen | Beschrijving | Vereist |
---|---|---|
Gebruiker | Aanmeldingsnaam voor de Snowflake-gebruiker. | Ja |
privateKey | De persoonlijke sleutel die wordt gebruikt voor de verificatie van het sleutelpaar. Om ervoor te zorgen dat de persoonlijke sleutel geldig is wanneer deze wordt verzonden naar Azure Data Factory en rekening houdend met het feit dat het privateKey-bestand newlinetekens (\n) bevat, is het essentieel om de privateKey-inhoud correct op te maken in de letterlijke vorm van de tekenreeks. Dit proces omvat het expliciet toevoegen van \n aan elke nieuwe regel. |
Ja |
privateKeyPassphrase | De wachtwoordzin die wordt gebruikt voor het ontsleutelen van de persoonlijke sleutel, als deze is versleuteld. | Nee |
Voorbeeld:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Zie het artikel Gegevenssets voor een volledige lijst met secties en eigenschappen die beschikbaar zijn voor het definiëren van gegevenssets .
De volgende eigenschappen worden ondersteund voor de Snowflake-gegevensset.
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De typeeigenschap van de gegevensset moet worden ingesteld op SnowflakeV2Table. | Ja |
schema | Naam van het schema. Houd er rekening mee dat de schemanaam hoofdlettergevoelig is. | Nee voor bron, ja voor sink |
table | Naam van de tabel/weergave. Let op: de tabelnaam is hoofdlettergevoelig. | Nee voor bron, ja voor sink |
Voorbeeld:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Zie het artikel Pijplijnen voor een volledige lijst met secties en eigenschappen die beschikbaar zijn voor het definiëren van activiteiten. Deze sectie bevat een lijst met eigenschappen die worden ondersteund door de Snowflake-bron en -sink.
Snowflake-connector maakt gebruik van de COPY van Snowflake in de opdracht [location] om de beste prestaties te bereiken.
Als sinkgegevensarchief en -indeling systeemeigen worden ondersteund door de opdracht Snowflake COPY, kunt u de Copy-activiteit gebruiken om rechtstreeks vanuit Snowflake naar sink te kopiëren. Zie Direct copy from Snowflake voor meer informatie. Gebruik anders ingebouwde gefaseerde kopie uit Snowflake.
Als u gegevens uit Snowflake wilt kopiëren, worden de volgende eigenschappen ondersteund in de Copy-activiteit bronsectie.
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De typeeigenschap van de Copy-activiteit bron moet worden ingesteld op SnowflakeV2Source. | Ja |
query | Hiermee geeft u de SQL-query op voor het lezen van gegevens uit Snowflake. Als de namen van het schema, de tabel en de kolommen kleine letters bevatten, noemt u de object-id in de query, bijvoorbeeld select * from "schema"."myTable" .Het uitvoeren van een opgeslagen procedure wordt niet ondersteund. |
Nee |
exportSettings | Geavanceerde instellingen die worden gebruikt om gegevens op te halen uit Snowflake. U kunt de exemplaren configureren die door de COPY worden ondersteund in de opdracht die de service doorgeeft wanneer u de instructie aanroept. | Ja |
Onder exportSettings : |
||
type | Het type exportopdracht, ingesteld op SnowflakeExportCopyCommand. | Ja |
storageIntegration | Geef de naam op van uw opslagintegratie die u in Snowflake hebt gemaakt. Zie Een Snowflake-opslagintegratie configureren voor de vereiste stappen voor het gebruik van de opslagintegratie. | Nee |
additionalCopyOptions | Extra kopieeropties, aangeboden als een woordenlijst van sleutel-waardeparen. Voorbeelden: MAX_FILE_SIZE, OVERSCHRIJVEN. Zie Snowflake Copy Options voor meer informatie. | Nee |
additionalFormatOptions | Aanvullende bestandsindelingsopties die worden aangeboden aan de opdracht COPY als een woordenlijst van sleutel-waardeparen. Voorbeelden: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Zie Opties voor het type type Snowflake-indeling voor meer informatie. | Nr. |
Notitie
Zorg ervoor dat u gemachtigd bent om de volgende opdracht uit te voeren en toegang te krijgen tot het schema INFORMATION_SCHEMA en de tabelKOLOMMEN.
COPY INTO <location>
Als uw sinkgegevensarchief en -indeling voldoen aan de criteria die in deze sectie worden beschreven, kunt u de Copy-activiteit gebruiken om rechtstreeks vanuit Snowflake naar sink te kopiëren. De service controleert de instellingen en mislukt de Copy-activiteit wordt uitgevoerd als niet aan de volgende criteria wordt voldaan:
Wanneer u opgeeft
storageIntegration
in de bron:Het sink-gegevensarchief is de Azure Blob Storage waarnaar u in de externe fase in Snowflake hebt verwezen. U moet de volgende stappen uitvoeren voordat u gegevens kopieert:
Maak een gekoppelde Azure Blob Storage-service voor de sink Azure Blob Storage met ondersteunde verificatietypen.
Verken ten minste de rol Inzender voor opslagblobgegevens aan de Snowflake-service-principal in de sink Azure Blob Storage Access Control (IAM).
Wanneer u niet opgeeft
storageIntegration
in de bron:De gekoppelde sinkservice is Azure Blob Storage met shared access Signature-verificatie. Als u gegevens rechtstreeks naar Azure Data Lake Storage Gen2 wilt kopiëren in de volgende ondersteunde indeling, kunt u een gekoppelde Azure Blob Storage-service met SAS-verificatie maken voor uw Azure Data Lake Storage Gen2-account om gefaseerde kopie uit Snowflake te voorkomen.
De sinkgegevensindeling is van Parquet, gescheiden tekst of JSON met de volgende configuraties:
- Voor Parquet-indeling is de compressiecodec Geen, Snappy of Lzo.
- Voor tekst met scheidingstekens :
rowDelimiter
is \r\n of een willekeurig teken.compression
kan geen compressie, gzip, bzip2 of deflate zijn.encodingName
is standaard ingesteld of ingesteld op utf-8.quoteChar
is dubbele aanhalingstekens, enkele aanhalingstekens of lege tekenreeks (geen aanhalingsteken).
- Voor JSON-indeling ondersteunt direct kopiëren alleen het geval dat de snowflake-brontabel of het queryresultaat slechts één kolom heeft en het gegevenstype van deze kolom VARIANT, OBJECT of MATRIX is.
compression
kan geen compressie, gzip, bzip2 of deflate zijn.encodingName
is standaard ingesteld of ingesteld op utf-8.filePattern
in de sink voor kopieeractiviteiten is standaard ingesteld of ingesteld op SetOfObjects.
In bron
additionalColumns
van kopieeractiviteit is niet opgegeven.Kolomtoewijzing is niet opgegeven.
Voorbeeld:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Wanneer uw sinkgegevensarchief of -indeling niet systeemeigen compatibel is met de opdracht Snowflake COPY, zoals vermeld in de laatste sectie, schakelt u de ingebouwde gefaseerde kopie in met behulp van een tussentijdse Instantie van Azure Blob Storage. De functie voor gefaseerde kopie biedt u ook een betere doorvoer. De service exporteert gegevens uit Snowflake naar faseringsopslag, kopieert de gegevens naar sink en schoont ten slotte uw tijdelijke gegevens op uit de faseringsopslag. Zie Gefaseerde kopie voor meer informatie over het kopiëren van gegevens met behulp van fasering.
Als u deze functie wilt gebruiken, maakt u een gekoppelde Azure Blob Storage-service die verwijst naar het Azure-opslagaccount als tijdelijke fasering. Geef vervolgens de enableStaging
en stagingSettings
eigenschappen op in de Copy-activiteit.
Wanneer u opgeeft
storageIntegration
in de bron, moet de tijdelijke fasering van Azure Blob Storage het tijdelijke faseringstype zijn dat u in de externe fase in Snowflake hebt verwezen. Zorg ervoor dat u een gekoppelde Azure Blob Storage-service maakt met elke ondersteunde verificatie en dat u ten minste de rol Opslagblobgegevensbijdrager verleent aan de Snowflake-service-principal in het faseringsbeheer voor Azure Blob Storage-toegangsbeheer (IAM).Wanneer u niet opgeeft
storageIntegration
in de bron, moet de gekoppelde Azure Blob Storage-service gebruikmaken van shared access Signature-verificatie, zoals vereist door de opdracht Snowflake COPY. Zorg ervoor dat u de juiste toegangsmachtigingen verleent aan Snowflake in de fasering van Azure Blob Storage. Zie dit artikel voor meer informatie.
Voorbeeld:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Wanneer u een gefaseerde kopie uitvoert vanuit Snowflake, is het van cruciaal belang om het gedrag van sinkkopie in te stellen op Bestanden samenvoegen. Deze instelling zorgt ervoor dat alle gepartitioneerde bestanden correct worden verwerkt en samengevoegd, waardoor het probleem wordt voorkomen dat alleen het laatste gepartitioneerde bestand wordt gekopieerd.
Voorbeeldconfiguratie
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Notitie
Als u het gedrag van de sinkkopie niet instelt om bestanden samen te voegen, kan dit ertoe leiden dat alleen het laatste gepartitioneerde bestand wordt gekopieerd.
De Snowflake-connector maakt gebruik van de copy van Snowflake in de opdracht [tabel] om de beste prestaties te bereiken. Het biedt ondersteuning voor het schrijven van gegevens naar Snowflake in Azure.
Als de brongegevensopslag en -indeling systeemeigen worden ondersteund door de opdracht Snowflake COPY, kunt u de Copy-activiteit gebruiken om rechtstreeks van de bron naar Snowflake te kopiëren. Zie Direct kopiëren naar Snowflake voor meer informatie. Gebruik anders ingebouwde gefaseerde kopie naar Snowflake.
Als u gegevens naar Snowflake wilt kopiëren, worden de volgende eigenschappen ondersteund in de sectie Copy-activiteit sink.
Eigenschappen | Beschrijving | Vereist |
---|---|---|
type | De typeeigenschap van de Copy-activiteit sink, ingesteld op SnowflakeV2Sink. | Ja |
preCopyScript | Geef in elke uitvoering een SQL-query op voor de Copy-activiteit die moet worden uitgevoerd voordat u gegevens in Snowflake schrijft. Gebruik deze eigenschap om de vooraf geladen gegevens op te schonen. | Nee |
importSettings | Geavanceerde instellingen die worden gebruikt om gegevens naar Snowflake te schrijven. U kunt de exemplaren configureren die door de COPY worden ondersteund in de opdracht die de service doorgeeft wanneer u de instructie aanroept. | Ja |
Onder importSettings : |
||
type | Het type importopdracht, ingesteld op SnowflakeImportCopyCommand. | Ja |
storageIntegration | Geef de naam op van uw opslagintegratie die u in Snowflake hebt gemaakt. Zie Een Snowflake-opslagintegratie configureren voor de vereiste stappen voor het gebruik van de opslagintegratie. | Nee |
additionalCopyOptions | Extra kopieeropties, aangeboden als een woordenlijst van sleutel-waardeparen. Voorbeelden: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Zie Snowflake Copy Options voor meer informatie. | Nee |
additionalFormatOptions | Aanvullende bestandsindelingsopties die beschikbaar zijn voor de opdracht COPY, opgegeven als een woordenlijst van sleutel-waardeparen. Voorbeelden: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Zie Opties voor het type type Snowflake-indeling voor meer informatie. | Nr. |
Notitie
Zorg ervoor dat u gemachtigd bent om de volgende opdracht uit te voeren en toegang te krijgen tot het schema INFORMATION_SCHEMA en de tabelKOLOMMEN.
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Als uw brongegevensarchief en -indeling voldoen aan de criteria die in deze sectie worden beschreven, kunt u de Copy-activiteit gebruiken om rechtstreeks van bron naar Snowflake te kopiëren. De service controleert de instellingen en mislukt de Copy-activiteit wordt uitgevoerd als niet aan de volgende criteria wordt voldaan:
Wanneer u opgeeft
storageIntegration
in de sink:Het brongegevensarchief is de Azure Blob Storage waarnaar u in de externe fase in Snowflake hebt verwezen. U moet de volgende stappen uitvoeren voordat u gegevens kopieert:
Maak een gekoppelde Azure Blob Storage-service voor de bron-Azure Blob Storage met ondersteunde verificatietypen.
Verdeel ten minste de rol opslagblobgegevenslezer toe aan de Snowflake-service-principal in het azure-brontoegangsbeheer voor Azure Blob Storage (IAM).
Wanneer u niet opgeeft
storageIntegration
in de sink:De gekoppelde bronservice is Azure Blob Storage met handtekeningverificatie voor gedeelde toegang. Als u gegevens rechtstreeks vanuit Azure Data Lake Storage Gen2 wilt kopiëren in de volgende ondersteunde indeling, kunt u een gekoppelde Azure Blob Storage-service met SAS-verificatie maken voor uw Azure Data Lake Storage Gen2-account om gefaseerde kopie naar Snowflake te voorkomen.
De brongegevensindeling is Parquet, tekst met scheidingstekens of JSON met de volgende configuraties:
Voor Parquet-indeling is de compressiecodec Geen of Snappy.
Voor tekst met scheidingstekens :
rowDelimiter
is \r\n of een willekeurig teken. Als het scheidingsteken voor rijen niet '\r\n' is,firstRowAsHeader
moet dit onwaar zijn enskipLineCount
is niet opgegeven.compression
kan geen compressie, gzip, bzip2 of deflate zijn.encodingName
is ingesteld op 'UTF-8', 'UTF-16', 'UTF-16', 'UTF-16BE', "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9 "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".quoteChar
is dubbele aanhalingstekens, enkele aanhalingstekens of lege tekenreeks (geen aanhalingsteken).
Voor JSON-indeling ondersteunt direct kopiëren alleen het geval dat de sink Snowflake-tabel slechts één kolom heeft en het gegevenstype van deze kolom VARIANT, OBJECT of MATRIX is.
compression
kan geen compressie, gzip, bzip2 of deflate zijn.encodingName
is standaard ingesteld of ingesteld op utf-8.- Kolomtoewijzing is niet opgegeven.
In de Copy-activiteit bron:
additionalColumns
is niet opgegeven.- Als uw bron een map is,
recursive
is ingesteld op waar. prefix
,modifiedDateTimeStart
,modifiedDateTimeEnd
enenablePartitionDiscovery
worden niet opgegeven.
Voorbeeld:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Wanneer uw brongegevensarchief of -indeling niet systeemeigen compatibel is met de opdracht Snowflake COPY, zoals vermeld in de laatste sectie, schakelt u de ingebouwde gefaseerde kopie in met behulp van een tussentijdse Instantie van Azure Blob Storage. De functie voor gefaseerde kopie biedt u ook een betere doorvoer. De service converteert de gegevens automatisch om te voldoen aan de vereisten voor gegevensindeling van Snowflake. Vervolgens wordt de opdracht COPY aangeroepen om gegevens in Snowflake te laden. Ten slotte worden uw tijdelijke gegevens uit de blobopslag opgeschoond. Zie Gefaseerde kopie voor meer informatie over het kopiëren van gegevens met behulp van fasering.
Als u deze functie wilt gebruiken, maakt u een gekoppelde Azure Blob Storage-service die verwijst naar het Azure-opslagaccount als tijdelijke fasering. Geef vervolgens de enableStaging
en stagingSettings
eigenschappen op in de Copy-activiteit.
Wanneer u opgeeft
storageIntegration
in de sink, moet de tijdelijke fasering van Azure Blob Storage de tijdelijke fasering zijn die u in de externe fase in Snowflake hebt genoemd. Zorg ervoor dat u een gekoppelde Azure Blob Storage-service maakt met elke ondersteunde verificatie en dat u ten minste de rol Storage Blob Data Reader verleent aan de Snowflake-service-principal in het faseringsbeheer voor Azure Blob Storage-toegangsbeheer (IAM).Wanneer u niet opgeeft
storageIntegration
in de sink, moet de gekoppelde Service voor fasering van Azure Blob Storage gebruikmaken van shared access Signature-verificatie, zoals vereist door de opdracht Snowflake COPY.
Voorbeeld:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Wanneer u gegevens transformeert in de toewijzingsgegevensstroom, kunt u lezen van en schrijven naar tabellen in Snowflake. Zie de brontransformatie en sinktransformatie in toewijzingsgegevensstromen voor meer informatie. U kunt ervoor kiezen om een Snowflake-gegevensset of een inlinegegevensset te gebruiken als bron- en sinktype.
De onderstaande tabel bevat de eigenschappen die worden ondersteund door de Snowflake-bron. U kunt deze eigenschappen bewerken op het tabblad Bronopties. De connector maakt gebruik van interne gegevensoverdracht van Snowflake.
Name | Beschrijving | Vereist | Toegestane waarden | Eigenschap gegevensstroomscript |
---|---|---|---|---|
Tabel | Als u Tabel als invoer selecteert, haalt de gegevensstroom alle gegevens op uit de tabel die is opgegeven in de Snowflake-gegevensset of in de bronopties wanneer u inlinegegevensset gebruikt. | Nee | String | (alleen voor inlinegegevensset) tableName schemaName |
Query | Als u Query als invoer selecteert, voert u een query in om gegevens op te halen uit Snowflake. Met deze instelling wordt elke tabel overschreven die u hebt gekozen in de gegevensset. Als de namen van het schema, de tabel en de kolommen kleine letters bevatten, noemt u de object-id in de query, bijvoorbeeld select * from "schema"."myTable" . |
Nee | String | query |
Incrementeel extraheren inschakelen (preview) | Gebruik deze optie om ADF te laten weten dat alleen rijen moeten worden verwerkt die zijn gewijzigd sinds de laatste keer dat de pijplijn is uitgevoerd. | Nee | Booleaanse waarde | enableCdc |
Incrementele kolom | Wanneer u de functie incrementeel extraheren gebruikt, moet u de datum/tijd/numerieke kolom kiezen die u als watermerk in de brontabel wilt gebruiken. | Nee | String | waterMarkColumn |
Snowflake-Wijzigingen bijhouden inschakelen (preview) | Met deze optie kan ADF gebruikmaken van de technologie voor het vastleggen van wijzigingen in Snowflake om alleen de deltagegevens te verwerken sinds de vorige pijplijnuitvoering. Met deze optie worden de deltagegevens automatisch geladen met bewerkingen voor invoegen, bijwerken en verwijderen van rijen zonder incrementele kolom. | Nee | Booleaanse waarde | enableNativeCdc |
Nettowijzigingen | Wanneer u het bijhouden van wijzigingen in snowflake gebruikt, kunt u deze optie gebruiken om gewijzigde rijen of volledige wijzigingen op te halen. Verwijderde gewijzigde rijen tonen alleen de meest recente versies van de rijen die zijn gewijzigd sinds een bepaald tijdstip, terwijl uitputtende wijzigingen u alle versies van elke rij laten zien die zijn gewijzigd, inclusief de versies die zijn verwijderd of bijgewerkt. Als u bijvoorbeeld een rij bijwerkt, ziet u een verwijderversie en een invoegversie in volledige wijzigingen, maar alleen de invoegversie in de ontdubbelde gewijzigde rijen. Afhankelijk van uw gebruiksscenario kunt u de optie kiezen die bij uw behoeften past. De standaardoptie is onwaar, wat uitgebreide wijzigingen betekent. | Nee | Booleaanse waarde | netChanges |
Systeemkolommen opnemen | Wanneer u het bijhouden van wijzigingen in snowflake gebruikt, kunt u de optie systemColumns gebruiken om te bepalen of de kolommen van de metagegevensstroom die door Snowflake worden geleverd, worden opgenomen of uitgesloten in de uitvoer van wijzigingen bijhouden. SystemColumns is standaard ingesteld op true, wat betekent dat de kolommen van de metagegevensstroom worden opgenomen. U kunt systemColumns instellen op false als u ze wilt uitsluiten. | Nee | Booleaanse waarde | systemColumns |
Beginnen met lezen vanaf het begin | Als u deze optie instelt met incrementeel extraheren en bijhouden van wijzigingen, wordt ADF geïnstrueerd om alle rijen bij de eerste uitvoering van een pijplijn te lezen, waarbij incrementeel extract is ingeschakeld. | Nee | Booleaanse waarde | skipInitialLoad |
Wanneer u de Snowflake-gegevensset als brontype gebruikt, is het bijbehorende gegevensstroomscript:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Als u inlinegegevensset gebruikt, is het bijbehorende gegevensstroomscript:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Azure Data Factory ondersteunt nu een systeemeigen functie in Snowflake, ook wel wijzigingen bijhouden genoemd, waarbij wijzigingen in de vorm van logboeken worden bijgehouden. Met deze functie van snowflake kunnen we de wijzigingen in de gegevens in de loop van de tijd bijhouden, waardoor het nuttig is voor het laden en controleren van incrementele gegevens. Als u deze functie wilt gebruiken, maken we, wanneer u Gegevens vastleggen wijzigen inschakelt en de Snowflake-Wijzigingen bijhouden selecteert, een Stream-object voor de brontabel die het bijhouden van wijzigingen in de bron-snowflake-tabel mogelijk maakt. Vervolgens gebruiken we de component CHANGES in onze query om alleen de nieuwe of bijgewerkte gegevens op te halen uit de brontabel. Het is ook raadzaam om een pijplijn te plannen, zodat wijzigingen worden gebruikt binnen het interval van de gegevensretentietijd die is ingesteld voor de snowflake-brontabel, anders kan de gebruiker inconsistent gedrag zien in vastgelegde wijzigingen.
De onderstaande tabel bevat de eigenschappen die worden ondersteund door snowflake-sink. U kunt deze eigenschappen bewerken op het tabblad Instellingen . Wanneer u een inlinegegevensset gebruikt, ziet u aanvullende instellingen, die hetzelfde zijn als de eigenschappen die worden beschreven in de sectie eigenschappen van de gegevensset. De connector maakt gebruik van interne gegevensoverdracht van Snowflake.
Name | Beschrijving | Vereist | Toegestane waarden | Eigenschap gegevensstroomscript |
---|---|---|---|---|
Bijwerkingsmethode | Geef op welke bewerkingen zijn toegestaan op uw Snowflake-bestemming. Als u rijen wilt bijwerken, upsert of verwijderen, is een transformatie van een alter row vereist om rijen voor deze acties te taggen. |
Ja | true of false |
te verwijderen invoegbaar kan worden bijgewerkt upsertable |
Sleutelkolommen | Voor updates, upserts en verwijderingen moet een sleutelkolom of -kolommen worden ingesteld om te bepalen welke rij moet worden gewijzigd. | Nee | Matrix | keys |
Tabelactie | Bepaalt of alle rijen uit de doeltabel opnieuw moeten worden gemaakt of verwijderd voordat ze worden geschreven. - Geen: Er wordt geen actie uitgevoerd voor de tabel. - Opnieuw maken: de tabel wordt verwijderd en opnieuw gemaakt. Vereist als u dynamisch een nieuwe tabel maakt. - Afkappen: alle rijen uit de doeltabel worden verwijderd. |
Nee | true of false |
recreëren truncate |
Wanneer u de Snowflake-gegevensset als sinktype gebruikt, is het bijbehorende gegevensstroomscript:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Als u inlinegegevensset gebruikt, is het bijbehorende gegevensstroomscript:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Door het niveau van logboekregistratie van pijplijnen in te stellen op Geen, sluiten we de overdracht van metrische gegevens voor tussenliggende transformatie uit, waardoor potentiële belemmeringen voor Spark-optimalisaties worden voorkomen en querypushdownoptimalisatie mogelijk wordt gemaakt door Snowflake. Deze pushdown-optimalisatie maakt aanzienlijke prestatieverbeteringen mogelijk voor grote Snowflake-tabellen met uitgebreide gegevenssets.
Notitie
Tijdelijke tabellen in Snowflake worden niet ondersteund, omdat ze lokaal zijn voor de sessie of gebruiker die ze maakt, waardoor ze niet toegankelijk zijn voor andere sessies en gevoelig zijn voor overschreven als gewone tabellen door Snowflake. Hoewel Snowflake tijdelijke tabellen biedt als alternatief, dat wereldwijd toegankelijk is, moeten ze handmatig worden verwijderd, wat in strijd is met het primaire doel van het gebruik van Temp-tabellen. Dit is om verwijderbewerkingen in het bronschema te voorkomen.
Zie Lookup-activiteit voor meer informatie over de eigenschappen.
Als u de Snowflake-connector wilt upgraden, kunt u een upgrade naast elkaar of een in-place upgrade uitvoeren.
Voer de volgende stappen uit om een side-by-side upgrade uit te voeren:
- Maak een nieuwe gekoppelde Snowflake-service en configureer deze door te verwijzen naar de eigenschappen van de gekoppelde service.
- Maak een gegevensset op basis van de zojuist gemaakte gekoppelde Snowflake-service.
- Vervang de nieuwe gekoppelde service en gegevensset door de bestaande services in de pijplijnen die gericht zijn op de verouderde objecten.
Als u een in-place upgrade wilt uitvoeren, moet u de bestaande nettolading van de gekoppelde service bewerken en de gegevensset bijwerken om de nieuwe gekoppelde service te gebruiken.
Werk het type van Snowflake bij naar SnowflakeV2.
Wijzig de nettolading van de gekoppelde service van de verouderde indeling in het nieuwe patroon. U kunt elk veld invullen vanuit de gebruikersinterface nadat u het hierboven genoemde type hebt gewijzigd of de nettolading rechtstreeks bijwerken via de JSON-editor. Raadpleeg de sectie Eigenschappen van de gekoppelde service in dit artikel voor de ondersteunde verbindingseigenschappen. In de volgende voorbeelden ziet u de verschillen in nettolading voor de verouderde en nieuwe gekoppelde Snowflake-services:
JSON-nettolading van verouderde Snowflake-gekoppelde service:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Nieuwe JSON-nettolading van de gekoppelde Snowflake-service:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Werk de gegevensset bij om de nieuwe gekoppelde service te gebruiken. U kunt een nieuwe gegevensset maken op basis van de zojuist gemaakte gekoppelde service of de typeeigenschap van een bestaande gegevensset bijwerken van SnowflakeTable naar SnowflakeV2Table.
De Snowflake-connector biedt nieuwe functionaliteiten en is compatibel met de meeste functies van de Snowflake-connector (verouderd). In de onderstaande tabel ziet u de functieverschillen tussen Snowflake en Snowflake (verouderd).
Snowflake | Snowflake (verouderd) |
---|---|
Ondersteuning voor basis- en sleutelpaarverificatie. | Ondersteuning voor basisverificatie. |
Scriptparameters worden momenteel niet ondersteund in scriptactiviteit. Als alternatief kunt u dynamische expressies gebruiken voor scriptparameters. Zie Expressies en functies in Azure Data Factory en Azure Synapse Analytics voor meer informatie. | Ondersteuning voor scriptparameters in scriptactiviteit. |
Ondersteuning voor BigDecimal in lookup-activiteit. Het getaltype, zoals gedefinieerd in Snowflake, wordt weergegeven als een tekenreeks in de opzoekactiviteit. | BigDecimal wordt niet ondersteund in lookup-activiteit. |
Verouderde connectionstring eigenschap is afgeschaft ten gunste van vereiste parameters Account, Warehouse, Database, Schema en Role |
In de verouderde Snowflake-connector is de connectionstring eigenschap gebruikt om een verbinding tot stand te brengen. |
Zie ondersteunde gegevensarchieven en -indelingen voor een lijst met gegevensarchieven die door Copy-activiteit worden ondersteund als bronnen en sinks.