你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Azure 数据工厂或 Azure Synapse Analytics 在 Snowflake 中复制和转换数据

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

本文概述了如何使用 Azure 数据工厂和 Azure Synapse 管道中的复制活动从/向 Snowflake 复制数据,并使用数据流转换 Snowflake 中的数据。 有关详细信息,请参阅数据工厂Azure Synapse Analytics 的简介文章。

重要

新的 Snowflake 连接器提供改进的本机 Snowflake 支持。 如果你在解决方案中使用了旧的 Snowflake 连接器,且该连接器仅“按原样”支持后向兼容性,请参阅 Snowflake 连接器(旧版)一文。

支持的功能

此 Snowflake 连接器支持以下功能:

支持的功能 IR
复制活动(源/接收器) ① ②
映射数据流源(源/接收器)
Lookup 活动 ① ②
脚本活动 ① ②

① Azure 集成运行时 ② 自承载集成运行时

对于复制活动,此 Snowflake 连接器支持以下功能:

  • 从 Snowflake 复制数据:利用 Snowflake 的 COPY into [location] 命令实现最佳性能。
  • 将数据复制到 Snowflake 中:利用 Snowflake 的 COPY into [table] 命令实现最佳性能。 它支持 Azure 上的 Snowflake。
  • 如果需要一个代理,以便从自承载 Integration Runtime 连接到 Snowflake,则必须在 Integration Runtime 主机上为 HTTP_PROXY 和 HTTPS_PROXY 配置环境变量。

先决条件

如果数据存储位于本地网络、Azure 虚拟网络或 Amazon Virtual Private Cloud 内部,则需要配置自承载集成运行时才能连接到该数据存储。 确保将自承载集成运行时使用的 IP 地址添加到允许列表中。

如果数据存储是托管的云数据服务,则可以使用 Azure Integration Runtime。 如果访问范围限制为防火墙规则中允许的 IP,你可以将 Azure Integration Runtime IP 添加到允许的列表。

用于源或接收器的 Snowflake 帐户应具有对数据库的必要 USAGE 访问权限以及对架构及其下的表/视图的读/写访问权限。 此外,它还应该在架构上具有 CREATE STAGE,以便能够使用 SAS URI 创建外部阶段。

必须设置以下帐户属性值

属性 描述 需要 默认
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION 指定在创建命名的外部阶段(使用 CREATE STAGE)访问私有云存储位置时是否需要存储集成对象作为云凭证。 FALSE FALSE
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION 指定在从私有云存储位置加载数据或将数据卸载到私有云存储位置时,是否需要使用引用存储集成对象作为云凭证的命名外部阶段。 FALSE FALSE

要详细了解网络安全机制和数据工厂支持的选项,请参阅数据访问策略

入门

若要使用管道执行复制活动,可以使用以下工具或 SDK 之一:

使用 UI 创建到 Snowflake 的链接服务

使用以下步骤在 Azure 门户 UI 中创建一个到 Snowflake 的链接服务。

  1. 浏览到 Azure 数据工厂或 Synapse 工作区中的“管理”选项卡并选择“链接服务”,然后单击“新建”:

  2. 搜索“Snowflake”并选择“Snowflake 连接器”。

    Snowflake 连接器的屏幕截图。

  3. 配置服务详细信息、测试连接并创建新的链接服务。

    Snowflake 的链接服务配置的屏幕截图。

连接器配置详细信息

以下部分详细介绍了用来定义特定于 Snowflake 连接器的实体的属性。

链接服务属性

Snowflake 链接服务支持以下通用属性:

properties 描述 必需
type type 属性必须设置为“SnowflakeV2”
accountIdentifier 帐户的名称及其组织。 例如,myorg-account123。
database 连接后用于会话的默认数据库。
warehouse 连接后用于会话的默认虚拟仓库。
authenticationType 用于连接到 Snowflake 服务的身份验证类型。 允许的值为:Basic(默认)和 KeyPair。 有关其他属性和示例,请参阅下面的相应部分。
role 连接后用于会话的默认安全角色。
connectVia 用于连接到数据存储的 Integration Runtime。 可使用 Azure Integration Runtime 或自承载集成运行时(如果数据存储位于专用网络)。 如果未指定,则使用默认 Azure Integration Runtime。

此 Snowflake 连接器支持以下身份验证类型。 有关详细信息,请参阅相应部分。

