Поделиться через


Подключение к данным Центров событий Azure

Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий. Azure Data Explorer предлагает возможности непрерывного приема данных из управляемых клиентами Центров событий.

Конвейер приема Центров событий передает события в Azure Data Explorer в несколько этапов. Сначала создайте концентратор событий на портале Azure. Затем вы создадите целевую таблицу в Azure Data Explorer, в которой данные в определенном формате будут приняты с использованием заданных свойств приема. Подключение к Центрам событий должно поддерживать маршрутизацию событий. Данные могут внедряться с выбранными свойствами в соответствии со свойствами системы событий. Создайте подключение к Центрам событий для создания концентратора событий и отправки событий. Этим процессом можно управлять с помощью портала Azure, программно с помощью C# или Pythonили с помощью шаблона Azure Resource Manager.

Общие сведения о приеме данных в обозревателе данных Azure см. в разделе Обзор приема данных Azure Data Explorer.

Механизмы аутентификации для подключений к данным в Azure Data Explorer

  • Подключение к данным на основе управляемых удостоверений (рекомендуется): используйте его для максимальной защищенности при подключении к источникам данных. Оно обеспечивает полный контроль над возможностью получения данных из источника данных. Настройка подключения к данным с помощью управляемого удостоверения требует выполнения следующих действий.

    1. Добавьте управляемое удостоверение в кластер.
    2. Предоставьте разрешения управляемому удостоверению в источнике данных. Чтобы получить данные из Центры событий Azure, управляемое удостоверение должно иметь разрешения Центры событий Azure приемника данных.
    3. Задайте политику управляемого удостоверения в целевых базах данных.
    4. Создайте подключение к данным с помощью проверки подлинности управляемого удостоверения для получения данных.

    Внимание

    Если разрешения управляемого удостоверения удаляются из источника данных, подключение к данным больше не будет работать и не сможет получить данные из источника данных.

  • Подключение к данным на основе ключей: если для подключения к данным не указана проверка подлинности управляемого удостоверения, подключение по умолчанию используется для проверки подлинности на основе ключей. Подключения на основе ключей получают данные с помощью строки подключения к ресурсу, например строки подключения Центров событий Azure. Azure Data Explorer получает строка подключения ресурса для указанного ресурса и безопасно сохраняет его. Затем строка подключения используется для получения данных из источника данных.

    Внимание

    Если ключ поворачивается, подключение к данным больше не будет работать и не сможет получить данные из источника данных. Чтобы устранить проблему, обновите или повторно создайте подключение к данным.

Формат данных

  • Данные считываются из концентратора событий в виде объектов EventData.
  • См. раздел Поддерживаемые форматы.

    Примечание.

    • Прием из Центров событий не поддерживает формат RAW.
    • Центры событий Azure Реестр схем и avro без схемы не поддерживаются.
    • Данные можно сжимать с помощью алгоритма сжатия gzip. Вы можете указать Compression динамически с помощью свойств приема или в статических параметрах подключения к данным.
    • Сжатие данных не поддерживается для двоичных форматов (Avro, ApacheAvro, Parquet, ORC и W3CLOGFILE).
    • Пользовательские свойства кодирования и внедренные системные свойства не поддерживаются с двоичными форматами и сжатыми данными.
    • При использовании двоичных форматов (Avro, ApacheAvro, Parquet, ORC и W3CLOGFILE) и сопоставлений приемов порядок полей в определении сопоставления приема должен соответствовать порядку соответствующих столбцов в таблице.

Свойства Центров событий

Azure Data Explorer поддерживает следующие свойства Центров событий:

  • Закрытый набор свойств приема, что помогает направлять события в соответствующие таблицы.
  • Закрытый набор свойств системы событий, который можно встроить в данные с учетом заданного сопоставления.

Примечание.

Прием пользовательских свойств Центров событий, используемый для связывания метаданных с событиями, не поддерживается. Если необходимо принять пользовательские свойства, отправьте их в тело данных события. Дополнительные сведения см. в разделе Прием пользовательских свойств.

Свойства приема

Свойства приема указывают процессу приема, куда направлять данные и как их обрабатывать. Вы можете указать свойства приема для приема событий с помощью EventData.Properties. Можно задать следующие свойства:

Примечание.

В именах свойств учитывается регистр.

