CREATE EXTERNAL STREAM (Transact-SQL)

Önemli

Azure SQL Edge artık ARM64 platformunu desteklememektedir.

EXTERNAL STREAM nesnesinin hem giriş hem de çıkış akışı çift amacı vardır. Azure Event Hubs, Azure IoT Hub (veya Edge Hub) veya Kafka gibi olay alma hizmetlerinden akış verilerini sorgulamak için giriş olarak kullanılabilir veya akış sorgusundan elde edilecek sonuçların nerede ve nasıl depoleneceğini belirtmek için çıkış olarak kullanılabilir.

Dış AKıŞ ayrıca Event Hubs veya Blob depolama gibi hizmetler için hem çıkış hem de giriş olarak belirtilebilir ve oluşturulabilir. Bu, bir akış sorgusunun sonuçları çıkış olarak dış akışa ve başka bir akış sorgusunun girişle aynı dış akıştan okumasına neden olduğu zincirleme senaryolarını kolaylaştırır.

Azure SQL Edge şu anda yalnızca aşağıdaki veri kaynaklarını akış girişleri ve çıkışları olarak desteklemektedir.

Veri kaynağı türü Girdi Çıktı Tanım
Azure IoT Edge hub'ı Y Y Azure IoT Edge hub'ına akış verilerini okumak ve yazmak için veri kaynağı. Daha fazla bilgi için bkz . IoT Edge Hub.
SQL Veritabanı N Y akış verilerini SQL Veritabanı yazmak için veri kaynağı bağlantısı. Veritabanı, Azure SQL Edge'deki yerel bir veritabanı veya SQL Server veya Azure SQL Veritabanı uzak veritabanı olabilir.
Kafka Y N Kafka konusundan akış verilerini okumak için veri kaynağı.

Sözdizimi

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

Bağımsız değişkenler

DATA_SOURCE

Daha fazla bilgi için bkz . DATA_SOURCE.

FILE_FORMAT

Daha fazla bilgi için bkz . FILE_FORMAT.

KONUM

Veri kaynağındaki gerçek verilerin veya konumun adını belirtir.

  • Edge Hub veya Kafka akış nesneleri için konum, okuma veya yazma için Edge Hub veya Kafka konusunun adını belirtir.
  • SQL akış nesneleri için (SQL Server, Azure SQL Veritabanı veya Azure SQL Edge), konum tablonun adını belirtir. Akış hedef tabloyla aynı veritabanında ve şemada oluşturulduysa yalnızca Tablo adı yeterli olur. Aksi takdirde tablo adını (<database_name>.<schema_name>.<table_name> ) tam olarak nitelemeniz gerekir.
  • Azure Blob Depolama akış nesnesi konumu, blob kapsayıcısının içinde kullanılacak yol desenine başvurur. Daha fazla bilgi için bkz . Azure Stream Analytics çıkışları.

INPUT_OPTIONS

Kafka ve IoT Edge Hubs gibi hizmetler için akış sorgularının girişleri olan anahtar-değer çiftleri olarak seçenekleri belirtin.

  • BÖLÜM:

    Bir konu için tanımlanan bölüm sayısı. Kullanılabilecek en fazla bölüm sayısı 32 ile sınırlıdır (Kafka Giriş Akışlar için geçerlidir).

    • CONSUMER_GROUP:

      Event ve IoT Hubs, bir tüketici grubundaki okuyucu sayısını sınırlar (5 ile). Bu alanı boş bırakmak '$Default' tüketici grubunu kullanır.

      • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
    • TIME_POLICY:

      Olayların bırakılıp bırakılmayacağını veya geç olayların veya sıra dışı olayların tolerans değerlerini geçirmesi durumunda olay zamanının ayarlanıp ayarlanmayacağını açıklar.

      • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
    • LATE_EVENT_TOLERANCE:

      Kabul edilebilir en uzun gecikme süresi. Gecikme, olayın zaman damgası ile sistem saati arasındaki farkı temsil eder.

      • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      Olaylar girişten akış sorgusuna yolculuğu yaptıktan sonra sıra dışı gelebilir. Bu olaylar olduğu gibi kabul edilebilir veya bunları yeniden sıralamak için belirli bir süre boyunca duraklatmayı seçebilirsiniz.

      • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.