基本身份验证

要使用基本身份验证,除了上一部分中所述的通用属性外,还需要指定以下属性:

properties 描述 必须
user Snowflake 用户的登录名。
password Snowflake 用户的密码。 将此字段标记为 SecureString 类型以将其安全存储。 此外,还可以引用 Azure Key Vault 中存储的机密

示例:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Azure 密钥保管库中的密码:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

注意

映射数据流仅支持基本身份验证。

密钥对身份验证

若要使用密钥对身份验证,需要参考密钥对身份验证和密钥对轮换在 Snowflake 中配置和创建密钥对身份验证用户。 之后,请记下用于定义链接服务的私钥和通行短语(可选)。

除了前面部分所述的通用属性,还指定以下属性:

属性 描述 必须
user Snowflake 用户的登录名。
privateKey 用于密钥对身份验证的私钥。

为了确保私钥在发送到 Azure 数据工厂时有效,并且考虑到 privateKey 文件包含换行符 (\n),必须以字符串字面量形式正确设置 privateKey 内容的格式。 此过程涉及向每个换行符显式添加 \n。
privateKeyPassphrase 用于解密私钥的通行短语(如果已加密)。

示例:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "KeyPair",
            "user": "<username>",
            "privateKey": {
                "type": "SecureString",
                "value": "<privateKey>"
            },
            "privateKeyPassphrase": { 
                "type": "SecureString",
                "value": "<privateKeyPassphrase>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

数据集属性

有关可用于定义数据集的各部分和属性的完整列表,请参阅数据集一文。

Snowflake 数据集支持以下属性。

属性 描述 必需
type 数据集的 type 属性必须设置为 SnowflakeV2Table
schema 架构的名称。 请注意,架构名称区分大小写。 对于源为“否”,对于接收器为“是”
表/视图的名称。 请注意,表名称区分大小写。 对于源为“否”,对于接收器为“是”

示例:

{
    "name": "SnowflakeV2Dataset",
    "properties": {
        "type": "SnowflakeV2Table",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

复制活动属性

有关可用于定义活动的各部分和属性的完整列表,请参阅管道一文。 本部分提供 Snowflake 源和接收器支持的属性列表。

以 Snowflake 作为源

Snowflake 连接器利用 Snowflake 的 COPY into [location] 命令实现最佳性能。

如果 Snowflake 的 COPY 命令以本机方式支持接收器数据存储和格式,则可使用复制活动将数据从 Snowflake 直接复制到接收器。 有关详细信息,请参阅从 Snowflake 进行的直接复制。 否则,请使用内置的从 Snowflake 进行的暂存复制

从 Snowflake 复制数据时,复制活动的“源”部分支持以下属性。

属性 描述 必需
type 复制活动源的类型属性必须设置为 SnowflakeV2Source
查询 指定要从 Snowflake 读取数据的 SQL 查询。 如果架构、表和列的名称包含小写字母,请在查询中引用对象标识符,例如 select * from "schema"."myTable"
不支持执行存储过程。
exportSettings 用于从 Snowflake 检索数据的高级设置。 可以配置 COPY into 命令支持的此类设置。在调用相关语句时,该服务会传递此类设置。
exportSettings
type 导出命令的类型,设置为 SnowflakeExportCopyCommand
additionalCopyOptions 其他复制选项,作为键值对的字典提供。 示例:MAX_FILE_SIZE、OVERWRITE。 有关详细信息,请参阅 Snowflake 复制选项
additionalFormatOptions 作为键值对的字典提供给 COPY 命令的其他文件格式选项。 示例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 有关详细信息,请参阅 Snowflake 格式类型选项

注意

请确保你有权执行以下命令并访问架构 INFORMATION_SCHEMA 和表 COLUMNS。

  • COPY INTO <location>

从 Snowflake 进行的直接复制

如果接收器数据存储和格式符合此部分所述条件,则可使用复制活动将数据从 Snowflake 直接复制到接收器。 该服务将检查设置,如果不符合以下条件,复制活动运行将会失败:

  • “接收器链接服务”是使用“共享访问签名”身份验证的 Azure Blob 存储 。 若要采用下面受支持的格式将数据直接复制到 Azure Data Lake Storage Gen2,可以创建带有针对 ADLS Gen2 帐户的 SAS 身份验证功能的 Azure Blob 链接服务,从而避免使用从 Snowflake 进行的分阶段复制

  • 接收器数据格式为“Parquet”、“带分隔符的文本”或“JSON”,其配置如下 :

    • 对于“Parquet”格式,压缩编解码器为“无”、“Snappy”或或“Lzo”。
    • 对于“带分隔符的文本”格式:
      • rowDelimiter\r\n 或任何单个字符。
      • compression 可为“无压缩”、 gzipbzip2deflate
      • encodingName 保留为默认值或设置为 utf-8
      • quoteChar 为双引号、单引号或空字符串(无引号字符) 。
    • 对于“JSON”格式,直接复制只支持以下情况:源 Snowflake 表或查询结果仅有一列且该列的数据类型是“VARIANT”、“OBJECT”或“ARRAY” 。
      • compression 可为“无压缩”、 gzipbzip2deflate
      • encodingName 保留为默认值或设置为 utf-8
      • filePattern 在复制活动接收器中保留为默认值或设置为“setOfObjects”。
  • 在复制活动源中,additionalColumns 未指定。

  • 列映射未指定。

示例:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",
                "sqlReaderQuery": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    }
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

从 Snowflake 进行暂存复制

如果接收器数据存储或格式与上一部分所述的 Snowflake COPY 命令并非以本机方式兼容,请通过临时的 Azure Blob 存储实例启用内置暂存复制。 暂存复制功能也能提供更高的吞吐量。 该服务将数据从 Snowflake 导出到临时存储,然后将数据复制到接收器,最后从临时存储中清除临时数据。 若要详细了解如何通过暂存方式复制数据,请参阅暂存复制

若要使用此功能,请创建一个引用 Azure 存储帐户作为临时暂存位置的 Azure Blob 存储链接服务。 然后,在复制活动中指定 enableStagingstagingSettings 属性。

注意

暂存 Azure Blob 存储链接服务必须使用 Snowflake 的 COPY 命令所需的共享访问签名身份验证。 确保在暂存 Azure Blob 存储中授予对 Snowflake 的适当访问权限。 要了解详细信息,请参阅本文

示例:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",               
                "sqlReaderQuery": "SELECT * FROM MyTable",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand"
                }
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

以 Snowflake 作为接收器

Snowflake 连接器利用 Snowflake 的 COPY into [table] 命令实现最佳性能。 它支持将数据写入 Azure 上的 Snowflake。

如果 Snowflake 的 COPY 命令以本机方式支持源数据存储和格式,则可使用复制活动将数据从源直接复制到 Snowflake。 有关详细信息,请参阅直接复制到 Snowflake。 否则,请使用内置的暂存复制到 Snowflake

若要将数据复制到 Snowflake,复制活动的“接收器”部分需要支持以下属性。

属性 描述 必需
type 复制活动接收器的类型属性设置为 SnowflakeV2Sink
preCopyScript 指定在每次运行中将数据写入到 Snowflake 之前要由复制活动运行的 SQL 查询。 使用此属性清理预加载的数据。
importSettings 用于将数据写入 Snowflake 的高级设置。 可以配置 COPY into 命令支持的此类设置。在调用相关语句时,该服务会传递此类设置。
importSettings
type 导入命令的类型,设置为 SnowflakeImportCopyCommand
additionalCopyOptions 其他复制选项,作为键值对的字典提供。 示例:ON_ERROR、FORCE、LOAD_UNCERTAIN_FILES。 有关详细信息,请参阅 Snowflake 复制选项
additionalFormatOptions 提供给 COPY 命令的其他文件格式选项,作为键值对的字典提供。 示例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 有关详细信息,请参阅 Snowflake 格式类型选项

注意

请确保你有权执行以下命令并访问架构 INFORMATION_SCHEMA 和表 COLUMNS。

  • SELECT CURRENT_REGION()
  • COPY INTO <table>
  • SHOW REGIONS
  • CREATE OR REPLACE STAGE
  • DROP STAGE

直接复制到 Snowflake

如果源数据存储和格式符合此部分所述条件,则可使用复制活动将数据从源直接复制到 Snowflake。 该服务将检查设置,如果不符合以下条件,复制活动运行将会失败:

  • “源链接服务”是使用“共享访问签名”身份验证的 Azure Blob 存储 。 若要以下面受支持的格式从 Azure Data Lake Storage Gen2 直接复制数据,可以创建带有针对 ADLS Gen2 帐户的 SAS 身份验证功能的 Azure Blob 链接服务,从而避免使用到 Snowflake 的分阶段复制

  • “源数据格式”为“Parquet”、“带分隔符的文本”或“JSON”,其配置如下 :

    • 对于“Parquet”格式,压缩编解码器为“无”或“Snappy”。

    • 对于“带分隔符的文本”格式:

      • rowDelimiter\r\n 或任何单个字符。 如果行分隔符不是“\r\n”,则需将 firstRowAsHeader 设置为 false,且不指定 skipLineCount
      • compression 可为“无压缩”、 gzipbzip2deflate
      • encodingName 保留为默认值或设置为“UTF-8”、“UTF-16”、“UTF-16BE”、“UTF-32”、“UTF-32BE”、“BIG5”、“EUC-JP”、“EUC-KR”、“GB18030”、“ISO-2022-JP”、“ISO-2022-KR”、“ISO-8859-1”、“ISO-8859-2”、“ISO-8859-5”、“ISO-8859-6”、“ISO-8859-7”、“ISO-8859-8”、“ISO-8859-9”、“WINDOWS-1250”、“WINDOWS-1251”、“WINDOWS-1252”、“WINDOWS-1253”、“WINDOWS-1254”、“WINDOWS-1255”。
      • quoteChar 为双引号、单引号或空字符串(无引号字符) 。
    • 对于“JSON”格式,直接复制只支持以下情况:接收器 Snowflake 表仅有一列且该列的数据类型是“VARIANT”、“OBJECT”或“ARRAY” 。

      • compression 可为“无压缩”、 gzipbzip2deflate
      • encodingName 保留为默认值或设置为 utf-8
      • 列映射未指定。
  • 在复制活动源中:

    • 未指定 additionalColumns
    • 如果源为文件夹,则将 recursive 设置为 true。
    • prefixmodifiedDateTimeStartmodifiedDateTimeEndenablePartitionDiscovery 未指定。

示例:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE"
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD"
                    }
                }
            }
        }
    }
]

