CREATE EXTERNAL STREAM (Transact-SQL)
Ważne
Usługa Azure SQL Edge zostanie wycofana 30 września 2025 r. Aby uzyskać więcej informacji i opcji migracji, zobacz powiadomienie o wycofaniu.
Uwaga
Usługa Azure SQL Edge nie obsługuje już platformy ARM64.
Obiekt EXTERNAL STREAM ma podwójne przeznaczenie zarówno strumienia wejściowego, jak i wyjściowego. Może służyć jako dane wejściowe do wykonywania zapytań dotyczących danych przesyłanych strumieniowo z usług pozyskiwania zdarzeń, takich jak Azure Event Hubs, Azure IoT Hub (lub Edge Hub) lub Kafka, albo może służyć jako dane wyjściowe do określenia, gdzie i jak przechowywać wyniki z zapytania przesyłania strumieniowego.
Strumień ZEWNĘTRZNY można również określić i utworzyć jako dane wyjściowe i wejściowe dla usług, takich jak Event Hubs lub Blob Storage. Ułatwia to tworzenie łańcuchów scenariuszy, w których zapytanie przesyłane strumieniowo utrwala wyniki do strumienia zewnętrznego jako dane wyjściowe, a inne odczyty zapytań przesyłanych strumieniowo z tego samego strumienia zewnętrznego co dane wejściowe.
Usługa Azure SQL Edge obecnie obsługuje tylko następujące źródła danych jako dane wejściowe i wyjściowe strumienia.
Typ źródła danych | Dane wejściowe | Dane wyjściowe | opis |
---|---|---|---|
Centrum usługi Azure IoT Edge | Y | Y | Źródło danych do odczytywania i zapisywania danych przesyłanych strumieniowo do centrum usługi Azure IoT Edge. Aby uzyskać więcej informacji, zobacz Centrum usługi IoT Edge. |
SQL Database | N | Y | Połączenie ze źródłem danych w celu zapisu danych przesyłanych strumieniowo do usługi SQL Database. Baza danych może być lokalną bazą danych w usłudze Azure SQL Edge lub zdalną bazą danych w programie SQL Server lub usłudze Azure SQL Database. |
Kafka | Y | N | Źródło danych do odczytywania danych przesyłanych strumieniowo z tematu platformy Kafka. |
Składnia
CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH ( <with_options> )
<column_definition> ::=
column_name <column_data_type>
<data_type> ::=
[ type_schema_name . ] type_name
[ ( precision [ , scale ] | max ) ]
<with_options> ::=
DATA_SOURCE = data_source_name ,
LOCATION = location_name ,
[ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
[ <optional_input_options> ] ,
[ <optional_output_options> ] ,
TAGS = <tag_column_value>
<optional_input_options> ::=
INPUT_OPTIONS = ' [ <input_options_data> ] '
<Input_option_data> ::=
<input_option_values> [ , <input_option_values> ]
<input_option_values> ::=
PARTITIONS: [ number_of_partitions ]
| CONSUMER_GROUP: [ consumer_group_name ]
| TIME_POLICY: [ time_policy ]
| LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
| OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]
<optional_output_options> ::=
OUTPUT_OPTIONS = ' [ <output_option_data> ] '
<output_option_data> ::=
<output_option_values> [ , <output_option_values> ]
<output_option_values> ::=
REJECT_POLICY: [ reject_policy ]
| MINIMUM_ROWS: [ row_value ]
| MAXIMUM_TIME: [ time_value_minutes ]
| PARTITION_KEY_COLUMN: [ partition_key_column_name ]
| PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
| PARTITION_KEY: [ partition_key_name ]
| ROW_KEY: [ row_key_name ]
| BATCH_SIZE: [ batch_size_value ]
| MAXIMUM_BATCH_COUNT: [ batch_value ]
| STAGING_AREA: [ blob_data_source ]
<tag_column_value> ::= -- Reserved for Future Usage
);
Argumenty
DATA_SOURCE
Aby uzyskać więcej informacji, zobacz DATA_SOURCE.
FILE_FORMAT
Aby uzyskać więcej informacji, zobacz FILE_FORMAT.
LOKALIZACJA
Określa nazwę rzeczywistych danych lub lokalizacji w źródle danych.
- W przypadku obiektów strumieniowych usługi Edge Hub lub Kafka lokalizacja określa nazwę tematu Centrum usługi Edge lub platformy Kafka do odczytu lub zapisu.
- W przypadku obiektów strumienia SQL (SQL Server, Azure SQL Database lub Azure SQL Edge) lokalizacja określa nazwę tabeli. Jeśli strumień jest tworzony w tej samej bazie danych i schemacie co tabela docelowa, wystarczy tylko nazwa tabeli. W przeciwnym razie musisz w pełni zakwalifikować nazwę tabeli (
<database_name>.<schema_name>.<table_name>
). - W przypadku lokalizacji obiektu strumienia usługi Azure Blob Storage odwołuje się do wzorca ścieżki do użycia wewnątrz kontenera obiektów blob. Aby uzyskać więcej informacji, zobacz Dane wyjściowe z usługi Azure Stream Analytics.
INPUT_OPTIONS
Określ opcje jako pary klucz-wartość dla usług, takich jak Kafka i IoT Edge Hubs, które są danymi wejściowymi zapytań przesyłanych strumieniowo.
PARTYCJI:
Liczba partycji zdefiniowanych dla tematu. Maksymalna liczba partycji, które mogą być używane, jest ograniczona do 32 (dotyczy strumieni wejściowych platformy Kafka).
CONSUMER_GROUP:
Usługa Event i IoT Hubs ograniczają liczbę czytelników w jednej grupie odbiorców (do 5). Pozostawienie tego pola pustego spowoduje użycie grupy odbiorców "$Default".
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
TIME_POLICY:
Opisuje, czy należy usuwać zdarzenia, czy dostosowywać czas zdarzenia, gdy zdarzenia opóźnione lub poza kolejnością przekazują ich wartość tolerancji.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
LATE_EVENT_TOLERANCE:
Maksymalne dopuszczalne opóźnienie czasu. Opóźnienie reprezentuje różnicę między znacznikami czasu zdarzenia a zegarem systemowym.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
OUT_OF_ORDER_EVENT_TOLERANCE:
Zdarzenia mogą pochodzić z zamówienia po zakończeniu podróży z danych wejściowych do zapytania przesyłania strumieniowego. Te zdarzenia mogą być akceptowane zgodnie z rzeczywistymi lub można wstrzymać dla określonego okresu, aby zmienić ich kolejność.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
OUTPUT_OPTIONS
Określ opcje jako pary klucz-wartość dla obsługiwanych usług, które są danymi wyjściowymi zapytań przesyłanych strumieniowo
REJECT_POLICY: DROP | PONÓW PRÓBĘ
Określa zasady obsługi błędów danych w przypadku wystąpienia błędów konwersji danych.
- Dotyczy wszystkich obsługiwanych danych wyjściowych.
MINIMUM_ROWS:
Minimalna liczba wierszy wymaganych na partię zapisana w danych wyjściowych. W przypadku programu Parquet każda partia tworzy nowy plik.
- Dotyczy wszystkich obsługiwanych danych wyjściowych.
MAXIMUM_TIME:
Maksymalny czas oczekiwania w minutach na partię. Po tym czasie partia zostanie zapisana w danych wyjściowych, nawet jeśli minimalne wymagania dotyczące wierszy nie zostaną spełnione.
- Dotyczy wszystkich obsługiwanych danych wyjściowych.
PARTITION_KEY_COLUMN:
Kolumna używana dla klucza partycji.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
PROPERTY_COLUMNS:
Rozdzielona przecinkami lista nazw kolumn wyjściowych dołączonych do komunikatów jako właściwości niestandardowych, jeśli zostanie podana.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
SYSTEM_PROPERTY_COLUMNS:
Kolekcja w formacie JSON par nazwa/wartość nazw i kolumn wyjściowych, które mają zostać wypełnione w komunikatach usługi Service Bus. Na przykład
{ "MessageId": "column1", "PartitionKey": "column2" }
.- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
PARTITION_KEY:
Nazwa kolumny wyjściowej zawierającej klucz partycji. Klucz partycji jest unikatowym identyfikatorem partycji w danej tabeli, która stanowi pierwszą część klucza podstawowego jednostki. Jest to wartość ciągu, która może mieć rozmiar do 1 KB.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
ROW_KEY:
Nazwa kolumny wyjściowej zawierającej klucz wiersza. Klucz wiersza jest unikatowym identyfikatorem jednostki w ramach danej partycji. Stanowi drugą część klucza podstawowego jednostki. Klucz wiersza jest wartością ciągu, która może mieć rozmiar do 1 KB.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
BATCH_SIZE:
Reprezentuje to liczbę transakcji dla magazynu tabel, w których maksymalna liczba rekordów może wynosić do 100. W przypadku usługi Azure Functions reprezentuje rozmiar partii w bajtach wysyłanych do funkcji na wywołanie — wartość domyślna to 256 kB.
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
MAXIMUM_BATCH_COUNT:
Maksymalna liczba zdarzeń wysyłanych do funkcji na wywołanie funkcji platformy Azure — wartość domyślna to 100. W przypadku usługi SQL Database reprezentuje maksymalną liczbę rekordów wysyłanych z każdą transakcją wstawiania zbiorczego — wartość domyślna to 10 000.
- Dotyczy wszystkich danych wyjściowych opartych na języku SQL
STAGING_AREA: obiekt EXTERNAL DATA SOURCE do usługi Blob Storage
Obszar przejściowy na potrzeby pozyskiwania danych o wysokiej przepływności w usłudze Azure Synapse Analytics
- Zarezerwowane do użycia w przyszłości. Nie dotyczy usługi Azure SQL Edge.
Aby uzyskać więcej informacji na temat obsługiwanych opcji danych wejściowych i wyjściowych odpowiadających typowi źródła danych, zobacz Azure Stream Analytics — Omówienie danych wejściowych i Azure Stream Analytics — omówienie danych wyjściowych.
Przykłady
Przykład: EdgeHub
Typ: Dane wejściowe lub wyjściowe.
CREATE EXTERNAL DATA SOURCE MyEdgeHub
WITH (LOCATION = 'edgehub://');
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (FORMAT_TYPE = JSON);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyEdgeHub,
FILE_FORMAT = myFileFormat,
LOCATION = '<mytopicname>',
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Przykład B: Azure SQL Database, Azure SQL Edge, SQL Server
Typ: Dane wyjściowe
CREATE DATABASE SCOPED CREDENTIAL SQLCredName
WITH IDENTITY = '<user>',
SECRET = '<password>';
-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = '<my_server_name>.database.windows.net',
CREDENTIAL = SQLCredName
);
--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
WITH (
LOCATION = ' <sqlserver://<ipaddress>,<port>',
CREDENTIAL = SQLCredName
);
CREATE EXTERNAL STREAM Stream_A
WITH (
DATA_SOURCE = MyTargetSQLTable,
LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
--Note: If table is contained in the database, <TableName> should be sufficient
OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
);
Przykład C: Kafka
Typ: Dane wejściowe
CREATE EXTERNAL DATA SOURCE MyKafka_tweets
WITH (
--The location maps to KafkaBootstrapServer
LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
CREDENTIAL = kafkaCredName
);
CREATE EXTERNAL FILE FORMAT myFileFormat
WITH (
FORMAT_TYPE = JSON,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
);
CREATE EXTERNAL STREAM Stream_A (
user_id VARCHAR,
tweet VARCHAR
)
WITH (
DATA_SOURCE = MyKafka_tweets,
LOCATION = '<KafkaTopicName>',
FILE_FORMAT = myFileFormat,
INPUT_OPTIONS = 'PARTITIONS: 5'
);