Создание задания потоковой передачи данных в SQL Azure для пограничных вычислений

Важно!

Azure SQL Edge больше не поддерживает платформу ARM64.

В этой статье описано, как создать задание потоковой передачи T-SQL в SQL Azure для пограничных вычислений. Вы создаете входные и выходные объекты внешнего потока, а затем определяете потоковый запрос в ходе создания задания потоковой передачи.

Настройка входных и выходных объектов внешнего потока

Потоковая передача T-SQL использует функциональные возможности внешнего источника данных SQL Server, чтобы определить источники данных, связанные с входными и выходными объектами внешнего потока в задании потоковой передачи. Используйте следующие команды T-SQL, чтобы создать входной или выходной объект внешнего потока.

Кроме того, если в качестве потока вывода используется SQL Azure для пограничных вычислений, SQL Server или База данных SQL Azure, необходимо СОЗДАТЬ УЧЕТНЫЕ ДАННЫЕ ДЛЯ БАЗЫ ДАННЫХ (Transact-SQL). Эта команда T-SQL определяет учетные данные для доступа к базе данных.

Поддерживаемые источники данных для входного потока и потока вывода

В настоящее время SQL Azure для пограничных вычислений поддерживает только следующие источники данных в качестве входных потоков и потоков вывода.

Тип источника данных Входные данные Выходные данные Description
Центр Azure IoT Edge Y Y Источник данных для чтения и записи потоковых данных в центр Azure IoT Edge. Дополнительные сведения см. в разделе Центр IoT Edge.
База данных SQL N Y Соединение с источником данных для записи потоковых данных в Базу данных SQL. Базой данных может быть локальная база данных в SQL Azure для пограничных вычислений, удаленная база данных в SQL Server или База данных SQL Azure.
Kafka Y N Источник данных для чтения потоковых данных из раздела Kafka.

Пример. Создание входного или выходного объекта внешнего потока для центра Azure IoT Edge

В следующем примере создается объект внешнего потока для центра Azure IoT Edge. Чтобы создать источник входных и выходных данных внешнего потока для центра Azure IoT Edge, сначала необходимо создать формат внешнего файла для структуры данных, которые считываются или записываются.

  1. Создайте формат внешнего файла типа JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Создайте внешний источник данных для центра Azure IoT Edge. Следующий скрипт T-SQL создает подключение источника данных к центру IoT Edge, который выполняется на том же узле Docker, что и SQL Azure для пограничных вычислений.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Создайте объект внешнего потока для центра Azure IoT Edge. Следующий скрипт T-SQL создает объект потока для центра IoT Edge. В случае с объектом потока центра IoT Edge параметр LOCATION представляет собой имя раздела или канала центра IoT Edge, в котором выполняется операция считывания или записи.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Пример. Создание объекта внешнего потока для Базы данных SQL Azure

В приведенном ниже примере создается объект внешнего потока для локальной базы данных SQL Azure для пограничных вычислений.

  1. Создайте главный ключ в базе данных. Это необходимо для шифрования секрета учетных данных.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Создайте учетные данные базы данных для доступа к источнику SQL Server. В следующем примере создаются учетные данные для внешнего источника данных, где IDENTITY = имя пользователя, а SECRET = пароль.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. Создайте внешний источник данных с помощью инструкции CREATE EXTERNAL DATA SOURCE. Следующий пример:

    • Создает внешний источник данных с именем LocalSQLOutput.
    • Определяет внешний источник данных (LOCATION = '<vendor>://<server>[:<port>]'). В примере он указывает на локальный экземпляр SQL Azure для пограничных вычислений.
    • Использует учетные данные, созданные ранее.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Создайте объект внешнего потока. В приведенном ниже примере в базе данных MySQLDatabase создается объект внешнего потока, указывающий на таблицу dbo.TemperatureMeasurements.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Пример. Создание объекта внешнего потока для Kafka

В приведенном ниже примере создается объект внешнего потока для локальной базы данных SQL Azure для пограничных вычислений. В приведенном ниже примере предполагается, что сервер Kafka настроен для анонимного доступа.

  1. Создайте внешний источник данных с помощью инструкции CREATE EXTERNAL DATA SOURCE. Следующий пример:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Создайте внешний формат файла для входных данных Kafka. В следующем примере создается формат файла JSON с со сжатием GZip.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Создайте объект внешнего потока. В следующем примере создается объект внешнего потока, указывающий на раздел Kafka TemperatureMeasurement.

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Создание задания потоковой передачи и потоковых запросов

Используйте системную sys.sp_create_streaming_job хранимую процедуру, чтобы определить запросы потоковой передачи и создать задание потоковой передачи. Хранимая процедура sp_create_streaming_job принимает следующие параметры:

  • @job_name — имя задания потоковой передачи. Имена заданий потоковой передачи уникальны в пределах экземпляра.
  • @statement — операторы потокового запроса на основе языка запросов Stream Analytics.

В приведенном ниже примере создается простое задание потоковой передачи с одним потоковым запросом. Этот запрос считывает входные данные из центра IoT Edge и выполняет запись в dbo.TemperatureMeasurements базы данных.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

В следующем примере создается более сложное задание потоковой передачи с несколькими разными запросами. Один из этих запросов использует встроенную функцию AnomalyDetection_ChangePoint для обнаружения аномалий в данных температуры.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Запуск, завершение, удаление и отслеживание заданий потоковой передачи

Чтобы запустить задание потоковой передачи в SQL Azure для пограничных вычислений, выполните хранимую процедуру sys.sp_start_streaming_job. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно запустить.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Чтобы завершить задание потоковой передачи, выполните хранимую процедуру sys.sp_stop_streaming_job. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно остановить.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Чтобы прервать (удалить) задание потоковой передачи, выполните хранимую процедуру sys.sp_drop_streaming_job. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно прервать.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Чтобы получить состояние текущего задания потоковой передачи, выполните хранимую процедуру sys.sp_get_streaming_job. В качестве входного объекта для хранимой процедуры необходимо указать имя задания потоковой передачи, которое нужно прервать. Хранимая процедура выводит имя и текущее состояние задания потоковой передачи.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

Возможные состояния задания потоковой передачи:

Status Описание
Создано Задание потоковой передачи создано, но не запущено.
Запуск Задание потоковой передачи запускается.
Бездействие Задание потоковой передачи выполняется, но входные данные для обработки отсутствуют.
Обрабатывается Задание потоковой передачи выполняется, и входные данные обрабатываются. Это свидетельствует о работоспособности задания потоковой передачи.
Деградация Задание потоковой передачи выполняется, но при обработке входных данных возникли некритические ошибки. Задание ввода продолжает выполняться, но приведет к удалению входных данных, которые сталкиваются с ошибками.
Остановлено Выполнение задания потоковой передачи остановлено.
Не удалось отправить При выполнении задания потоковой передачи произошел сбой. Обычно это указывает на неустранимую ошибку при обработке.

Примечание.

Так как задание потоковой передачи выполняется асинхронно, задание может столкнуться с ошибками во время выполнения. Чтобы устранить сбой задания потоковой передачи, используйте sys.sp_get_streaming_job хранимую процедуру или просмотрите журнал Docker из контейнера SQL Azure Для пограничных вычислений, которые могут предоставить сведения об ошибке из задания потоковой передачи.

Следующие шаги