暂存复制到 Snowflake

如果源数据存储或格式与上一部分所述的 Snowflake COPY 命令并非以本机方式兼容,请通过临时的 Azure Blob 存储实例启用内置暂存复制。 暂存复制功能也能提供更高的吞吐量。 该服务会自动转换数据,以满足 Snowflake 的数据格式要求。 然后,它会调用 COPY 命令将数据载入 Snowflake。 最后,它会从 Blob 存储中清理临时数据。 若要详细了解如何通过暂存方式复制数据,请参阅暂存复制

若要使用此功能,请创建一个引用 Azure 存储帐户作为临时暂存位置的 Azure Blob 存储链接服务。 然后,在复制活动中指定 enableStagingstagingSettings 属性。

注意

暂存 Azure Blob 存储链接服务需要使用 Snowflake 的 COPY 命令所需的共享访问签名身份验证。

示例:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand"
                }
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

映射数据流属性

在映射数据流中转换数据时,可以从 Snowflake 中的表读取数据以及将数据写入表中。 有关详细信息,请参阅映射数据流中的源转换接收器转换。 可以选择使用 Snowflake 数据集或内联数据集作为源和接收器类型。

源转换

下表列出了 Snowflake 源支持的属性。 你可以在“源选项”选项卡中编辑这些属性。该连接器利用 Snowflake 内部数据传输

