Распределенная трассировка и корреляция путем обмена сообщениями через служебную шину

Одна из распространенных проблем в разработке микрослужб — это возможность трассировки операции из клиента во всех службах, участвующих в обработке. Это удобно для выполнения отладки, анализа производительности, A/B-тестирования и других стандартных сценариев диагностики. Одним из аспектов этой проблемы является отслеживание логических рабочих частей. Сюда входит результат обработки сообщений, задержка вызовов зависимостей и внешние вызовы зависимостей. Другой аспект — это корреляция этих диагностических событий, которые выходят за границы процессов.

Когда производитель отправляет сообщение через очередь, это обычно происходит в области какой-нибудь другой логической операции, инициированной другим клиентом или другой службой. При получении сообщения потребитель продолжает выполнять ту же операцию. Производитель и потребитель (и другие службы, обрабатывающие операции), предположительно генерируют события телеметрии для выполнения трассировки потока и результата операции. Чтобы сопоставить эти события и выполнить комплексную трассировку операции, каждая служба, отправляющая отчеты с данными телеметрии, должна помечать каждое событие с контекстом трассировки.

Служебная шина Microsoft Azure для обмена сообщениями имеет определенные свойства полезных данных, которые производитель и потребитель должны использовать для передачи этого контекста трассировки. Протокол основан на контексте трассировки W3C.

Имя свойства Description
Diagnostic-Id Уникальный идентификатор внешнего вызова от производителя к очереди. Формат см. в заголовке traceparent контекста трассировки W3C.

Автоматическая трассировка клиента .NET служебной шины

Класс ServiceBusProcessorклиента служебной шины Microsoft Azure для .NET предоставляет точки инструментария трассировки, которые можно обработать с помощью систем трассировки или фрагмента кода клиента. Инструментирование позволяет отслеживать все вызовы в службу обмена сообщениями служебной шины со стороны клиента. Если обработка сообщений выполняется с помощью ProcessMessageAsyncServiceBusProcessor (шаблона обработчика сообщений), она также будет инструментирована.

Отслеживание с помощью Azure Application Insights

Microsoft Application Insights предоставляет широкие возможности мониторинга производительности, включая автоматическое создание запросов и отслеживание зависимостей.

В зависимости от типа проекта установите пакет SDK для Application Insights:

  • ASP.NET: установите версию 2.5-beta2 или более позднюю.
  • ASP.NET Core: установите версию 2.2.0-beta2 или более позднюю. Эти ссылки содержат подробные сведения об установке пакета SDK, создании ресурсов и настройке пакета SDK (при необходимости). Сведения для приложений без ASP.NET см. в статье Application Insights for .NET console applications (Application Insights для консольных приложений .NET).

Если вы используете ProcessMessageAsyncServiceBusProcessor (шаблона обработчика сообщений) для обработки сообщений, то обработка сообщений также будет инструментирована. Все вызовы служебной шины, выполненные вашей службой, автоматически отслеживаются и связываются с другими элементами телеметрии. В противном случае ознакомьтесь с указанным ниже примером для отслеживания обработки сообщений вручную.

Обработка сообщений трассировки

async Task ProcessAsync(ProcessMessageEventArgs args)
{
    ServiceBusReceivedMessage message = args.Message;
    if (message.ApplicationProperties.TryGetValue("Diagnostic-Id", out var objectId) && objectId is string diagnosticId)
    {
        var activity = new Activity("ServiceBusProcessor.ProcessMessage");
        activity.SetParentId(diagnosticId);
        // If you're using Microsoft.ApplicationInsights package version 2.6-beta or higher, you should call StartOperation<RequestTelemetry>(activity) instead
        using (var operation = telemetryClient.StartOperation<RequestTelemetry>("Process", activity.RootId, activity.ParentId))
        {
            telemetryClient.TrackTrace("Received message");
            try 
            {
            // process message
            }
            catch (Exception ex)
            {
                telemetryClient.TrackException(ex);
                operation.Telemetry.Success = false;
                throw;
            }

            telemetryClient.TrackTrace("Done");
        }
    }
}

В этом примере телеметрия запроса выводится для каждого обработанного сообщения вместе с меткой времени, продолжительностью и результатом (успешно). Данные телеметрии также имеют набор свойств корреляции. Вложенные трассировки и исключения, обнаруженные во время обработки сообщения, также имеют свойства корреляции, представляя их в качестве дочерних элементов RequestTelemetry.

