Использование Logstash для потоковой передачи журналов с помощью API сбора данных HTTP (устаревшая версия)

Важно!

Прием данных с помощью подключаемого модуля вывода Logstash в настоящее время доступен в общедоступной предварительной версии. Эта функция предоставляется без соглашения об уровне обслуживания и не рекомендуется для использования в рабочей среде. Дополнительные сведения см. в статье Дополнительные условия использования Предварительных версий Microsoft Azure.

Примечание.

Более новая версия подключаемого модуля Logstash может пересылать журналы из внешних источников данных в пользовательские и стандартные таблицы с помощью API на основе DCR. Новый подключаемый модуль обеспечивает полный контроль над выходной схемой, включая конфигурацию имен и типов столбцов.

С помощью подключаемого модуля выходных данных Microsoft Sentinel для подсистемы сбора данных Logstash Data Collection можно отправить журнал любого типа через Logstash прямо в рабочую область Log Analytics для Microsoft Sentinel. Журналы будут отправлены в настраиваемую таблицу, определяемую с помощью выходного подключаемого модуля. Эта версия подключаемого модуля использует API сбора данных HTTP.

Дополнительные сведения о работе с подсистемой сбора данных Logstash см. в статье Начало работы с Logstash.

Обзор

Архитектура и вводная информация

Diagram of the Logstash architecture.

Подсистема Logstash состоит из трех компонентов.

  • Подключаемые модули ввода: настраиваемый сбор данных из различных источников.
  • Подключаемые модули фильтрации: обработка и нормализация данных в соответствии с заданными критериями.
  • Подключаемые модули вывода: настраиваемая отправка полученных и обработанных данных в разные целевые расположения.

Примечание.

  • Корпорация Майкрософт поддерживает только описываемый здесь подключаемый модуль вывода Logstash, предоставляемый в Microsoft Sentinel. Текущей версией этого подключаемого модуля является версия 1.0.0, выпущенная 25 августа 2020 года. Можно открыть запрос в службу поддержки для сообщения о любых проблемах, связанных с подключаемым модулем вывода.

  • Корпорация Майкрософт не поддерживает сторонние подключаемые модули вывода Logstash для Microsoft Sentinel, а также любые другие подключаемые модули Logstash или компоненты любого типа.

  • Подключаемый модуль вывода Logstash в Microsoft Sentinel поддерживает только logstash версии 7.0 до 7.17.10 и версии 8.0 до 8.9 и 8.11. При использовании Logstash 8 рекомендуется отключить ECS в конвейере.

Подключаемый модуль вывода Microsoft Sentinel для Logstash отправляет данные в формате JSON в рабочую область Log Analytics с помощью REST API HTTP-сборщика данных Log Analytics. Данные отправляются в пользовательские журналы.

Развертывание подключаемого модуля вывода Microsoft Sentinel в Logstash

Шаг 1. Установка

Подключаемый модуль вывода Microsoft Sentinel доступен в коллекции Logstash.

Шаг 2. Настройка

Используйте информацию в документации Logstash Структура файла конфигурации и добавьте в конфигурацию подключаемый модуль вывода Microsoft Sentinel со следующими ключами и значениями. (Правильный синтаксис файла конфигурации показан после таблицы.)

Имя поля Тип данных Description
workspace_id строка Введите уникальный идентификатор рабочей области (см. "Совет").
workspace_key строка Введите уникальный идентификатор первичного ключа рабочей области (см. "Совет").
custom_log_table_name строка Задайте имя таблицы, в которую будут отправляться журналы. Можно задать только одно имя таблицы для каждого подключаемого модуля вывода. Таблица журналов появится в Microsoft Sentinel в разделе Журналы, Таблицы, в категории Пользовательские журналы с суффиксом _CL.
endpoint строка Дополнительное поле. По умолчанию это конечная точка Log Analytics. Используйте это поле, чтобы задать альтернативную конечную точку.
time_generated_field строка Дополнительное поле. Это свойство переопределяет поле по умолчанию TimeGenerated в Log Analytics. Укажите имя поля метки времени в источнике данных. Данные в этом поле должны соответствовать формату ISO 8601 (YYYY-MM-DDThh:mm:ssZ).
key_names array Введите список полей схемы вывода Log Analytics. Каждый элемент списка должен быть заключен в одинарные кавычки, элементы разделены запятыми, а весь список заключен в квадратные скобки. См. пример ниже.
plugin_flush_interval number Дополнительное поле. Задайте для определения максимального интервала (в секундах) между передачами сообщений в Log Analytics. Значение по умолчанию равно 5.
amount_resizing boolean True или false. Включите или отключите механизм автоматического масштабирования, который регулирует размер буфера сообщений в соответствии с объемом полученных данных журнала.
max_items number Дополнительное поле. Применяется только в том случае, если для amount_resizing задано значение false, и позволяет ограничить размер (количество записей) буфера сообщений. Значение по умолчанию — 2000.
azure_resource_id строка Дополнительное поле. Определяет идентификатор ресурса Azure, в котором располагаются данные.
Значение идентификатора ресурса особенно необходимо, если используется RBAC в контексте ресурса для предоставления доступа только к определенным данным.

