CREATE EXTERNAL STREAM (Transact-SQL)

Важно!

Azure SQL Edge больше не поддерживает платформу ARM64.

Объект EXTERNAL STREAM имеет два назначения: для входного и выходного потока. Его можно использовать в качестве входных данных для запроса потоковых данных из служб приема событий, таких как Центры событий Azure, Центр Интернета вещей Azure (или Пограничный концентратор) или Kafka или его можно использовать в качестве выходных данных, чтобы указать, где и как хранить результаты из потокового запроса.

Внешний ПОТОК также можно указать и создать как выходные данные, так и входные данные для таких служб, как Центры событий или хранилище BLOB-объектов. Это позволяет создавать сценарии с цепочкой, в которых запрос потоковой передачи сохраняет результаты во внешнем потоке в качестве выходных данных, а другой потоковый запрос считывает результаты из того же внешнего потока в качестве входных данных.

В настоящее время SQL Azure для пограничных вычислений поддерживает только следующие источники данных в качестве входных потоков и потоков вывода.

Тип источника данных Входные данные Выходные данные Description
Центр Azure IoT Edge Y Y Источник данных для чтения и записи потоковых данных в центр Azure IoT Edge. Дополнительные сведения см. в разделе Центр IoT Edge.
База данных SQL N Y Соединение с источником данных для записи потоковых данных в Базу данных SQL. Базой данных может быть локальная база данных в SQL Azure для пограничных вычислений, удаленная база данных в SQL Server или База данных SQL Azure.
Kafka Y N Источник данных для чтения потоковых данных из раздела Kafka.

Синтаксис

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

Аргументы

DATA_SOURCE

Дополнительные сведения см. в DATA_SOURCE.

FILE_FORMAT

Дополнительные сведения см. в FILE_FORMAT.

МЕСТОНАХОЖДЕНИЕ

Задает имя для фактических данных или расположения в источнике данных.

  • Для центра Edge и объектов потока Kafka аргумент LOCATION указывает имя центра Edge или раздел Kafka для чтения или записи.
  • Для объектов потока SQL (SQL Server, База данных SQL Azure или Azure SQL Edge) расположение указывает имя таблицы. Если поток создается в той же базе данных и схеме, что и целевая таблица, достаточно имени таблицы. В противном случае необходимо полностью указать имя таблицы (<database_name>.<schema_name>.<table_name>).
  • Для объекта потока Хранилища BLOB-объектов Azure аргумент LOCATION ссылается на шаблон пути, используемый внутри контейнера BLOB-объектов. Дополнительные сведения см. в разделе "Выходные данные" из Azure Stream Analytics.

INPUT_OPTIONS

Укажите параметры в качестве пар "ключ-значение" для таких служб, как Kafka и Центры IoT Edge, которые являются входными данными для потоковых запросов.

  • РАЗДЕЛОВ:

    Количество секций, определенных для раздела. Максимальное количество секций, которые можно использовать, ограничено 32 (применяется к входным Потоки Kafka).

    • CONSUMER_GROUP.

      Концентратор событий и центры Интернета вещей ограничивают число читателей в одной группе потребителей (до пяти). Если оставить это поле пустым, будет использоваться группа потребителей $Default.

      • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
    • TIME_POLICY.

      Описывает, следует ли удалять события или настраивать время продолжительности события, если поздние или неупорядоченные события передают свое значение допуска.

      • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
    • LATE_EVENT_TOLERANCE.

      Максимально допустимая задержка времени. Задержка представляет разницу между меткой времени события и системными часами.

      • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
    • OUT_OF_ORDER_EVENT_TOLERANCE.

      После того как события перешли из входных данных в запрос потоковой передачи, они могут поступать не по порядку. Эти события можно принять как есть или приостановить процесс на определенное время, чтобы изменить их порядок.

      • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.

OUTPUT_OPTIONS

Указывает параметры в качестве пар "ключ — значение" для поддерживаемых служб, которые являются выходными данными для запросов потоковой передачи.

  • REJECT_POLICY: DROP | ПОВТОРИТЬ

    Вид политик обработки ошибок данных при возникновении ошибок преобразования данных.

    • Применяется ко всем поддерживаемым выходным данным.
  • MINIMUM_ROWS.

    Минимальное количество строк, необходимых для каждого пакета, записанного в выходные данные. Для Parquet для каждого пакета создается новый файл.

    • Применяется ко всем поддерживаемым выходным данным.
  • MAXIMUM_TIME.

    Максимальное время ожидания на пакет в минутах. После этого пакет будет записан в выходные данные, даже если минимальные требования к строкам не выполнены.

    • Применяется ко всем поддерживаемым выходным данным.
  • PARTITION_KEY_COLUMN.

    Столбец, который используется для ключа секции.

    • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
  • PROPERTY_COLUMNS.

    Разделенный запятыми список имен выходных столбцов, присоединенных к сообщениям в качестве настраиваемых свойств, если это указано.

    • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
  • SYSTEM_PROPERTY_COLUMNS.

    Коллекция пар "имя — значение" для имен системных свойств и выходных столбцов, которая будет заполнена на основе сообщений служебной шины, в формате JSON. Например, { "MessageId": "column1", "PartitionKey": "column2" }.

    • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
  • PARTITION_KEY.

    Имя выходного столбца, содержащего ключ раздела. Ключ раздела — это уникальный идентификатор раздела в пределах конкретной таблицы, являющийся первой частью первичного ключа сущности. Это строковое значение, которое может составлять до 1 КБ размера.

    • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
  • ROW_KEY.

    Имя выходного столбца, содержащего ключ строки. Ключ строки — это уникальный идентификатор сущности внутри конкретного раздела. Он является второй частью первичного ключа сущности. Ключ строки — это строковое значение размером до 1 КБ.

    • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
  • BATCH_SIZE.

    Представляет количество транзакций для хранилища таблиц, в которых максимальное число строк может доходить до 100. Для Функции Azure представляет размер пакета в байтах, отправляемого в функцию за один вызов. Значение по умолчанию — 256 КБ.

    • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.
  • MAXIMUM_BATCH_COUNT.

    Максимальное количество событий, отправленных в функцию, на вызов для Функции Azure. Значение по умолчанию — 100. Для Базы данных SQL представляет максимальное число записей, отправленных с каждой транзакцией массовой вставки. По умолчанию — 10 000.

    • Применяется ко всем выходным данным на основе SQL.
  • STAGING_AREA: объект EXTERNAL DATA SOURCE для BLOB-объектов служба хранилища

    Промежуточная область приема данных с высокой пропускной способностью в Azure Synapse Analytics

    • Зарезервировано для использования в будущем. Не применяется к Пограничным службам SQL Azure.

Дополнительные сведения о поддерживаемых параметрах входных и выходных данных, соответствующих типу источника данных, см. в статье "Обзор входных данных" и "Общие сведения о выходных данных Azure Stream Analytics".

Примеры

Пример A: EdgeHub

Тип: входные или выходные данные.

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Пример B: База данных SQL Azure, Azure SQL Edge, SQL Server

Тип: выходные данные

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Пример C: Kafka

Тип: входные данные

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );

См. также