Если вы отправили вызовы к поддерживаемым внешним компонентам во время обработки сообщения, они также будут автоматически отслеживаться и коррелироваться. Дополнительные сведения о выполнении отслеживания и корреляции вручную см. в статье Отслеживание пользовательских операций с помощью пакета SDK Application Insights для .NET.

Если кроме пакета SDK для Application Insights вы используете какой-либо внешний код, при просмотре журналов Application Insights может увеличиться время ожидания.

Longer duration in Application Insights log

Это не значит, что во время получения сообщения произошла задержка. В этом сценарии сообщение уже получено, поскольку оно передается в качестве параметра в код пакета SDK. Кроме того, тег name в журналах App Insights (Process) указывает на то, что сообщение теперь обрабатывается с помощью внешнего кода обработки событий. Эта проблема не связана с Azure. Наоборот, эти метрики указывают на эффективность вашего внешнего кода при условии, что сообщение уже получено из служебной шины.

Отслеживание с помощью OpenTelemetry

Клиентская библиотека .NET версии 7.5.0 и выше Служебной шины поддерживает OpenTelemetry в экспериментальном режиме. Дополнительные сведения см. в статье "Распределенная трассировка" в пакете SDK для .NET.

Отслеживание без системы трассировки

Если ваша система трассировки не поддерживает автоматическое отслеживание вызовов служебной шины, вы можете рассмотреть возможность добавить эту поддержку в систему трассировки или приложение. В этом разделе описываются события диагностики, отправленные клиентом .NET служебной шины.

Клиент .NET служебной шины инструментируется с помощью примитивов трассировки .NET System.Diagnostics.Activity и System.Diagnostics.DiagnosticSource.

Activity используется как контекст трассировки, в то время как DiagnosticSource — это механизм уведомлений.

Если для событий DiagnosticSource нет прослушивателя, инструментирование будет отключено, чтобы за него не взималась плата. DiagnosticSource передает весь контроль прослушивателю:

  • прослушиватель определяет, какие источники и события следует прослушивать;
  • прослушиватель контролирует частоту событий и выборку;
  • события отправляются вместе с полезными данными, которые предоставляют полный контекст, чтобы вы могли получить доступ к объекту сообщения во время события и изменить его.

Ознакомьтесь с руководством пользователя DiagnosticSource, прежде чем приступить к реализации.

Давайте создадим прослушиватель для событий служебной шины в приложении ASP.NET Core, который записывает журналы с помощью Microsoft.Extension.Logger. Он использует библиотеку System.Reactive.Core, чтобы подписаться на DiagnosticSource (на DiagnosticSource можно также легко подписаться без прослушивателя).

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime applicationLifetime)
{
    // configuration...

    var serviceBusLogger = factory.CreateLogger("Azure.Messaging.ServiceBus");

    IDisposable innerSubscription = null;
    IDisposable outerSubscription = DiagnosticListener.AllListeners.Subscribe(delegate (DiagnosticListener listener)
    {
        // subscribe to the Service Bus DiagnosticSource
        if (listener.Name == "Azure.Messaging.ServiceBus")
        {
            // receive event from Service Bus DiagnosticSource
            innerSubscription = listener.Subscribe(delegate (KeyValuePair<string, object> evnt)
            {
                // Log operation details once it's done
                if (evnt.Key.EndsWith("Stop"))
                {
                    Activity currentActivity = Activity.Current;
                    serviceBusLogger.LogInformation($"Operation {currentActivity.OperationName} is finished, Duration={currentActivity.Duration}, Id={currentActivity.Id}, StartTime={currentActivity.StartTimeUtc}");
                }
            });
        }
    });

    applicationLifetime.ApplicationStopping.Register(() =>
    {
        outerSubscription?.Dispose();
        innerSubscription?.Dispose();
    });
}

В этом примере прослушиватель записывает длительность, результат, уникальный идентификатор и время начала для каждой операции служебной шины.

События

Все события будут иметь следующие свойства, соответствующие спецификации OpenTelemetry: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/api.md.

  • message_bus.destination — очередь, раздел и путь к подписке;
  • peer.address — пространство полных имен;
  • kind — производитель, потребитель или клиент (производитель используется при отправке сообщений, потребитель — при получении, а клиент —при сопоставлении);
  • componentservicebus