OUTPUT_OPTIONS

Akış sorgularının çıkışları olan desteklenen hizmetler için anahtar-değer çiftleri olarak seçenekleri belirtin

  • REJECT_POLICY: DROP | YENİ -DEN DENEME

    Veri dönüştürme hataları oluştuğunda veri hatası işleme ilkelerini türleyin.

    • Desteklenen tüm çıkışlar için geçerlidir.
  • MINIMUM_ROWS:

    Çıktıya yazılan toplu iş başına gereken en düşük satır sayısı. Parquet için her toplu iş yeni bir dosya oluşturur.

    • Desteklenen tüm çıkışlar için geçerlidir.
  • MAXIMUM_TIME:

    Toplu iş başına dakika cinsinden maksimum bekleme süresi. Bu süreden sonra, minimum satır gereksinimi karşılanmasa bile toplu iş çıktıya yazılır.

    • Desteklenen tüm çıkışlar için geçerlidir.
  • PARTITION_KEY_COLUMN:

    Bölüm anahtarı için kullanılan sütun.

    • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
  • PROPERTY_COLUMNS:

    İletilere özel özellikler olarak eklenen çıkış sütunlarının adlarının virgülle ayrılmış listesi (sağlanmışsa).

    • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
  • SYSTEM_PROPERTY_COLUMNS:

    Service Bus iletilerinde doldurulacak Sistem Özelliği adlarının ve çıkış sütunlarının ad/değer çiftlerinden oluşan JSON biçimli bir koleksiyon. Örneğin, { "MessageId": "column1", "PartitionKey": "column2" }.

    • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
  • PARTITION_KEY:

    Bölüm anahtarını içeren çıkış sütununun adı. Bölüm anahtarı, bir varlığın birincil anahtarının ilk bölümünü oluşturan belirli bir tablo içindeki bölüm için benzersiz bir tanımlayıcıdır. Boyutu 1 KB'a kadar olabilen bir dize değeridir.

    • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
  • ROW_KEY:

    Satır anahtarını içeren çıkış sütununun adı. Satır anahtarı, belirli bir bölümdeki bir varlığın benzersiz tanımlayıcısıdır. Bir varlığın birincil anahtarının ikinci bölümünü oluşturur. Satır anahtarı, boyutu 1 KB'a kadar olabilen bir dize değeridir.

    • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
  • BATCH_SIZE:

    Bu, tablo depolama için en fazla 100 kayda kadar çıkabilen işlem sayısını temsil eder. Azure İşlevleri için bu, çağrı başına işleve gönderilen toplu iş boyutunu bayt cinsinden temsil eder; varsayılan değer 256 kB'dir.

    • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.
  • MAXIMUM_BATCH_COUNT:

    Azure işlevi için çağrı başına işleve gönderilen en fazla olay sayısı ( varsayılan değer 100'dür). SQL Veritabanı için bu, her toplu ekleme işlemiyle gönderilen en fazla kayıt sayısını temsil eder; varsayılan değer 10.000'dir.

    • Tüm SQL tabanlı çıkışlar için geçerlidir
  • STAGING_AREA: Blob Depolama DıŞ VERİ KAYNAKI nesnesi

    Azure Synapse Analytics'e yüksek aktarım hızına sahip veri alımı için hazırlama alanı

    • Gelecekteki kullanımlar için ayrılmıştır. Azure SQL Edge için geçerli değildir.

Veri kaynağı türüne karşılık gelen desteklenen giriş ve çıkış seçenekleri hakkında daha fazla bilgi için bkz . Sırasıyla Azure Stream Analytics - Girişe Genel Bakış ve Azure Stream Analytics - Çıkışlara Genel Bakış .

Örnekler

Örnek A: EdgeHub

Tür: Giriş veya Çıkış.

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Örnek B: Azure SQL Veritabanı, Azure SQL Edge, SQL Server

Tür: Çıkış

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

Örnek C: Kafka

Tür: Giriş

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );

Ayrıca bkz.