Свойство Description
База данных Имя целевой базы данных с учетом регистра. По умолчанию данные принимаются в целевую базу данных, связанную с подключением к данным. Это свойство используется для переопределения базы данных по умолчанию и отправки данных в другую базу данных. Для этого необходимо сначала настроить подключение в виде подключения к нескольким базам данных.
Таблица Имя существующей целевой таблицы с учетом регистра. Переопределяет набор Table на панели Data Connection.
Формат Формат данных. Переопределяет набор Data format на панели Data Connection.
IngestionMappingReference Имя существующего сопоставления приема, которое будет использоваться. Переопределяет набор Column mapping на панели Data Connection.
Сжатие Сжатие данных, None (по умолчанию) или gzip.
Кодировка Кодировка данных, по умолчанию — UTF8. Может быть любой из поддерживаемых кодировок .NET.
Теги Список тегов, которые нужно связать с принятыми данными, в формате строки массива JSON. При использовании тегов может понизиться производительность.
RawHeaders Указывает, что источник событий — Kafka и Azure Data Explorer должны использовать десериализацию массива байтов для чтения других свойств маршрутизации. Значение не учитывается.

Примечание.

Принимаются только события, поставленные в очередь после создания подключения к данным.

Маршрутизация событий

При создании подключения к данным в кластере можно задать маршрутизацию для отправки принимаемых данных. По умолчанию используется маршрутизация к целевой таблице, указанной в строке подключения, которая связана с целевой базой данных. Маршрутизация по умолчанию для данных также называется статической маршрутизацией. Вы также можете указать альтернативную маршрутизацию для данных с помощью свойств данных событий, описанных выше.

Маршрутизация данных событий в альтернативную базу данных

По умолчанию маршрутизация данных в альтернативную базу данных отключена. Чтобы отправить данные в другую базу данных, необходимо сначала настроить подключение в виде подключения к нескольким базам данных. Это можно сделать на портале Azure, с помощью C#, Python илишаблона ARM. Пользователь, группа, субъект-служба или управляемое удостоверение, используемые для поддержки маршрутизации базы данных, должны иметь по крайней мере роль участника и разрешения на запись в кластер.

Чтобы указать альтернативную базу данных, задайте свойство приема базы данных.

Предупреждение

Указание альтернативной базы данных без настройки подключения в виде подключения к данным для нескольких баз данных приведет к сбою приема.

Маршрутизация данных событий в альтернативную таблицу

Чтобы указать альтернативную таблицу для каждого события, задайте таблицу, формат, сжатие и свойства приема сопоставления. Подключение будет динамически маршрутизировать принимаемые данные, как указано в EventData.Properties, переопределяя статические свойства для этого события.

В следующем примере показано, как задать сведения о центре событий и отправить данные метрик погоды в альтернативную базу данных (MetricsDB) и таблицу (WeatherMetrics). Данные имеют формат JSON, а сопоставление mapping1 задано предварительно для таблицы WeatherMetrics.

// This sample uses Azure.Messaging.EventHubs which is a .Net Framework library.
await using var producerClient = new EventHubProducerClient("<eventHubConnectionString>");
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(
    new { TimeStamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }
)));
eventData.Properties.Add("Database", "MetricsDB");
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['myDataTag']");
var events = new[] { eventData };
// Send events
await producerClient.SendAsync(events);

Сопоставление свойств системы событий

Системные свойства хранят свойства, заданные службой "Центры событий" на момент поставки события в очередь. Подключение данных к центру событий может внедрить выбранный набор системных свойств в данные, принятые в таблицу с учетом заданного сопоставления.

Примечание.

  • Внедренные системные свойства поддерживаются для форматов JSON и табличных форматов (то есть JSON, MultiJSON, CSV, TSV, PSV, SCsv, SOHsv, TSVE).
  • При использовании неподдерживаемого формата (TXT или сжатых форматов, например Parquet, Avro и т. д.) данные по-прежнему будут приняты, но свойства будут игнорироваться.
  • Внедрение системных свойств не поддерживается, если настроено сжатие сообщений концентратора событий. В таких случаях будет выдаваться соответствующая ошибка, и данные не будут приняты.
  • Для табличных данных системные свойства поддерживаются только для сообщений о событиях с одной записью.
  • Для данных JSON системные свойства также поддерживаются для сообщений о событиях с несколькими записями. В таких случаях системные свойства добавляются только к первой записи сообщения о событии.
  • Для CSV сопоставления свойства добавляются в начале записи в порядке, указанном в создании подключения к данным. Не полагаться на порядок этих свойств, так как он может измениться в будущем.
  • При сопоставлении в формате JSON свойства добавляются в соответствии с именами свойств, указанных в таблице Системные свойства.

Служба "Центры событий" предоставляет следующие системные свойства:

Свойство Тип данных Description
x-opt-enqueued-time datetime Время поставки событий в очередь в формате UTC
x-opt-sequence-number long Регистрационный номер транзакции в журнале для события в потоке разделов концентратора событий
x-opt-offset string Смещение события относительно потока разделов концентратора событий. Этот идентификатор смещения уникален внутри раздела потока концентратора событий.
x-opt-publisher string Имя издателя, если сообщение было отправлено в конечную точку издателя
x-opt-partition-key string Ключ соответствующей секции, в которой хранится событие