Все события также имеют Entity и Endpoint свойства.

  • Entity — имя сущности (очереди, раздела и т. д.);
  • Endpoint — URL-адрес конечной точки служебной шины.

Инструментированные операции

Ниже приведен полный список инструментированных операций.

Имя операции Отслеживаемый API
ServiceBusSender.Send ServiceBusSender.SendMessageAsync
ServiceBusSender.SendMessagesAsync
ServiceBusSender.Schedule ServiceBusSender.ScheduleMessageAsync
ServiceBusSender.ScheduleMessagesAsync
ServiceBusSender.Cancel ServiceBusSender.CancelScheduledMessageAsync
ServiceBusSender.CancelScheduledMessagesAsync
ServiceBusReceiver.Receive ServiceBusReceiver.ReceiveMessageAsync
ServiceBusReceiver.ReceiveMessagesAsync
ServiceBusReceiver.ReceiveDeferred ServiceBusReceiver.ReceiveDeferredMessagesAsync
ServiceBusReceiver.Peek ServiceBusReceiver.PeekMessageAsync
ServiceBusReceiver.PeekMessagesAsync
ServiceBusReceiver.Abandon ServiceBusReceiver.AbandonMessagesAsync
ServiceBusReceiver.Complete ServiceBusReceiver.CompleteMessagesAsync
ServiceBusReceiver.DeadLetter ServiceBusReceiver.DeadLetterMessagesAsync
ServiceBusReceiver.Defer ServiceBusReceiver.DeferMessagesAsync
ServiceBusReceiver.RenewMessageLock ServiceBusReceiver.RenewMessageLockAsync
ServiceBusSessionReceiver.RenewSessionLock ServiceBusSessionReceiver.RenewSessionLockAsync
ServiceBusSessionReceiver.GetSessionState ServiceBusSessionReceiver.GetSessionStateAsync
ServiceBusSessionReceiver.SetSessionState ServiceBusSessionReceiver.SetSessionStateAsync
ServiceBusProcessor.ProcessMessage Для обратного вызова процессора задано значение ServiceBusProcessor. Свойство ProcessMessageAsync
ServiceBusSessionProcessor.ProcessSessionMessage Для обратного вызова процессора задано значение ServiceBusSessionProcessor. Свойство ProcessMessageAsync

Фильтрация и выборка

В некоторых случаях предпочтительно зарегистрировать только часть событий, чтобы снизить потери производительности и уровень использования хранилища. Вы можете зарегистрировать только события Stop (как показано в предыдущем примере) или процент выборок событий. DiagnosticSource предоставляет способ реализации этих возможностей с помощью предиката IsEnabled. Дополнительные сведения см. в разделе Фильтрация на основе контекста.

Вы можете вызвать IsEnabled несколько раз для одной операции, чтобы минимизировать влияние на производительность.

IsEnabled вызывается в следующей последовательности:

  1. IsEnabled(<OperationName>, string entity, null), например, IsEnabled("ServiceBusSender.Send", "MyQueue1"). Обратите внимание, что в конце нет события Start или Stop. Используйте его, чтобы отфильтровать конкретные операции или очереди. Если метод обратного вызова возвращает значение false, события для операции не отправляются.

    • Для операций Process и ProcessSession вы также получите обратный вызов IsEnabled(<OperationName>, string entity, Activity activity). С помощью него можно фильтровать события на основе activity.Id или свойства тегов.
  2. IsEnabled(<OperationName>.Start), например, IsEnabled("ServiceBusSender.Send.Start"). Проверяет, следует ли вызывать событие Start. Результат влияет только на событие Start, но дальнейшее инструментирование от него не зависит.

Для события Stop отсутствует IsEnabled.

Если результатом некоторых операций является исключение, вызывается IsEnabled("ServiceBusSender.Send.Exception"). Вы можете только подписаться на события Exception и запретить остальное инструментирование. В этом случае вам по-прежнему следует обработать такие исключения. После отключения других операций инструментирования не следует ожидать, что контекст трассировки будет передаваться с сообщением от потребителя к производителю.

Вы также можете использовать IsEnabled, который также реализует стратегии выборки. Выборка на основе Activity.Id или Activity.RootId гарантирует согласованную выборку на всех ресурсах (при условии, что она распространяется системой трассировки или вашим кодом).

При наличии нескольких прослушивателей DiagnosticSource для одного источника будет достаточно, чтобы событие принял один из них, поэтому IsEnabled вызывать необязательно.

Следующие шаги