Создание задания потоковой передачи данных в SQL Azure для пограничных вычислений
Внимание
Azure SQL Edge будет прекращена 30 сентября 2025 г. Дополнительные сведения и параметры миграции см. в уведомлении о выходе на пенсию.
Примечание.
Azure SQL Edge больше не поддерживает платформу ARM64.
В этой статье описано, как создать задание потоковой передачи T-SQL в SQL Azure для пограничных вычислений. Вы создаете входные и выходные объекты внешнего потока, а затем определяете потоковый запрос в ходе создания задания потоковой передачи.
Настройка входных и выходных объектов внешнего потока
Потоковая передача T-SQL использует функциональные возможности внешнего источника данных SQL Server, чтобы определить источники данных, связанные с входными и выходными объектами внешнего потока в задании потоковой передачи. Используйте следующие команды T-SQL, чтобы создать входной или выходной объект внешнего потока.
- CREATE EXTERNAL FILE FORMAT (Transact-SQL)
- CREATE EXTERNAL DATA SOURCE (Transact-SQL)
- CREATE EXTERNAL STREAM (Transact-SQL)
Кроме того, если в качестве потока вывода используется SQL Azure для пограничных вычислений, SQL Server или База данных SQL Azure, необходимо СОЗДАТЬ УЧЕТНЫЕ ДАННЫЕ ДЛЯ БАЗЫ ДАННЫХ (Transact-SQL). Эта команда T-SQL определяет учетные данные для доступа к базе данных.
Поддерживаемые источники данных для входного потока и потока вывода
В настоящее время SQL Azure для пограничных вычислений поддерживает только следующие источники данных в качестве входных потоков и потоков вывода.
Тип источника данных | Входные данные | Выходные данные | Description |
---|---|---|---|
Центр Azure IoT Edge | Y | Y | Источник данных для чтения и записи потоковых данных в центр Azure IoT Edge. Дополнительные сведения см. в разделе Центр IoT Edge. |
База данных SQL | N | Y | Соединение с источником данных для записи потоковых данных в Базу данных SQL. Базой данных может быть локальная база данных в SQL Azure для пограничных вычислений, удаленная база данных в SQL Server или База данных SQL Azure. |
Kafka | Y | N | Источник данных для чтения потоковых данных из раздела Kafka. |
Пример. Создание входного или выходного объекта внешнего потока для центра Azure IoT Edge
В следующем примере создается объект внешнего потока для центра Azure IoT Edge. Чтобы создать источник входных и выходных данных внешнего потока для центра Azure IoT Edge, сначала необходимо создать формат внешнего файла для структуры данных, которые считываются или записываются.
Создайте формат внешнего файла типа JSON.
CREATE EXTERNAL FILE format InputFileFormat WITH (FORMAT_TYPE = JSON); GO
Создайте внешний источник данных для центра Azure IoT Edge. Следующий скрипт T-SQL создает подключение источника данных к центру IoT Edge, который выполняется на том же узле Docker, что и SQL Azure для пограничных вычислений.
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH (LOCATION = 'edgehub://'); GO
Создайте объект внешнего потока для центра Azure IoT Edge. Следующий скрипт T-SQL создает объект потока для центра IoT Edge. В случае с объектом потока центра IoT Edge параметр LOCATION представляет собой имя раздела или канала центра IoT Edge, в котором выполняется операция считывания или записи.
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); GO
Пример. Создание объекта внешнего потока для Базы данных SQL Azure
В приведенном ниже примере создается объект внешнего потока для локальной базы данных SQL Azure для пограничных вычислений.
Создайте главный ключ в базе данных. Это необходимо для шифрования секрета учетных данных.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
Создайте учетные данные базы данных для доступа к источнику SQL Server. В следующем примере создаются учетные данные для внешнего источника данных, где IDENTITY = имя пользователя, а SECRET = пароль.
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'; GO
Создайте внешний источник данных с помощью инструкции CREATE EXTERNAL DATA SOURCE. Следующий пример:
- Создает внешний источник данных с именем LocalSQLOutput.
- Определяет внешний источник данных (
LOCATION = '<vendor>://<server>[:<port>]'
). В примере он указывает на локальный экземпляр SQL Azure для пограничных вычислений. - Использует учетные данные, созданные ранее.
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ); GO
Создайте объект внешнего потока. В приведенном ниже примере в базе данных MySQLDatabase создается объект внешнего потока, указывающий на таблицу dbo.TemperatureMeasurements.
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
Пример. Создание объекта внешнего потока для Kafka
В приведенном ниже примере создается объект внешнего потока для локальной базы данных SQL Azure для пограничных вычислений. В приведенном ниже примере предполагается, что сервер Kafka настроен для анонимного доступа.
Создайте внешний источник данных с помощью инструкции CREATE EXTERNAL DATA SOURCE. Следующий пример:
CREATE EXTERNAL DATA SOURCE [KafkaInput] WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'); GO
Создайте внешний формат файла для входных данных Kafka. В следующем примере создается формат файла JSON с со сжатием GZip.
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' ); GO
Создайте объект внешнего потока. В следующем примере создается объект внешнего потока, указывающий на раздел Kafka
TemperatureMeasurement
.CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' ); GO
Создание задания потоковой передачи и потоковых запросов
Используйте системную sys.sp_create_streaming_job
хранимую процедуру, чтобы определить запросы потоковой передачи и создать задание потоковой передачи. Хранимая процедура sp_create_streaming_job
принимает следующие параметры:
@job_name
— имя задания потоковой передачи. Имена заданий потоковой передачи уникальны в пределах экземпляра.@statement
— операторы потокового запроса на основе языка запросов Stream Analytics.
В приведенном ниже примере создается простое задание потоковой передачи с одним потоковым запросом. Этот запрос считывает входные данные из центра IoT Edge и выполняет запись в dbo.TemperatureMeasurements
базы данных.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
@statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
В следующем примере создается более сложное задание потоковой передачи с несколькими разными запросами. Один из этих запросов использует встроенную функцию AnomalyDetection_ChangePoint
для обнаружения аномалий в данных температуры.
EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
@statement = N'
SELECT *
INTO TemperatureMeasurements1
FROM MyEdgeHubInput1
SELECT *
INTO TemperatureMeasurements2
FROM MyEdgeHubInput2
SELECT *
INTO TemperatureMeasurements3
FROM MyEdgeHubInput3
SELECT timestamp AS [Time],
[Temperature] AS [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
INTO TemperatureAnomalies
FROM MyEdgeHubInput2;
';
GO
Запуск, завершение, удаление и отслеживание заданий потоковой передачи
Чтобы запустить задание потоковой передачи в SQL Azure для пограничных вычислений, выполните хранимую процедуру sys.sp_start_streaming_job
. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно запустить.
EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO
Чтобы завершить задание потоковой передачи, выполните хранимую процедуру sys.sp_stop_streaming_job
. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно остановить.
EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO
Чтобы прервать (удалить) задание потоковой передачи, выполните хранимую процедуру sys.sp_drop_streaming_job
. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно прервать.
EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO
Чтобы получить состояние текущего задания потоковой передачи, выполните хранимую процедуру sys.sp_get_streaming_job
. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно прервать. Хранимая процедура выводит имя и текущее состояние задания потоковой передачи.
EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
(
name NVARCHAR(256),
status NVARCHAR(256),
error NVARCHAR(256)
)
);
GO
Возможные состояния задания потоковой передачи:
Состояние | Описание |
---|---|
Создано | Задание потоковой передачи создано, но не запущено. |
Запуск | Задание потоковой передачи запускается. |
Бездействие | Задание потоковой передачи выполняется, но входные данные для обработки отсутствуют. |
Обработка | Задание потоковой передачи выполняется, и входные данные обрабатываются. Это свидетельствует о работоспособности задания потоковой передачи. |
Деградация | Задание потоковой передачи выполняется, но при обработке входных данных возникли некритические ошибки. Задание ввода продолжает выполняться, но приведет к удалению входных данных, которые сталкиваются с ошибками. |
Остановлено | Выполнение задания потоковой передачи остановлено. |
Неудачно | При выполнении задания потоковой передачи произошел сбой. Обычно это указывает на неустранимую ошибку при обработке. |
Примечание.
Так как задание потоковой передачи выполняется асинхронно, задание может столкнуться с ошибками во время выполнения. Чтобы устранить сбой задания потоковой передачи, используйте sys.sp_get_streaming_job
хранимую процедуру или просмотрите журнал Docker из контейнера SQL Azure Для пограничных вычислений, которые могут предоставить сведения об ошибке из задания потоковой передачи.