名称 说明 必需 允许的值 数据流脚本属性
如果选择“表”作为输入,则在使用内联数据集时,数据流会从在 Snowflake 数据集或源选项中指定的表中获取所有数据。 字符串 (仅适用于内联数据集)
tableName
schemaName
查询 如果选择“查询”作为输入,请输入用于从 Snowflake 中提取数据的查询。 此设置会替代在数据集中选择的任何表。
如果架构、表和列的名称包含小写字母,请在查询中引用对象标识符,例如 select * from "schema"."myTable"
字符串 查询
启用增量提取(预览) 使用此选项告知 ADF 仅处理自上次执行管道以来已更改的行。 布尔 enableCdc
增量列 使用增量提取功能时,必须选择要用作源表中水印的日期/时间/数字列。 字符串 waterMarkColumn
启用 Snowflake 更改跟踪(预览版) 此选项使 ADF 能够利用 Snowflake 变更数据捕获技术仅处理自上一个管道执行以来的增量数据。 此选项通过行插入、更新和删除操作自动加载增量数据,而无需任何增量列。 布尔 enableNativeCdc
净更改 使用 Snowflake 更改跟踪时,可以使用此选项获取重复更改的行或详尽的更改。 已删除重复数据的更改行将仅显示自给定时间点以来已更改的行的最新版本,而详尽的更改将显示已更改的每行的所有版本,包括已删除或更新的版本。 例如,如果更新行,你将在详尽更改中看到删除版本和插入版本,但在已删除重复数据的更改行中只会看到插入版本。 根据你的用例,你可以选择适合你的需求的选项。 默认选项为 false,这意味着详尽的更改。 布尔 netChanges
包括系统列 使用 snowflake 更改跟踪时,可以使用 systemColumns 选项来控制是将 Snowflake 提供的元数据流列包括在更改跟踪输出中还是排除。 默认情况下,systemColumns 设置为 true,这意味着包含元数据流列。 如果要排除 systemColumns,可以将它们设置为 false。 布尔 systemColumns
从头开始读取 使用增量提取和更改跟踪设置此选项后,将指示 ADF 在首次执行具有增量提取的管道时读取所有行。 布尔 skipInitialLoad

