你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
CREATE EXTERNAL STREAM (Transact-SQL)
重要
Azure SQL Edge 将于 2025 年 9 月 30 日停用。 有关详细信息和迁移选项,请参阅停用通知。
注意
Azure SQL Edge 不再支持 ARM64 平台。
EXTERNAL STREAM 对象的输入和输出流都具有双重目的。 它可用作从事件引入服务(如 Azure 事件中心、Azure IoT 中心[或 Edge 中心]或 Kafka)查询流式处理数据的输入,也可用作输出来指定在何处以及如何存储流式处理查询的结果。
此外,还可以指定 EXTERNAL STREAM,并将其创建为事件中心或 Blob 存储等服务的输出和输入。 这适用于链接场景:一个流式处理查询将结果作为输出保存到外部流,另一个流式处理查询从相同的外部流读取结果作为输入。
Azure SQL Edge 目前仅支持以下数据源作为流输入和输出。
数据源类型 | 输入 | 输出 | 说明 |
---|---|---|---|
Azure IoT Edge 中心 | Y | Y | 将流式处理数据读/写到 Azure IoT Edge 中心的数据源。 有关详细信息,请参阅 IoT Edge 中心。 |
SQL 数据库 | N | Y | 将流式处理数据写入 SQL 数据库的数据源连接。 数据库可以是 Azure SQL Edge 中的本地数据库,也可以是 SQL Server 或 Azure SQL 数据库中的远程数据库。 |
Kafka | Y | N | 从 Kafka 主题读取流式处理数据的数据源。 |
语法
CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH ( <with_options> )
<column_definition> ::=
column_name <column_data_type>
<data_type> ::=
[ type_schema_name . ] type_name
[ ( precision [ , scale ] | max ) ]
<with_options> ::=
DATA_SOURCE = data_source_name ,
LOCATION = location_name ,
[ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
[ <optional_input_options> ] ,
[ <optional_output_options> ] ,
TAGS = <tag_column_value>
<optional_input_options> ::=
INPUT_OPTIONS = ' [ <input_options_data> ] '
<Input_option_data> ::=
<input_option_values> [ , <input_option_values> ]
<input_option_values> ::=
PARTITIONS: [ number_of_partitions ]
| CONSUMER_GROUP: [ consumer_group_name ]
| TIME_POLICY: [ time_policy ]
| LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
| OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]
<optional_output_options> ::=
OUTPUT_OPTIONS = ' [ <output_option_data> ] '
<output_option_data> ::=
<output_option_values> [ , <output_option_values> ]
<output_option_values> ::=
REJECT_POLICY: [ reject_policy ]
| MINIMUM_ROWS: [ row_value ]
| MAXIMUM_TIME: [ time_value_minutes ]
| PARTITION_KEY_COLUMN: [ partition_key_column_name ]
| PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| PARTITION_KEY: [ partition_key_name ]
| ROW_KEY: [ row_key_name ]
| BATCH_SIZE: [ batch_size_value ]
| MAXIMUM_BATCH_COUNT: [ batch_value ]
| STAGING_AREA: [ blob_data_source ]
<tag_column_value> ::= -- Reserved for Future Usage
);
参数
DATA_SOURCE
有关详细信息,请参阅 DATA_SOURCE。
FILE_FORMAT
有关详细信息,请参阅 FILE_FORMAT。
LOCATION
指定数据源中的实际数据或位置的名称。
- 对于 Edge 中心或 Kafka 流对象,location 指定要从中读取或向其写入的 Edge 中心或 Kafka 主题的名称。
- 对于 SQL 流对象(SQL Server、Azure SQL 数据库或 Azure SQL Edge),location 指定表的名称。 如果在与目标表相同的数据库和架构中创建流,则仅提供表名就够了。 否则,需要完全限定表名 (
<database_name>.<schema_name>.<table_name>
)。 - 对于 Azure Blob 存储流对象,location 指的是要在 blob 容器内使用的路径模式。 有关详细信息,请参阅来自 Azure 流分析的输出。
INPUT_OPTIONS
将选项指定为 Kafka 和 IoT Edge 中心等服务的键值对,这些服务是流式处理查询的输入。
PARTITIONS:
为主题定义的分区数。 可以使用的最大分区数限制为 32(适用于 Kafka 输入流)。
CONSUMER_GROUP:
事件中心和 IoT 中心限制一个使用者组中的读者数量(最多 5 个)。 将此字段留空将使用“$Default”使用者组。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
TIME_POLICY:
描述当延迟事件或无序事件超过其容错值时,是删除事件还是调整事件时间。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
LATE_EVENT_TOLERANCE:
最大可接受的时间延迟。 延迟表示事件时间戳与系统时钟之间的差异。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
OUT_OF_ORDER_EVENT_TOLERANCE:
事件在经历从输入到流式处理查询的行程后,可能会乱序。 可以按原样接受这些事件,也可以选择在设定的时间段内暂停以对它们重新排序。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
OUTPUT_OPTIONS
将选项指定为支持服务(此服务作为流式处理查询输出)的键值对
REJECT_POLICY:DROP | RETRY
在出现数据转换错误时,指定数据错误处理策略。
- 适用于所有支持的输出。
MINIMUM_ROWS:
写入输出时每批所需的最小行数。 对于 Parquet,每个批处理都将创建一个新文件。
- 适用于所有支持的输出。
MAXIMUM_TIME:
每批的最长等待时间(分钟)。 在此时间后,即使不满足最小行数的要求,也会将该批写入输出。
- 适用于所有支持的输出。
PARTITION_KEY_COLUMN:
该列用于分区键。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
PROPERTY_COLUMNS:
要作为自定义属性(如果提供)附加到消息的输出列的列名称逗号分隔列表。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
SYSTEM_PROPERTY_COLUMNS:
要在服务总线消息上填充的系统属性名称和输出列的名称/值对的 JSON 格式集合。 例如
{ "MessageId": "column1", "PartitionKey": "column2" }
。- 保留以供将来使用。 不适用于 Azure SQL Edge。
PARTITION_KEY:
包含分区键的输出列的名称。 分区键是给某个定表中分区的唯一标识,分区键构成了实体主键的第一部分。 它是最大大小为 1 KB 的字符串值。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
ROW_KEY:
包含行键的输出列的名称。 行键是某个给定分区中实体的唯一标识符。 行键构成了实体主键的第二部分。 行键是一个最大为 1 KB 的字符串值。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
BATCH_SIZE:
这表示最大可达 100 条记录的表存储的事务数。 对于 Azure Functions,这表示每次调用时发送到函数的批大小(以字节为单位)- 默认值为 256 kB。
- 保留以供将来使用。 不适用于 Azure SQL Edge。
MAXIMUM_BATCH_COUNT:
每次调用 Azure 函数发送给该函数的最大事件数 - 默认值为 100。 对于 SQL 数据库,这表示随每个批量插入事务一起发送的记录数上限 - 默认值为 10,000。
- 适用于所有基于 SQL 的输出
STAGING_AREA:Blob 存储的外部数据源对象
用于将高吞吐量数据引入到 Azure Synapse Analytics 的临时区域
- 保留以供将来使用。 不适用于 Azure SQL Edge。
若要详细了解与数据源类型对应的受支持的输入和输出选项,请参阅 Azure 流分析 - 输入概述和 Azure 流分析 - 输出概述。
示例
示例 A:EdgeHub
类型:输入或输出。
CREATE EXTERNAL DATA SOURCE MyEdgeHub
WITH (LOCATION = 'edgehub://');
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (FORMAT_TYPE = JSON);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyEdgeHub,
FILE_FORMAT = myFileFormat,
LOCATION = '<mytopicname>',
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
示例 B:Azure SQL 数据库、Azure SQL Edge、SQL Server
键入:输出
CREATE DATABASE SCOPED CREDENTIAL SQLCredName
WITH IDENTITY = '<user>',
SECRET = '<password>';
-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = '<my_server_name>.database.windows.net',
CREDENTIAL = SQLCredName
);
--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = ' <sqlserver://<ipaddress>,<port>',
CREDENTIAL = SQLCredName
);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyTargetSQLTable,
LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
--Note: If table is contained in the database, <TableName> should be sufficient
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
示例 C:Kafka
键入:输入
CREATE EXTERNAL DATA SOURCE MyKafka_tweets
WITH (
--The location maps to KafkaBootstrapServer
LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
CREDENTIAL = kafkaCredName
);
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (
FORMAT_TYPE = JSON,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
);
CREATE EXTERNAL STREAM Stream_A (
user_id VARCHAR,
tweet VARCHAR
)
WITH (
DATA_SOURCE = MyKafka_tweets,
LOCATION = '<KafkaTopicName>',
FILE_FORMAT = myFileFormat,
INPUT_OPTIONS = 'PARTITIONS: 5'
);