CREATE EXTERNAL STREAM (Transact-SQL)

重要

Azure SQL Edge 不再支援 ARM64 平臺。

EXTERNAL STREAM 物件具有輸入和輸出資料流程的雙重用途。 它可以作為輸入,從事件擷取服務查詢串流資料,例如Azure 事件中樞、Azure IoT 中樞(或 Edge Hub)或 Kafka,也可以作為輸出來指定串流查詢結果的位置和方式。

您也可以指定 EXTERNAL STREAM,並將其建立為事件中樞或 Blob 儲存體等服務的輸出和輸入。 這可協助鏈結串流查詢將結果保存至外部資料流作為輸出,以及從與輸入相同的外部資料流讀取的另一個串流查詢。

Azure SQL Edge 目前僅支援下列資料來源作為資料流程輸入和輸出。

資料來源類型 輸入 輸出 描述
Azure IoT Edge 中樞 Y Y 用來讀取和寫入串流資料的資料來源至 Azure IoT Edge 中樞。 如需詳細資訊,請參閱 IoT Edge 中樞
SQL Database 要寫入串流資料的資料來源連線至SQL 資料庫。 資料庫可以是 Azure SQL Edge 中的本機資料庫,或是 SQL Server 或 Azure SQL 資料庫中的遠端資料庫。
Kafka 從 Kafka 主題讀取串流資料的資料來源。

語法

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

引數

DATA_SOURCE

如需詳細資訊,請參閱 DATA_SOURCE

FILE_FORMAT

如需詳細資訊,請參閱 FILE_FORMAT

LOCATION

指定資料來源中實際資料或位置的名稱。

  • 針對 Edge Hub 或 Kafka 串流物件,位置會指定要讀取或寫入的 Edge Hub 或 Kafka 主題名稱。
  • 針對 SQL 資料流程物件 (SQL Server、Azure SQL 資料庫 或 Azure SQL Edge),位置會指定資料表的名稱。 如果在與目的地資料表相同的資料庫和架構中建立資料流程,則只有資料表名稱就已足夠。 否則,您必須完整限定資料表名稱 ( <database_name>.<schema_name>.<table_name> )。
  • 對於Azure Blob 儲存體資料流程物件位置是指 Blob 容器內要使用的路徑模式。 如需詳細資訊,請參閱 Azure 串流分析的 輸出。

INPUT_OPTIONS

指定選項做為 Kafka 和 IoT Edge 中樞等服務的索引鍵/值組,這些服務是串流查詢的輸入。

  • 分區:

    為主題定義的分割區數目。 可以使用的資料分割數目上限限制為 32 個(適用于 Kafka 輸入資料流程)。

    • CONSUMER_GROUP:

      事件和IoT 中樞會將一個取用者群組內的讀取器數目限制為 5。 將此欄位保留空白將會使用 「$Default」取用者群組。

      • 保留供日後使用。 不適用於 Azure SQL Edge。
    • TIME_POLICY:

      描述卸載事件或調整延遲事件或順序錯亂事件傳遞其容錯值時的事件時間。

      • 保留供日後使用。 不適用於 Azure SQL Edge。
    • LATE_EVENT_TOLERANCE:

      可接受的時間延遲上限。 延遲代表事件時間戳記與系統時鐘之間的差異。

      • 保留供日後使用。 不適用於 Azure SQL Edge。
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      從輸入到串流查詢之後,事件可能會不依序抵達。 這些事件可以依目前方式接受,或者您可以選擇暫停一段設定期間來重新排序事件。

      • 保留供日後使用。 不適用於 Azure SQL Edge。

OUTPUT_OPTIONS

針對輸出至串流查詢的支援服務,將選項指定為索引鍵/值組

  • REJECT_POLICY:DROP |重試

    當發生資料轉換錯誤時,請指定資料錯誤處理原則。

    • 適用于所有支援的輸出。
  • MINIMUM_ROWS:

    寫入輸出的每個批次所需的最小資料列。 針對 Parquet,每個批次都會建立新的檔案。

    • 適用于所有支援的輸出。
  • MAXIMUM_TIME:

    每個批次的等候時間上限,以分鐘為單位。 在這段時間之後,即使不符合最低資料列需求,批次仍會寫入輸出。

    • 適用于所有支援的輸出。
  • PARTITION_KEY_COLUMN:

    用於資料分割索引鍵的資料行。

    • 保留供日後使用。 不適用於 Azure SQL Edge。
  • PROPERTY_COLUMNS:

    如果提供,附加至訊息做為自訂屬性之輸出資料行名稱的逗號分隔清單。

    • 保留供日後使用。 不適用於 Azure SQL Edge。
  • SYSTEM_PROPERTY_COLUMNS:

    要填入服務匯流排訊息之系統屬性名稱和輸出資料行名稱/值組的 JSON 格式集合。 例如: { "MessageId": "column1", "PartitionKey": "column2" }

    • 保留供日後使用。 不適用於 Azure SQL Edge。
  • PARTITION_KEY:

    包含資料分割索引鍵的輸出資料行名稱。 資料分割索引鍵是指定資料表內資料分割的唯一識別碼,可形成實體主鍵的第一個部分。 這是大小上限為 1 KB 的字串值。

    • 保留供日後使用。 不適用於 Azure SQL Edge。
  • ROW_KEY:

    包含資料列索引鍵的輸出資料行名稱。 資料列索引鍵是指定資料分割內實體的唯一識別碼。 它會形成實體主鍵的第二個部分。 資料列索引鍵是一個字串值,大小可能高達 1 KB。

    • 保留供日後使用。 不適用於 Azure SQL Edge。
  • BATCH_SIZE:

    這代表資料表儲存體的交易數目,其中最大值最多可達 100 筆記錄。 針對 Azure Functions,這代表每個呼叫傳送至函式的批次大小 ,預設值為 256 kB。

    • 保留供日後使用。 不適用於 Azure SQL Edge。
  • MAXIMUM_BATCH_COUNT:

    針對 Azure 函式,每個呼叫傳送至函式的事件數目上限 - 預設值為 100。 對於SQL 資料庫,這代表每次大量插入交易傳送的記錄數目上限 - 預設值為 10,000。

    • 適用于所有 SQL 型輸出
  • STAGING_AREA:BLOB 的 EXTERNAL DATA SOURCE 物件儲存體

    高輸送量資料擷取至 Azure Synapse Analytics 的暫存區域

    • 保留供日後使用。 不適用於 Azure SQL Edge。

如需對應至資料來源類型之支援輸入和輸出選項的詳細資訊,請參閱 Azure 串流分析 - 輸入概觀 Azure 串流分析 - 輸出概觀

範例

範例 A:EdgeHub

類型:輸入或輸出。

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

範例 B:Azure SQL 資料庫、Azure SQL Edge、SQL Server

類型:輸出

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

範例 C:Kafka

類型:輸入

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );

另請參閱