CREATE EXTERNAL STREAM (Transact-SQL)

重要

Azure SQL Edge では、ARM64 プラットフォームがサポートされなくなりました。

EXTERNAL STREAM オブジェクトには、入力ストリームと出力ストリームの両方の 2 つの目的があります。 これは、Azure Event Hubs、Azure IoT Hub (または Edge ハブ)、Kafka などのイベント インジェスト サービスからのストリーミング データをクエリするための入力として使用することも、ストリーミング クエリの結果を格納する場所と方法を指定するための出力として使用することもできます。

EXTERNAL STREAM は、Event Hubs や Blob Storage などのサービスの出力と入力の両方として、指定および作成することもできます。 これにより、あるストリーミング クエリでは外部ストリームに対する結果が出力として保持されており、別のストリーミング クエリでは入力として同じ外部ストリームから読み取りが行われるチェーン シナリオが容易になります。

現在、Azure SQL Edge でストリーム入力および出力としてサポートされているのは、次のデータ ソースのみです。

[データ ソースの種類] 入力 出力 説明
Azure IoT Edge ハブ Y Y Azure IoT Edge ハブに対するストリーミング データの読み書きを行うためのデータ ソース。 詳細については、IoT Edge ハブに関するページを参照してください。
SQL Database N Y SQL Database にストリーミング データを書き込むためのデータ ソース接続。 データベースは、Azure SQL Edge のローカル データベースでも、SQL Server または Azure SQL Database のリモート データベースでもかまいません。
Kafka Y N Kafka トピックからストリーミング データを読み取るためのデータ ソース。

構文

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

引数

DATA_SOURCE

詳しくは、DATA_SOURCE に関する記事をご覧ください。

FILE_FORMAT

詳しくは、FILE_FORMAT に関する記事をご覧ください。

LOCATION

データ ソース内の実際のデータまたは場所の名前を指定します。

  • Edge ハブまたは Kafka ストリーム オブジェクトの場合、場所は読み取り元または書き込み先の Edge ハブまたは Kafka トピックの名前を指定します。
  • SQL ストリーム オブジェクト (SQL Server、Azure SQL Database、または Azure SQL Edge) の場合、場所はテーブルの名前を指定します。 ターゲット テーブルと同じデータベースおよびスキーマにストリームが作成されている場合は、テーブル名だけで十分です。 それ以外の場合は、テーブル名を完全に修飾する必要があります (<database_name>.<schema_name>.<table_name>)。
  • Azure Blob Storage の場合、ストリーム オブジェクトの場所とは、BLOB コンテナー内で使用するパス パターンを表します。 詳しくは、Azure Stream Analytics からの出力に関する記事をご覧ください。

INPUT_OPTIONS

ストリーミング クエリに対する入力である Kafka や IoT Edge ハブなどのサービスへのオプションを、キーと値のペアとして指定します。

  • PARTITIONS:

    トピックに対して定義されているパーティションの数。 使用できるパーティションの最大数は、32 に制限されています (Kafka 入力ストリームに適用されます)。

    • CONSUMER_GROUP:

      Event Hubs や IOT Hub では、1 つのコンシューマー グループ内のリーダーの数が (5 個に) 制限されています。 このフィールドを空白のままにすると、"$Default" コンシューマー グループが使用されます。

      • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
    • TIME_POLICY:

      遅延イベントまたは順不同イベントが許容範囲の値に到達したときに、イベントを削除するか、イベント時間を調整するかを指定します。

      • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
    • LATE_EVENT_TOLERANCE:

      許容される最大遅延時間。 遅延は、イベントのタイムスタンプとシステム クロックの間の差を表します。

      • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      イベントは、入力からストリーミング クエリに移動した後、順不同に到着してもかまいません。 このようなイベントは、そのまま受け入れることも、または設定した期間だけ一時停止して並べ替えることもできます。

      • 将来利用するために予約されています。 Azure SQL Edge には適用されません。

OUTPUT_OPTIONS

ストリーミング クエリに対する出力であるサポートされるサービスへのオプションを、キーと値のペアとして指定します

  • REJECT_POLICY: DROP | RETRY

    データ変換エラーが発生したときのデータ エラー処理ポリシーを指定します。

    • サポートされるすべての出力に適用されます。
  • MINIMUM_ROWS:

    出力に書き込まれるバッチごとに必要な最小行数。 Parquet の場合、すべてのバッチによって新しいファイルが作成されます。

    • サポートされるすべての出力に適用されます。
  • MAXIMUM_TIME:

    バッチあたりの最大待機時間 (分)。 この時間が経過すると、最小行数の要件が満たされていなくても、バッチは出力に書き込まれます。

    • サポートされるすべての出力に適用されます。
  • PARTITION_KEY_COLUMN:

    パーティション キーに使用される列。

    • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
  • PROPERTY_COLUMNS:

    指定する場合は、カスタム プロパティとしてメッセージに添付する出力列の名前のコンマ区切りのリスト。

    • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
  • SYSTEM_PROPERTY_COLUMNS:

    Service Bus メッセージに設定されるシステム プロパティ名と出力列の名前と値のペアの JSON 形式のコレクション。 たとえば、{ "MessageId": "column1", "PartitionKey": "column2" } のようにします。

    • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
  • PARTITION_KEY:

    パーティション キーが含まれる出力列の名前。 パーティション キーは、エンティティのプライマリ キーの最初の部分を形成する、特定のテーブル内のパーティションの一意の識別子です。 最大サイズが 1 KB の文字列値です。

    • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
  • ROW_KEY:

    行キーが含まれる出力列の名前。 行キーは、特定のパーティション内のエンティティを示す一意の識別子です。 これにより、エンティティのプライマリ キーの 2 番目の部分が形成されます。 行キーは、最大サイズが 1 KB の文字列値です。

    • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
  • BATCH_SIZE:

    これは、Table Storage に対するトランザクションの数を表します。最大 100 レコードまで可能です。 Azure Functions の場合、これは呼び出しごとに関数に送信されるバッチ サイズ (バイト単位) を表します。既定値は 256 KB です。

    • 将来利用するために予約されています。 Azure SQL Edge には適用されません。
  • MAXIMUM_BATCH_COUNT:

    Azure 関数の場合は、呼び出しごとに関数に送信されるイベントの最大数。既定値は 100 です。 SQL Database の場合、これは一括挿入の各トランザクションで送信されたレコードの最大数を表します。既定値は 10,000 です。

    • すべての SQL ベースの出力に適用されます
  • STAGING_AREA: Blob Storage への外部データ ソース オブジェクト

    Azure Synapse Analytics への高スループットデータ インジェスト用のステージング領域

    • 将来利用するために予約されています。 Azure SQL Edge には適用されません。

データ ソースの種類に対応するサポートされている入力および出力オプションの詳細については、Azure Stream Analytics の入力の概要Azure Stream Analytics の出力の概要に関するページをそれぞれ参照してください。

例 A: EdgeHub

種類: 入力または出力:

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

例 B: Azure SQL Database、Azure SQL Edge、SQL Server

種類: 出力

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

例 C: Kafka

種類: 入力

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );

関連項目