Snowflake 源脚本示例

使用 Snowflake 数据集作为源类型时,关联的数据流脚本为:

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

如果使用内联数据集,则关联的数据流脚本为:

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

本机更改跟踪

Azure 数据工厂现在支持 Snowflake 中称为更改跟踪的本机功能,该功能涉及以日志形式跟踪更改。 Snowflake 的此功能允许我们跟踪数据随时间推移的变化,使其可用于增量数据加载和审核目的。 为了利用此功能,当你启用变更数据捕获并选择 Snowflake 更改跟踪时,我们将为源表创建一个 Stream 对象,该对象在源 Snowflake 表上启用更改跟踪。 随后,我们在查询中使用 CHANGES 子句从源表中仅提取新的或更新的数据。 此外,建议计划管道,以便在 Snowflake 源表的数据保留时间设置的时间间隔内使用更改,否则用户可能会在捕获的更改中看到不一致的行为。

接收器转换

下表列出了 Snowflake 接收器支持的属性。 可以在“设置”选项卡中编辑这些属性。使用内联数据集时,你会看到其他设置,这些设置与数据集属性部分所述的属性相同。 该连接器利用 Snowflake 内部数据传输

名称 说明 必需 允许的值 数据流脚本属性
Update 方法 指定 Snowflake 目标上允许哪些操作。
若要更新、更新插入或删除行,需要进行“更改行”转换才能标记这些操作的行。
truefalse deletable
insertable
updateable
upsertable
键列 对于更新、更新插入和删除操作,必须设置一个或多个键列,以确定要更改的行。 Array 密钥
表操作 确定在写入之前是否从目标表重新创建或删除所有行。
- :不会对表进行任何操作。
- 重新创建:将删除表并重新创建表。 如果以动态方式创建表,则是必需的。
- 截断:将删除目标表中的所有行。
truefalse recreate
truncate

Snowflake 接收器脚本示例

使用 Snowflake 数据集作为接收器类型时,关联的数据流脚本为:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

如果使用内联数据集,则关联的数据流脚本为:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

查询下推优化

通过将管道日志记录级别设置为“无”,我们排除了中间转换指标的传输,防止 Spark 优化的潜在障碍,并启用 Snowflake 提供的查询下推优化。 这种下推优化为具有大量数据集的大型 Snowflake 表提供了实质性的性能增强。

注意

我们不支持 Snowflake 中的临时表,因为它们是会话或创建它们的用户的本地表,因此其他会话无法访问它们,并且容易被 Snowflake 覆盖为常规表。 虽然Snowflake提供了可全局访问的临时表作为替代方案,但它们需要手动删除,这与我们使用临时表的主要目标(即避免在源架构中进行任何删除操作)相矛盾。

查找活动属性

有关属性的详细信息,请参阅查找活动

升级 Snowflake 链接服务

若要升级 Snowflake 链接服务,请创建新的 Snowflake 链接服务,并参考链接服务属性对其进行配置。

有关复制活动支持作为源和接收器的数据存储的列表,请参阅支持的数据存储和格式