CREATE EXTERNAL STREAM (Transact-SQL)

Importante

O SQL do Azure no Edge encerrou o suporte à plataforma ARM64.

O objeto EXTERNAL STREAM tem uma finalidade dupla de fluxo de entrada e saída. Ele pode ser usado como uma entrada para consultar dados de streaming de serviços de ingestão de eventos, como Hubs de Eventos do Azure, Hub IoT do Azure (ou Hub de Borda) ou Kafka, ou pode ser usado como uma saída para especificar onde e como armazenar resultados de uma consulta de streaming.

Um FLUXO EXTERNO também pode ser especificado e criado como saída e entrada para serviços como Hubs de Eventos ou armazenamento de Blobs. Isso facilita cenários de encadeamento em que uma consulta de streaming está persistindo os resultados para o stream externo como saída e outra consulta de streaming lendo do mesmo fluxo externo como entrada.

O SQL do Azure no Edge atualmente oferece suporte apenas às fontes de dados a seguir como entradas e saídas de fluxo.

Tipo de fonte de dados Entrada Saída Descrição
Hub do Azure IoT Edge N S Fonte de dados para ler e gravar dados de streaming em um hub do Azure IoT Edge. Para obter mais informações, confira Hub do IoT Edge.
Banco de Dados SQL N S Conexão de fonte de dados para gravar dados de streaming no banco de dado SQL. O banco de dados pode ser um banco de dados local no SQL do Azure no Edge ou em um banco de dados remoto no SQL Server ou em um banco de dados SQL do Azure.
Kafka N N Fonte de dados para ler dados de streaming de um tópico Kafka.

Sintaxe

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

Argumentos

DATA_SOURCE

Para obter mais informações, consulte DATA_SOURCE.

FILE_FORMAT

Para obter mais informações, consulte FILE_FORMAT.

LOCATION

Especifica o nome para os dados reais ou o local na fonte de dados.

  • Para objetos de fluxo Kafka ou Hub do Edge, o local especifica o nome do tópico Kafka ou Hub do Edge do qual ler ou no qual gravar.
  • Para objetos de fluxo SQL (SQL Server, Banco de Dados SQL do Azure ou Azure SQL Edge), o local especifica o nome da tabela. Se o fluxo for criado no mesmo banco de dados e esquema que a tabela de destino, apenas o nome da tabela será o suficiente. Caso contrário, você precisará qualificar totalmente o nome da tabela (<database_name>.<schema_name>.<table_name>).
  • Para o Armazenamento de Blobs do Azure, o local do objeto de fluxo se refere ao padrão de caminho a ser usado no contêiner de blob. Para obter mais informações, consulte Saídas do Azure Stream Analytics.

INPUT_OPTIONS

Especifique opções como pares chave-valor para serviços como Kafka e IoT Edge Hubs, que são entradas para consultas de streaming.

  • PARTIÇÕES:

    Número de partições definidas para um tópico. O número máximo de partições que podem ser usadas é limitado a 32 (Aplica-se a fluxos de entrada Kafka).

    • CONSUMER_GROUP:

      Os hubs IoT e de eventos limitam a 5 o número de leitores em um grupo de consumidores. Deixar esse campo vazio usará o grupo de consumidores '$Default'.

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
    • TIME_POLICY:

      Descreve se deve-se descartar eventos ou ajustar a hora do evento quando eventos atrasados ou fora de ordem passam o valor de tolerância.

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
    • LATE_EVENT_TOLERANCE:

      O atraso de tempo máximo aceitável. O atraso representa a diferença entre o carimbo de data/hora do evento e o relógio do sistema.

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      Os eventos podem chegar fora de ordem depois de terem feito a viagem da entrada para a consulta de streaming. Esses eventos podem ser aceitos como estão, ou você pode optar por pausar por um período definido para reordená-los.

      • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.

OUTPUT_OPTIONS

Especificar opções como pares chave-valor para serviços com suporte que são saídas para consultas de streaming

  • REJECT_POLICY: DROP | REPETIR

    Classifique as políticas de tratamento de erros de dados quando ocorrerem erros de conversão de dados.

    • Aplica-se a todas as saídas suportadas.
  • MINIMUM_ROWS:

    Mínimo de linhas necessárias por lote gravadas em uma saída. Para o Parquet, cada lote cria um novo arquivo.

    • Aplica-se a todas as saídas suportadas.
  • MAXIMUM_TIME:

    O tempo máximo de espera em minutos por lote. Após esse período, o lote será gravado na saída mesmo que o requisito de linhas mínimas não seja atendido.

    • Aplica-se a todas as saídas suportadas.
  • PARTITION_KEY_COLUMN:

    Coluna usada para a chave de partição.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • PROPERTY_COLUMNS:

    Uma lista separada por vírgulas dos nomes das colunas de saída anexadas às mensagens como propriedades personalizadas, se fornecida.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • SYSTEM_PROPERTY_COLUMNS:

    Uma coleção de pares de nome/valor de nomes de Propriedades do Sistema e colunas de saída formatadas como JSON a serem preenchidas nas mensagens de Barramento de Serviço. Por exemplo, { "MessageId": "column1", "PartitionKey": "column2" }.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • PARTITION_KEY:

    O nome da coluna de saída que contém a chave da partição. A chave de partição é um identificador exclusivo para a partição em uma determinada tabela que forma a primeira parte da chave primária da entidade. É um valor de cadeia de caracteres que pode ter até 1 KB de tamanho.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • ROW_KEY:

    O nome da coluna de saída que contém a chave de linha. A chave de linha é um identificador exclusivo para uma entidade em uma determinada partição. Ela forma a segunda parte da chave primária da entidade. A chave de linha é um valor de cadeia de caracteres que pode ter até 1 KB em tamanho.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • BATCH_SIZE:

    Isso representa o número de transações para o armazenamento de tabelas em que o máximo pode ir até 100 registros. Para o Azure Functions, isso representa o tamanho do lote em bytes enviado para a função por chamada. O padrão é 256 KB.

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.
  • MAXIMUM_BATCH_COUNT:

    Número máximo de eventos enviados para a função por chamada para o Azure Function. O padrão é 100. Para o Banco de Dados SQL, isso representa o número máximo de registros enviados com cada transação de inserção em massa. O padrão é 10 mil.

    • Aplica-se a todas as saídas baseadas em SQL
  • STAGING_AREA: Objeto FONTE DE DADOS EXTERNA para Armazenamento de Blobs

    A área de preparo para ingestão de dados de alta taxa de transferência no Azure Synapse Analytics

    • Reservado para uso futuro. Não se aplica ao Azure SQL Edge.

Para obter mais informações sobre as opções de entrada e saída com suporte correspondentes ao tipo de fonte de dados, consulte Visão geral de entrada do Azure Stream Analytics e Visão geral das saídas do Azure Stream Analytics, respectivamente.

Exemplos

Exemplo A: EdgeHub

Tipo: Entrada ou Saída.

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Exemplo B: Banco de Dados SQL do Azure, Azure SQL Edge, SQL Server

Tipo: Saída

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Exemplo C: Kafka

Tipo: Entrada

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );

Confira também