Кроме того, при работе с концентраторами событий IoT Central вы можете внедрить системные свойства Центра Интернета вещей в полезные данные. Полный список см. в разделе Системные свойства Центра Интернета вещей.

Если вы выбрали Свойства системы событий в разделе Источник данных таблицы, вы должны включить свойства в схему таблицы и сопоставление.

Примеры схемы сопоставления

Пример схемы сопоставления таблицы

Если данные содержат три столбца (TimeStamp, MetricName и Value) и включены свойства x-opt-enqueued-time и x-opt-offset, создайте или измените схему таблицы с помощью следующей команды:

    .create-merge table TestTable (TimeStamp: datetime, MetricName: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

Пример сопоставления CSV-файла

Выполните следующие команды, чтобы добавить данные в начало записи. Запишите порядковые номера.

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "TimeStamp", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "MetricName", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

Пример сопоставления JSON

Данные добавляются с помощью сопоставления свойств системы. Выполните следующие команды.

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "TimeStamp", "Properties":{"Path":"$.TimeStamp"}},'
    '    { "column" : "MetricName", "Properties":{"Path":"$.MetricName"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.Value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Сопоставление схем для файлов Avro для центров событий

Одним из способов использования данных Центров событий является запись событий с помощью Центры событий Azure в Хранилище BLOB-объектов Azure или Azure Data Lake Storage. Затем можно реализовать прием этих файлов по мере их записи с помощью подключения к данным Сетки событий в Azure Data Explorer.

Схема файлов записи отличается от схемы исходного события, отправляемого в Центры событий. Следует проектировать схему целевой таблицы с учетом этого различия. В частности, полезные данные события представлены в файле сбора в виде массива байтов, и этот массив не декодируется автоматически при подключении к данным Сетки событий в Azure Data Explorer. Дополнительные сведения о схеме файлов для данных записи Avro Центров событий см. в статье "Изучение захваченных файлов Avro" в Центры событий Azure.

Чтобы правильно декодировать полезные данные события:

  1. Сопоставьте поле Body записанного события со столбцом типа dynamic в целевой таблице.
  2. Примените политику обновления, которая преобразует массив байтов в удобочитаемую строку с помощью функции unicode_codepoints_to_string().

Настраиваемые свойства приема

При приеме событий из Центров событий данные берутся из раздела body объекта данных события. Но пользовательские свойства Центров событий определяются в разделе properties объекта и не принимаются. Чтобы принять пользовательские свойства, необходимо внедрить их в данные в разделе body объекта.

В следующем примере сравнивается объект данных событий, содержащий пользовательское свойство customProperty со значением defined от Центров событий (слева), со свойством embedded, которое требуется для приема (справа).

{
"body":{
"value": 42
},
"properties":{
"customProperty": "123456789"
}
}
{
"body":{
"value": 42,
"customProperty": "123456789"
}
}

Для внедрения пользовательских свойств в данные в разделе body объекта данных события можно использовать один из следующих методов:

Подключение к данным центров событий между регионами

Для максимальной производительности создавайте все приведенные ниже ресурсы в том же регионе, в котором находится кластер. Если нет другой альтернативы, рассмотрите возможность использования уровней "Премиум " или "Выделенные центры событий". Сравнение уровней Центров событий можно найти здесь.

Создание концентратора событий

Создайте концентратор событий, если у вас его еще нет. Подключение к концентратору событий можно устанавливать через портал Azure, программно с помощью C# либо Python или с помощью шаблона ARM.

Примечание.

  • Возможность динамического добавления секций после создания концентратора событий доступна только в Центрах событий категорий "Премиум" и "Выделенный" Указывать число секций следует с учетом долгосрочной перспективы.
  • Группа потребителей должна быть уникальной для каждого потребителя. Создайте группу потребителей, предназначенную для подключения к Azure Data Explorer.

Отправка событий

См. пример приложения, которое генерирует данные и отправляет их в концентратор событий.

Настройка решения для геоизбыточного аварийного восстановления

Концентратор событий предлагает решение для геоизбыточного аварийного восстановления. Azure Data Explorer не поддерживает пространства имен Alias концентратора событий. Чтобы реализовать геоизбыточное аварийное восстановление в решении, создайте два подключения к данным концентратора событий: одно для основного пространства имен и одно для дополнительного. Azure Data Explorer будет ожидать передачи данных с помощью обоих подключений концентратора событий.

Примечание.

Ответственность за реализацию отработки отказа из основного пространства имен в дополнительное несет пользователь.