Copia y transformación de datos en Snowflake mediante Azure Data Factory o Azure Synapse Analytics
SE APLICA A: Azure Data Factory Azure Synapse Analytics
Sugerencia
Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. Obtenga información sobre cómo iniciar una nueva evaluación gratuita.
En este artículo se explica cómo usar la actividad de copia en las canalizaciones de Azure Data Factory y Azure Synapse para copiar datos desde y hacia Snowflake, y también cómo usar Data Flow para transformar datos en Snowflake. Para obtener más información, vea el artículo introductorio de Data Factory o Azure Synapse Analytics.
Importante
El nuevo conector Snowflake proporciona compatibilidad nativa mejorada con Snowflake. Si usa el conector de Snowflake heredado en la solución, se recomienda actualizar el conector de Snowflake lo antes posible. Consulte esta sección para más información sobre la diferencia entre la versión heredada y la versión más reciente.
Funcionalidades admitidas
Este conector Snowflake es compatible con las funcionalidades siguientes:
Funcionalidades admitidas | IR |
---|---|
Actividad de copia (origen/receptor) | ① ② |
Flujo de datos de asignación (origen/receptor) | ① |
Actividad de búsqueda | ① ② |
Actividad de script | ① ② |
① Azure Integration Runtime ② Entorno de ejecución de integración autohospedado
Para la actividad de copia, este conector Snowflake admite las siguientes funciones:
- Copie los datos de Snowflake que usan el comando COPY into [ubicación] de Snowflake para lograr el mejor rendimiento.
- Copie los datos a Snowflake que aprovechan el comando [tabla] de Snowflake para lograr el mejor rendimiento. Es compatible con Snowflake en Azure.
- Si se requiere un proxy para conectarse a Snowflake desde un entorno de ejecución de integración autohospedado, debe configurar las variables de entorno para HTTP_PROXY y HTTPS_PROXY en el host del entorno de ejecución de integración.
Prerrequisitos
Si el almacén de datos se encuentra en una red local, una red virtual de Azure o una nube privada virtual de Amazon, debe configurar un entorno de ejecución de integración autohospedado para conectarse a él. Asegúrese de agregar las direcciones IP que usa el entorno de ejecución de integración autohospedado para la lista de permitidos.
Si el almacén de datos es un servicio de datos en la nube administrado, puede usar Azure Integration Runtime. Si el acceso está restringido a las direcciones IP que están aprobadas en las reglas de firewall, puede agregar direcciones IP de Azure Integration Runtime a la lista de permitidos.
La cuenta de Snowflake que se usa para el origen o el receptor debe tener el acceso USAGE
necesario en la base de datos, así como acceso de lectura y escritura en el esquema y las tablas o vistas en este. Además, también debe tener CREATE STAGE
en el esquema para poder crear la fase externa con el URI de SAS.
Deben establecerse los valores de las propiedades de la cuenta siguientes:
Propiedad | Descripción | Necesario | Valor predeterminado |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Especifica si se debe requerir un objeto de integración de almacenamiento como credenciales de nube al crear una fase externa con nombre (mediante CREATE STAGE) para acceder a una ubicación de almacenamiento en la nube privada. | false | false |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Especifica si se debe requerir el uso de una fase externa con nombre que haga referencia a un objeto de integración de almacenamiento como credenciales de nube al cargar o descargar datos en una ubicación de almacenamiento en la nube privada. | false | false |
Consulte Estrategias de acceso a datos para más información sobre los mecanismos de seguridad de red y las opciones que admite Data Factory.
Introducción
Para realizar la actividad de copia con una canalización, puede usar una de los siguientes herramientas o SDK:
- La herramienta Copiar datos
- Azure Portal
- El SDK de .NET
- El SDK de Python
- Azure PowerShell
- API REST
- La plantilla de Azure Resource Manager
Creación de un servicio vinculado en mediante la interfaz de usuario
Siga estos pasos para crear un servicio vinculado en Snowflake en la interfaz de usuario de Azure Portal.
Vaya a la pestaña Administrar del área de trabajo de Azure Data Factory o Synapse y seleccione Servicios vinculados; luego haga clic en Nuevo:
Busque Snowflake y seleccione el conector de Snowflake.
Configure los detalles del servicio, pruebe la conexión y cree el nuevo servicio vinculado.
Detalles de configuración del conector
En las secciones siguientes se proporcionan detalles sobre las propiedades que definen entidades específicas de un conector de Snowflake.
Propiedades del servicio vinculado
Estas propiedades genéricas son compatibles con el servicio vinculado de Snowflake:
Propiedad | Descripción | Obligatorio |
---|---|---|
type | La propiedad type debe establecerse en SmartsheetV2. | Sí |
accountIdentifier | Nombre de la cuenta junto con su organización. Por ejemplo, myorg-account123. | Sí |
database | Base de datos predeterminada que se usa para la sesión después de conectarse. | Sí |
almacén | El almacenamiento virtual predeterminado que se usa para la sesión después de conectarse. | Sí |
authenticationType | Tipo de autenticación que se usa para conectarse al servicio Snowflake. Los valores permitidos son: Basic (Valor predeterminado) y KeyPair. Haga referencia a las siguientes secciones correspondientes para obtener más información sobre propiedades y ejemplos, respectivamente. | No |
rol | Rol de seguridad predeterminado que se usa para la sesión después de conectarse. | No |
host | Nombre de host de la cuenta de Snowflake. Por ejemplo: contoso.snowflakecomputing.com . .cn también se admite. |
No |
connectVia | El entorno de ejecución de integración que se utiliza para conectarse al almacén de datos. Se puede usar Azure Integration Runtime o un entorno de ejecución de integración autohospedado (si el almacén de datos se encuentra en una red privada). Si no se especifica, se usará Azure Integration Runtime. | No |
Este conector de Snowflake admite los tipos de autenticación siguientes. Consulte las secciones correspondientes para más información.
Autenticación básica
Para usar autenticación básica, además de las propiedades genéricas que se describen en la sección anterior, especifique las siguientes propiedades:
Propiedad | Descripción | Obligatorio |
---|---|---|
usuario | Nombre de inicio de sesión del usuario de Snowflake. | Sí |
password | Contraseña del usuario de Snowflake. Marque este campo como un tipo SecureString para almacenarlo de forma segura. También puede hacer referencia a un secreto almacenado en Azure Key Vault. | Sí |
Ejemplo:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Contraseña en Azure Key Vault:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Nota:
Los flujos de datos de asignación solo admiten la autenticación básica.
Autenticación del par de claves de SSH
Para usar la autenticación de par de claves, debe configurar y crear un usuario de autenticación de par de claves en Snowflake haciendo referencia a Autenticación de par de claves y rotación de pares de claves. Después, anote la clave privada y la frase de contraseña (opcional), que se usa para definir el servicio vinculado.
Además de las propiedades genéricas descritas en las secciones anteriores, especifique las siguientes:
Propiedad | Descripción | Obligatorio |
---|---|---|
usuario | Nombre de inicio de sesión del usuario de Snowflake. | Sí |
privateKey | Clave privada usada para la autenticación del par de claves. Para asegurarse de que la clave privada es válida cuando se envía a Azure Data Factory y tiene en cuenta que el archivo privateKey incluye caracteres de nueva línea (\n), es esencial dar formato correctamente al contenido privateKey en su forma literal de cadena. Este proceso implica agregar \n explícitamente a cada nueva línea. |
Sí |
privateKeyPassphrase | Frase de contraseña que se usa para descifrar la clave privada, si está cifrada. | No |
Ejemplo:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Propiedades del conjunto de datos
Si desea ver una lista completa de las secciones y propiedades disponibles para definir conjuntos de datos, consulte el artículo sobre conjuntos de datos.
Las siguientes propiedades son compatibles con el conjunto de datos de Snowflake.
Propiedad | Descripción | Obligatorio |
---|---|---|
type | La propiedad type del conjunto de datos se debe establecer en SnowflakeV2Table. | Sí |
esquema | Nombre del esquema. Observe que el nombre del esquema distingue mayúsculas de minúsculas. | No para el origen, sí para el receptor |
table | Nombre de la tabla o vista. Observe que el nombre de la tabla distingue mayúsculas de minúsculas. | No para el origen, sí para el receptor |
Ejemplo:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Propiedades de la actividad de copia
Si desea ver una lista completa de las secciones y propiedades disponibles para definir actividades, consulte el artículo sobre canalizaciones. En esta sección se proporciona una lista de las propiedades que admiten el receptor y el origen de Snowflake.
Snowflake como origen.
El conector de Snowflake usa el comando COPY into [location] de Snowflake para lograr el mejor rendimiento.
Si el almacén de datos y el formato del receptor se admiten de forma nativa con el comando COPY de Snowflake, puede usar la actividad de copia para copiar directamente de Snowflake al receptor. Consulte Copia directa de Snowflake para obtener detalles. De lo contrario, use la Copia almacenada provisionalmente desde Snowflake.
Para copiar datos desde Snowflake, en la sección source de la actividad de copia se admiten las propiedades siguientes.
Propiedad | Descripción | Obligatorio |
---|---|---|
type | La propiedad type del origen de la actividad de copia debe establecerse en SnowflakeV2Source. | Sí |
Query | Especifica la consulta SQL para leer datos de Snowflake. Si los nombres del esquema, la tabla y las columnas contienen minúsculas, indique el identificador de objeto en la consulta, por ejemplo, select * from "schema"."myTable" .No se admite la ejecución de procedimientos almacenados. |
No |
exportSettings | Configuración avanzada utilizada para recuperar datos de Snowflake. Se pueden configurar los parámetros que admite el comando COPY into que el servicio pasará al invocar la instrucción. | Sí |
En exportSettings : |
||
type | El tipo de comando de exportación, establecido en SnowflakeExportCopyCommand. | Sí |
storageIntegration | Especifica el nombre de la integración de almacenamiento que creó en Snowflake. Para conocer los pasos previos del uso de la integración de almacenamiento, consulta Configuración de una integración de almacenamiento de Snowflake. | No |
additionalCopyOptions | Opciones de copia adicionales, proporcionadas como un diccionario de pares clave-valor. Ejemplos: MAX_FILE_SIZE, OVERWRITE. Para obtener más información consulte el documento sobre las opciones de copia de Snowflake. | No |
additionalFormatOptions | Opciones adicionales de formato de archivo que se proporcionan al comando COPY como un diccionario de pares clave-valor. Ejemplos: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Para obtener más información consulte opciones de tipo de formato de Snowflake. | No |
Nota
Asegúrese de tener permiso para ejecutar el siguiente comando y acceder al esquema INFORMATION_SCHEMA y a la tabla COLUMNS.
COPY INTO <location>
Copia directa desde Snowflake
Si el almacén de datos y el formulario del receptor cumplen los criterios descritos en esta sección, puede usar la actividad de copia para copiar directamente de Snowflake al receptor. El servicio comprueba la configuración y no ejecuta la actividad de copia si no se cumplen los siguientes criterios:
Al especificar
storageIntegration
en el origen:El almacén de datos receptor es la instancia de Azure Blob Storage a la que se hace referencia en la fase externa de Snowflake. Debes completar los pasos siguientes antes de copiar datos:
Crea un servicio vinculado de Azure Blob Storage para el receptor de Azure Blob Storage con los tipos de autenticación admitidos.
Concede al menos el rol de Colaborador de datos de blobs de almacenamiento a la entidad de servicio de Snowflake en el receptor de Azure Blob Storage Access Control (IAM).
Cuando no especifiques
storageIntegration
en el origen:El servicio vinculado al receptor es Azure Blob Storage con la autenticación de la firma de acceso de recurso compartido. Si quieres copiar los datos directamente en Azure Data Lake Storage Gen2 en el siguiente formato admitido, puedes crear un servicio vinculado de Azure Blob Storage con la autenticación de SAS en la cuenta de Azure Data Lake Storage Gen2, para evitar el uso de una copia almacenada provisionalmente desde Snowflake.
El formato de datos de receptor es de Parquet, texto delimitado o JSON con estas configuraciones:
- Para el formato Parquet, el códec de compresión es None, Snappy, o Lzo.
- Para el formato de texto delimitado:
rowDelimiter
es \r\n o cualquier carácter individual.compression
puede ser no compression, gzip, bzip2, o deflate.encodingName
se deja con el valor predeterminado o se establece en utf-8.quoteChar
es double quote, single quote, o empty string (ningún carácter de comillas).
- Para el formato JSON, la copia directa solo admite el caso en que el resultado de la consulta o la tabla Snowflake de origen solo tiene una columna y el tipo de datos de esta columna es VARIANT, OBJECT o ARRAY.
compression
puede ser no compression, gzip, bzip2 o deflate.encodingName
se deja con el valor predeterminado o se establece en utf-8.filePattern
en el receptor de la actividad de copia se deja como el valor predeterminado o se establece en setOfObjects.
En el origen de la actividad de copia, no se especifica
additionalColumns
.No se especifica la asignación de columnas.
Ejemplo:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Copia almacenada provisionalmente desde Snowflake
Cuando el formato o el almacén de datos receptor no sea compatible de forma nativa con el comando COPY de Snowflake, como se mencionó en la última sección, habilite la copia preconfigurada integrada con una instancia intermedia de Azure Blob Storage. La característica de copia almacenada provisionalmente también proporciona un mejor rendimiento. El servicio exporta datos de Snowflake al almacenamiento provisional, copia los datos en el receptor y, por último, limpia los datos temporales del almacenamiento provisional. Consulte Copia almacenada provisionalmente para obtener más información sobre cómo copiar datos mediante el almacenamiento provisional.
Para utilizar esta característica, cree un servicio vinculado de Azure Blob Storage que haga referencia a la cuenta de Azure Storage como almacenamiento provisional temporal. Luego especifique las propiedades enableStaging
y stagingSettings
en la actividad de copia.
Al especificar
storageIntegration
en el origen, el almacenamiento provisional de Azure Blob Storage debe ser el que se hace referencia en la fase externa de Snowflake. Asegúrate de crear un servicio vinculado de Azure Blob Storage con cualquier autenticación admitida y concede al menos el rol de Colaborador de datos de blobs de almacenamiento a la entidad de servicio de Snowflake en el almacenamiento provisional de Azure Blob Storage Access Control (IAM).Cuando no se especifica
storageIntegration
en el origen, el servicio vinculado de almacenamiento provisional de Azure Blob Storage debe usar la autenticación de firma de acceso compartido, según lo requiera el comando COPY de Snowflake. Asegúrate de conceder el permiso de acceso adecuado a Snowflake en el almacenamiento provisional de Azure Blob Storage. Para más información sobre esto, consulte este artículo.
Ejemplo:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Al realizar una copia almacenada provisionalmente de Snowflake, es fundamental establecer el comportamiento de copia del receptor en Combinar archivos. Esta configuración garantiza que todos los archivos con particiones se controlan y combinan correctamente, lo que impide el problema en el que solo se copia el último archivo con particiones.
Configuración de ejemplo
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Nota:
Si no se establece el comportamiento de copia del receptor en Archivos de combinación, solo se puede copiar el último archivo con particiones.
Snowflake como receptor
El conector de Snowflake utiliza el comando COPY into [location] de Snowflake para lograr un mejor rendimiento. Admite la escritura de datos de Snowflake en Azure.
Si el almacén de datos y el formato del origen se admiten de forma nativa con el comando COPY de Snowflake, puede usar la actividad de copia para copiar directamente del origen a Snowflake. Consulte Copia directa de Snowflake para obtener detalles. De lo contrario, use la Copia almacenada provisionalmente de Snowflake.
Para copiar datos a Snowflake, se admiten las siguientes propiedades en la sección sink de la actividad de copia.
Propiedad | Descripción | Obligatorio |
---|---|---|
type | La propiedad type del receptor de la actividad de copia se establece en SnowflakeV2Sink. | Sí |
preCopyScript | Especifique una consulta SQL para que la actividad de copia se ejecute antes de escribir datos en Snowflake en cada ejecución. Esta propiedad se usa para limpiar los datos cargados previamente. | No |
importSettings | Configuración avanzada utilizada para escribir datos en Snowflake. Se pueden configurar los parámetros que admite el comando COPY into que el servicio pasará al invocar la instrucción. | Sí |
En importSettings : |
||
type | El tipo de comando de importación, establecido en SnowflakeImportCopyCommand. | Sí |
storageIntegration | Especifica el nombre de la integración de almacenamiento que creó en Snowflake. Para conocer los pasos previos del uso de la integración de almacenamiento, consulta Configuración de una integración de almacenamiento de Snowflake. | No |
additionalCopyOptions | Opciones de copia adicionales, proporcionadas como un diccionario de pares clave-valor. Ejemplos: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Para obtener más información, consulte el documento sobre las opciones de copia de Snowflake. | No |
additionalFormatOptions | Opciones de formato de archivo adicionales que se proporcionan al comando COPY, que se proporciona como un diccionario de pares clave-valor. Ejemplos: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Para obtener más información consulte opciones de tipo de formato de Snowflake. | No |
Nota
Asegúrese de tener permiso para ejecutar el siguiente comando y acceder al esquema INFORMATION_SCHEMA y a la tabla COLUMNS.
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Copia directa a Snowflake
Si el almacén de datos y el formulario de origen cumplen los criterios descritos en esta sección, puede usar la actividad de copia para copiar directamente desde el origen a Snowflake. El servicio comprueba la configuración y no ejecuta la actividad de copia si no se cumplen los siguientes criterios:
Al especificar
storageIntegration
en el receptor:El almacén de datos de origen es la instancia de Azure Blob Storage a la que se hace referencia en la fase externa de Snowflake. Debes completar los pasos siguientes antes de copiar datos:
Crea un servicio vinculado de Azure Blob Storage para el origen de Azure Blob Storage con los tipos de autenticación admitidos.
Concede al menos el rol de Lector de datos de blobs de almacenamiento a la entidad de servicio de Snowflake en el origen de Azure Blob Storage Access Control (IAM).
Cuando no especifiques
storageIntegration
en el receptor:El servicio vinculado al origen es Azure Blob Storage con la autenticación de la firma de acceso de recurso compartido. Si quieres copiar los datos directamente desde Azure Data Lake Storage Gen2 en el siguiente formato admitido, puedes crear un servicio vinculado de Azure Blob Storage con la autenticación de SAS en la cuenta de Azure Data Lake Storage Gen2, para evitar el uso de una copia almacenada provisionalmente desde Snowflake.
El formato de datos de origen es Parquet, texto delimitado o JSON con estas configuraciones:
Para el formato Parquet, el códec de compresión es None o Snappy.
Para el formato de texto delimitado:
rowDelimiter
es \r\n o cualquier carácter individual. Si el delimitador de fila no es "\r\n",firstRowAsHeader
debe ser false yskipLineCount
no se especifica.compression
puede ser no compression, gzip, bzip2 o deflate.encodingName
se deja como valor predeterminado o se establece en "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".quoteChar
es double quote, single quote, o empty string (ningún carácter de comillas).
Para el formato JSON, la copia directa solo admite el caso en que la tabla Snowflake de receptor solo tiene una columna y el tipo de datos de esta columna es VARIANT, OBJECT o ARRAY.
compression
puede ser no compression, gzip, bzip2 o deflate.encodingName
se deja con el valor predeterminado o se establece en utf-8.- No se especifica la asignación de columnas.
En el origen de la actividad de copia:
additionalColumns
no se especifica.- Si el origen es una carpeta,
recursive
se establece en true. prefix
,modifiedDateTimeStart
,modifiedDateTimeEnd
yenablePartitionDiscovery
no se especifican.
Ejemplo:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Copia almacenada provisionalmente en Snowflake
Cuando el formato o el almacén de datos de origen no sea compatible de forma nativa con el comando COPY de Snowflake, como se mencionó en la última sección, habilite la copia preconfigurada integrada con una instancia intermedia de Azure Blob Storage. La característica de copia almacenada provisionalmente también proporciona un mejor rendimiento. El servicio convierte automáticamente los datos para satisfacer los requisitos del formato de datos de Snowflake. A continuación, invoca el comando COPY para cargar datos en Snowflake. Por último, limpia los datos temporales del almacenamiento de blobs. Consulte Copia almacenada provisionalmente para obtener más información sobre cómo copiar datos con el almacenamiento provisional.
Para utilizar esta característica, cree un servicio vinculado de Azure Blob Storage que haga referencia a la cuenta de Azure Storage como almacenamiento provisional temporal. Luego especifique las propiedades enableStaging
y stagingSettings
en la actividad de copia.
Al especificar
storageIntegration
en el receptor, el almacenamiento provisional de Azure Blob Storage debe ser el que se hace referencia en la fase externa de Snowflake. Asegúrate de crear un servicio vinculado de Azure Blob Storage con cualquier autenticación admitida y concede al menos el rol de Lector de datos de blobs de almacenamiento a la entidad de servicio de Snowflake en el almacenamiento provisional de Azure Blob Storage Access Control (IAM).Cuando no se especifica
storageIntegration
en el receptor, el servicio vinculado de almacenamiento provisional de Azure Blob Storage debe usar la autenticación de firma de acceso compartido, según lo requiera el comando COPY de Snowflake.
Ejemplo:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Propiedades de Asignación de instancias de Data Flow
Al transformar datos en un flujo de datos de asignación, puede leer y escribir en tablas de Snowflake. Para más información, vea la transformación de origen y la transformación de receptor en los flujos de datos de asignación. Puede optar por usar un conjunto de datos de Snowflake o un conjunto de datos en línea como tipo de origen y receptor.
Transformación de origen
En la tabla siguiente se indican las propiedades que admite el origen de Snowflake. Puede editar estas propiedades en la pestaña Opciones del origen. El conector emplea transferencia de datos interna de Snowflake.
Nombre | Descripción | Obligatorio | Valores permitidos | Propiedad de script de flujo de datos |
---|---|---|---|---|
Tabla | Si selecciona Tabla como entrada, el flujo de datos captura todos los datos de la tabla especificada en el conjunto de datos de Snowflake o en las opciones del origen al usar un conjunto de datos en línea. | No | String | (solo para conjunto de datos en línea) tableName schemaName |
Consultar | Si selecciona Consulta como entrada, escriba una consulta para capturar datos de Snowflake. Esta configuración invalida cualquier tabla que se haya elegido en el conjunto de datos. Si los nombres del esquema, la tabla y las columnas contienen minúsculas, indique el identificador de objeto en la consulta, por ejemplo, select * from "schema"."myTable" . |
No | String | Query |
Habilitar el extracto incremental (versión preliminar) | Use esta opción para indicar a ADF que solo procese las filas que hayan cambiado desde la última vez que se ejecutó la canalización. | No | Booleano | enableCdc |
Columna incremental | Si utiliza la función de extracción incremental, debe elegir la fecha, hora o columna numérica que desea utilizar como marca de agua en la tabla de origen. | No | String | waterMarkColumn |
Habilitar seguimiento de cambios de Snowflake (versión preliminar) | Esta opción permite a ADF aprovechar la tecnología de captura de datos modificados de Snowflake para procesar solo los datos delta desde la ejecución de la canalización anterior. Esta opción carga automáticamente los datos delta con operaciones de inserción, actualización y eliminación de filas y no se necesita ninguna columna incremental. | No | Booleano | enableNativeCdc |
Cambios netos | Si utiliza el seguimiento de cambios de Snowflake, puede usar esta opción para desduplicar las filas modificadas o cambios exhaustivos. Las filas modificadas desduplicadas mostrarán solo las versiones más recientes de las filas que han cambiado desde un momento dado, mientras que los cambios exhaustivos mostrarán todas las versiones de cada fila que haya cambiado, incluidas las que se han eliminado o actualizado. Por ejemplo, si actualiza una fila, verá una versión de eliminación y una versión de inserción en los cambios exhaustivos, pero solo verá la versión de inserción en la filas modificadas des duplicadas. Dependiendo del caso de uso, puede elegir la opción que se adapte a sus necesidades. La opción predeterminada es false, lo que significa cambios exhaustivos. | No | Booleano | netChanges |
Incluir columnas del sistema | Si utiliza el seguimiento de cambios de Snowflake, puede usar la opción systemColumns para controlar si las columnas de flujo de metadatos que proporciona Snowflake se incluyen o excluyen en la salida del seguimiento de cambios. De forma predeterminada, systemColumns se establece en true, lo que significa que se incluyen las columnas de flujo de metadatos. Si desea excluirlas, puede establecer systemColumns en false. | No | Booleano | systemColumns |
Empezar a leer desde el principio | Si se establece esta opción con el extracto incremental y el seguimiento de cambios, se indicará a ADF que lea todas las filas en la primera ejecución de una canalización con la extracción incremental activada. | No | Booleano | skipInitialLoad |
Ejemplos de script de origen de Snowflake
Cuando se usa un conjunto de datos de Snowflake como tipo de origen, el script de flujo de datos asociado es:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Si se usa un conjunto de datos en línea, el script de flujo de datos asociado es:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Seguimiento de cambios nativo
Azure Data Factory ahora admite una característica nativa en Snowflake, que se conoce como seguimiento de cambios, lo que implica el seguimiento de los cambios en forma de registros. Esta característica de Snowflake nos permite hacer un seguimiento de los cambios en los datos a lo largo del tiempo, lo que hace que sea útil para la carga y auditoría incrementales de los datos. Para usar esta característica, al habilitar Captura de datos modificados y seleccionar el seguimiento de cambios de Snowflake, creamos un objeto Stream para la tabla de origen que habilita el seguimiento de cambios en la tabla de Snowflake de origen. Posteriormente, usamos la cláusula CHANGES en nuestra consulta para capturar solo los datos nuevos o actualizados de la tabla de origen. Además, se recomienda programar la canalización de forma que los cambios se consuman en el intervalo de tiempo de retención de datos establecido para la tabla de origen de copo de nieve, de lo contrario, el usuario podría ver un comportamiento incoherente en los cambios capturados.
Transformación de receptor
En la tabla siguiente se indican las propiedades que admite el receptor de Snowflake. Puede editar estas propiedades en la pestaña Configuración. Al usar un conjunto de datos insertado, verá opciones adicionales, que son las mismas que las propiedades descritas en la sección Propiedades del conjunto de datos. El conector emplea transferencia de datos interna de Snowflake.
Nombre | Descripción | Obligatorio | Valores permitidos | Propiedad de script de flujo de datos |
---|---|---|---|---|
Método de actualización | Especifique qué operaciones se permiten en el destino de Snowflake. Para actualizar, upsert o eliminar filas, se requiere una transformación de alteración de fila a fin de etiquetar filas para esas acciones. |
Sí | true o false |
deletable insertable updateable upsertable |
Columnas de clave | En el caso de las actualizaciones, upserts y eliminaciones, se debe establecer una o varias columnas de clave para determinar la fila que se va a modificar. | No | Array | claves |
Acción Table | determina si se deben volver a crear o quitar todas las filas de la tabla de destino antes de escribir. - Ninguno: no se realizará ninguna acción en la tabla. - Volver a crear: se quitará la tabla y se volverá a crear. Obligatorio si se crea una nueva tabla dinámicamente. - Truncar: se quitarán todas las filas de la tabla de destino. |
No | true o false |
recreate truncate |
Ejemplos de script de receptor de Snowflake
Cuando se usa un conjunto de datos de Snowflake como tipo de receptor, el script de flujo de datos asociado es:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Si se usa un conjunto de datos en línea, el script de flujo de datos asociado es:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Optimización de la delegación de consultas
Al establecer el nivel de registro de la canalización en Ninguno, se excluye la transmisión de métricas de transformación intermedias, lo que evita que puedan aparecer obstáculos en las optimizaciones de Spark y habilita la optimización de la delegación de consultas proporcionada por Snowflake. Esta optimización permite realizar mejoras de rendimiento considerables en tablas de Snowflake de gran tamaño con grandes conjuntos de datos.
Nota:
En Snowflake no se admiten tablas temporales, ya que son locales para la sesión o el usuario que los crea, lo que hace que otras sesiones no puedan acceder a ellas y que sean propensas a que Snowflake las sobrescriba como tablas normales. Aunque Snowflake ofrece la alternativa de las tablas transitorias, a las que se puede acceder globalmente, requieren la eliminación manual, lo que va en contra de nuestro objetivo principal cuando usamos tablas temporales, que es evitar las operaciones de eliminación en el esquema de origen.
Propiedades de la actividad de búsqueda
Para más información sobre las propiedades, consulte Actividad de búsqueda.
Actualización del conector de Snowflake
Para actualizar el conector de Snowflake, puede realizar una actualización en paralelo o una actualización local.
Actualización en paralelo
Para realizar una actualización en paralelo, complete los pasos siguientes:
- Cree un servicio vinculado de Snowflake y configúrelo consultando las propiedades del servicio vinculado.
- Cree un conjunto de datos basado en el servicio vinculado de Snowflake recién creado.
- Reemplace el nuevo servicio vinculado y el conjunto de datos por los existentes en las canalizaciones que tiene como destino los objetos heredados.
Actualización local
Para realizar una actualización local, debe editar la carga del servicio vinculado existente y actualizar el conjunto de datos para usar el nuevo servicio vinculado.
Actualice el tipo de Snowflake a SnowflakeV2.
Modifique la carga del servicio vinculado desde su formato heredado al nuevo patrón. Puede rellenar cada campo desde la interfaz de usuario después de cambiar el tipo mencionado anteriormente o actualizar la carga directamente a través del Editor JSON. Consulte la sección Propiedades del servicio vinculado de este artículo para conocer las propiedades de conexión admitidas. En los siguientes ejemplos se muestran las diferencias en la carga de los servicios vinculados heredados y nuevos de Snowflake:
Carga JSON del servicio vinculado de Snowflake heredado:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Carga JSON del servicio vinculado de Snowflake nuevo:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Actualice el conjunto de datos para usar el nuevo servicio vinculado. Puede crear un nuevo conjunto de datos basado en el servicio vinculado recién creado o actualizar la propiedad tipo de un conjunto de datos existente de SnowflakeTable a SnowflakeV2Table.
Diferencias entre Snowflake y Snowflake (heredado)
El conector Snowflake ofrece nuevas funcionalidades y es compatible con la mayoría de las funciones del conector Snowflake (heredado). La siguiente tabla muestra las diferencias entre Snowflake y Snowflake (heredado).
Snowflake | Snowflake (heredado) |
---|---|
Compatibilidad con autenticación básica y por par de claves. | Compatibilidad con autenticación básica. |
Actualmente no se admiten parámetros de script en la actividad Script. Como alternativa, utilice expresiones dinámicas para los parámetros de script. Para obtener más información, consulte Expresiones y funciones en Azure Data Factory y Azure Synapse Analytics. | Compatibilidad con parámetros de script en la actividad script. |
Compatibilidad con BigDecimal en la actividad de Búsqueda. El tipo de NÚMERO, tal y como se define en Snowflake, se mostrará como una cadena en la actividad de Búsqueda. | BigDecimal no es compatible con la actividad de Búsqueda. |
La propiedad heredada connectionstring está en desuso en favor de los parámetros necesarios Cuenta, Almacén, Base de datos, Esquemay Función |
En el conector de Snowflake heredado, la propiedad connectionstring se usó para establecer una conexión. |
Contenido relacionado
Consulte los formatos y almacenes de datos compatibles para ver una lista de los almacenes de datos que la actividad de copia admite como orígenes y receptores.