Совет

  • Идентификатор рабочей области и первичный ключ можно найти в ресурсе рабочей области в разделе Управление агентами.
  • Однако поскольку хранение учетных данных и другой конфиденциальной информации в виде открытого текста в файлах конфигурации не соответствует оптимальным методам обеспечения безопасности, настоятельно рекомендуется использовать хранилище ключей Logstash для безопасного включения идентификатора рабочей области и первичного ключа рабочей области в конфигурацию. Для просмотра инструкций см. документацию по эластичным базам данных.

Примеры конфигураций

Здесь приведено несколько примеров конфигурации, в которых используются разные параметры.

  • Базовая конфигурация, в которой используется канал ввода filebeat:

      input {
          beats {
              port => "5044"
          }
      }
      filter {
      }
      output {
          microsoft-logstash-output-azure-loganalytics {
            workspace_id => "<your workspace id>"
            workspace_key => "<your workspace key>"
            custom_log_table_name => "tableName"
          }
      }
    
  • Базовая конфигурация, в которой используется канал ввода tcp:

      input {
          tcp {
              port => "514"
              type => syslog #optional, will effect log type in table
          }
      }
      filter {
      }
      output {
          microsoft-logstash-output-azure-loganalytics {
            workspace_id => "<your workspace id>"
            workspace_key =>  "<your workspace key>"
            custom_log_table_name => "tableName"
          }
      }
    
  • Расширенная конфигурация:

      input {
          tcp {
              port => 514
              type => syslog
          }
      }
      filter {
          grok {
              match => { "message" => "<%{NUMBER:PRI}>1 (?<TIME_TAG>[0-9]{4}-[0-9]{1,2}-[0-9]{1,2}T[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2})[^ ]* (?<HOSTNAME>[^ ]*) %{GREEDYDATA:MSG}" }
          }
      }
      output {
          microsoft-logstash-output-azure-loganalytics {
              workspace_id => "<WS_ID>"
              workspace_key => "${WS_KEY}"
              custom_log_table_name => "logstashCustomTable"
              key_names => ['PRI','TIME_TAG','HOSTNAME','MSG']
              plugin_flush_interval => 5
          }
      } 
    
  • Более расширенная конфигурация для анализа пользовательской метки времени и строки JSON из неструктурированных текстовых данных и записи выбранного набора полей в Log Analytics с извлеченным меткой времени:

      # Example log line below:
      # Mon Nov 07 20:45:08 2022: { "name":"_custom_time_generated", "origin":"test_microsoft", "sender":"test@microsoft.com", "messages":1337}
      # take an input
      input {
          file {
              path => "/var/log/test.log"
          }
      }
      filter {
      # extract the header timestamp and the Json section
          grok {
              match => {
                  "message" => ["^(?<timestamp>.{24}):\s(?<json_data>.*)$"]
              }
          }
      # parse the extracted header as a timestamp
      date {
          id => 'parse_metric_timestamp'
              match => [ 'timestamp', 'EEE MMM dd HH:mm:ss yyyy' ]
              timezone => 'Europe/Rome'
              target => 'custom_time_generated'
          }
      json {
          source => "json_data"
          }
      }
      # output to a file for debugging (optional)
      output {
          file {
              path => "/tmp/test.txt"
              codec => line { format => "custom format: %{message} %{custom_time_generated} %{json_data}"}
          }
      }
      # output to the console output for debugging (optional)
      output {
          stdout { codec => rubydebug }
      }
      # log into Log Analytics
      output {
          microsoft-logstash-output-azure-loganalytics {
              workspace_id => '[REDACTED]'
              workspace_key => '[REDACTED]'
              custom_log_table_name => 'RSyslogMetrics'
              time_generated_field => 'custom_time_generated'
              key_names => ['custom_time_generated','name','origin','sender','messages']
          }
      }
    

    Примечание.

    Откройте репозиторий GitHub подключаемого модуля вывода, чтобы узнать больше о его внутренней работе, конфигурации и настройках производительности.

Шаг 3. Перезапуск Logstash

Шаг 4. Просмотр входящих журналов в Microsoft Sentinel

  1. Убедитесь, что сообщения отправляются в подключаемый модуль вывода.

  2. В меню навигации Microsoft Sentinel щелкните Журналы. В разделе Таблицы разверните категорию Пользовательские журналы. Найдите и выберите имя указанной таблицы (с суффиксом _CL) в конфигурации.

    Screenshot of log stash custom logs.

  3. Чтобы просмотреть записи в таблице, запросите таблицу, используя имя таблицы как схему.

    Screenshot of a log stash custom logs query.

Мониторинг журналов аудита подключаемого модуля вывода

Чтобы отслеживать подключение и активность подключаемого модуля вывода Microsoft Sentinel, включите соответствующий файл журнала Logstash. См. документацию Структура каталога Logstash для определения расположения файла журнала.

Если в этом файле журнала нет никаких данных, сгенерируйте и отправьте несколько событий локально (через подключаемые модули ввода и фильтрации), чтобы убедиться, что подключаемый модуль вывода получает данные. Microsoft Sentinel предоставляет поддержку только для проблем, связанных с подключаемым модулем вывода.

Следующие шаги

В этом документе приведена информация о том, как использовать Logstash для подключения внешних источников данных к Microsoft Sentinel. Ознакомьтесь с дополнительными сведениями о Microsoft Sentinel в следующих статьях: