透過服務匯流排傳訊進行分散式追蹤與相互關聯

微服務開發的其中一個常見問題,便是經由涉及處理程序的所有服務,追蹤來自用戶端之作業的能力。 此能力對於進行偵錯、效能分析、A/B 測試,以及其他一般診斷案例皆相當有用。 此問題的其中一部份,辨識追蹤作業的邏輯片段。 它包含訊息處理結果及延遲,以及外部相依性呼叫。 另外一部分,則是這些診斷事件於程序界線之外的關聯性。

當產生者透過佇列傳送訊息時,它通常會發生於其他邏輯作業的範圍中,並由其他用戶端或服務起始。 當取用者接收到訊息時,也會繼續相同的作業。 產生者與取用者 (以及其他處理該作業的服務) 應該都會發出遙測事件,以追蹤作業流程和結果。 若要將此類事件相互關聯並以端對端的方式追蹤作業,每個回報遙測的服務都必須為每個事件提供追蹤內容的戳記。

Microsoft Azure 服務匯流排傳訊已定義產生者與取用者應用來傳遞此類追蹤內容的裝載屬性。 通訊協定是以 W3C 追蹤內容為基礎。

屬性名稱 描述
Diagnostic-Id 產生者針對佇列之外部呼叫的唯一識別碼。 如需格式,請參閱 W3C 追蹤內容 traceparent 標頭

服務匯流排 .NET 用戶端自動追蹤

適用於 .NET 的 Azure 傳訊服務匯流排用戶端ServiceBusProcessor 類別會提供追蹤檢測點,可由追蹤系統或用戶端程式碼片段連結。 該檢測允許從用戶端追蹤針對服務匯流排傳訊服務的所有呼叫。 若訊息處理是透過使用 ServiceBusProcessorProcessMessageAsync (訊息處理常式模式) 完成,則訊息處理也會進行檢測處理。

透過 Azure Application Insights 進行追蹤

Microsoft Application Insights 能提供豐富的效能監視功能,包括自動要求和相依性追蹤。

請根據您的專案類型安裝 Application Insights SDK:

如果您使用 ServiceBusProcessorProcessMessageAsync (訊息處理常式模式) 來處理訊息,也會檢測訊息處理。 系統會自動追蹤由您的服務所完成的所有服務匯流排呼叫,並將其與其他遙測項目相互關聯。 否則,請參考下列範例以進行手動的訊息處理追蹤。

追蹤訊息處理

async Task ProcessAsync(ProcessMessageEventArgs args)
{
    ServiceBusReceivedMessage message = args.Message;
    if (message.ApplicationProperties.TryGetValue("Diagnostic-Id", out var objectId) && objectId is string diagnosticId)
    {
        var activity = new Activity("ServiceBusProcessor.ProcessMessage");
        activity.SetParentId(diagnosticId);
        // If you're using Microsoft.ApplicationInsights package version 2.6-beta or higher, you should call StartOperation<RequestTelemetry>(activity) instead
        using (var operation = telemetryClient.StartOperation<RequestTelemetry>("Process", activity.RootId, activity.ParentId))
        {
            telemetryClient.TrackTrace("Received message");
            try 
            {
            // process message
            }
            catch (Exception ex)
            {
                telemetryClient.TrackException(ex);
                operation.Telemetry.Success = false;
                throw;
            }

            telemetryClient.TrackTrace("Done");
        }
    }
}

在此範例中,系統會針對每個已處理的訊息回報要求遙測,並具有時間戳記、持續期間及結果 (成功)。 遙測也具有相互關聯屬性的集合。 於訊息處理期間回報的巢狀追蹤和例外狀況,也會具有相互關聯屬性的戳記,以代表它們是 RequestTelemetry 的「子系」。

如果您在訊息處理期間對支援的外部元件進行呼叫,系統也會自動對其進行追蹤及相互關聯。 請參閱使用 Application Insights .NET SDK 追蹤自訂作業以了解手動追蹤及相互關聯。

如果您除了 Application Insights SDK 之外,正在執行任何外部程式碼,則檢視 Application Insights 記錄時,預期會看到較長的持續時間

Longer duration in Application Insights log

這並不表示接收訊息時發生延遲。 在此案例中,已收到訊息,因為訊息會以參數的形式傳入 SDK 程式碼。 此外,App Insights 記錄中的名稱標記 (程序) 表示訊息現在正由外部事件處理程式碼進行處理。 此問題與 Azure 無關。 相反地,這些計量是指外部程式碼的效率,因為已經從服務匯流排收到訊息。

使用 OpenTelemetry 追蹤

服務匯流排 .NET 用戶端程式庫 7.5.0 版和更新版本支援實驗模式的 OpenTelemetry。 如需詳細資訊,請參閱 .NET SDK 中的分散式追蹤

在沒有追蹤系統下進行追蹤

如果您的追蹤系統不支援自動「服務匯流排」呼叫追蹤,您可以研究如何將該支援新增至追蹤系統或應用程式中。 本節說明由服務匯流排 .NET 用戶端所傳送的診斷事件。

服務匯流排 .NET 用戶端是使用 .NET 追蹤基本類型 System.Diagnostics.Activity \(英文\) 和 System.Diagnostics.DiagnosticSource \(英文\) 進行檢測。

Activity 會作為追蹤內容,而 DiagnosticSource 則為通知機制。

如果沒有適用於 DiagnosticSource 事件的接聽程式,檢測將會關閉以避免產生檢測成本。 DiagnosticSource 會將所有控制項賦予接聽程式:

  • 接聽程式能控制要接聽的來源和事件
  • 接聽程式能控制事件速率和取樣
  • 事件會搭配裝載傳送,該承載能提供完整內容,使您可以在事件期間存取並修改 Message 物件

在繼續進行實作之前,請先熟悉 DiagnosticSource 使用者指南 \(英文\)。

讓我們在 ASP.NET Core 應用程式中針對服務匯流排事件建立接聽程式,並使它可以透過 Microsoft.Extension.Logger 寫入記錄。 它會使用 System.Reactive.Core \(英文\) 程式庫來訂閱 DiagnosticSource (不使用它來訂閱 DiagnosticSource 的方式也十分簡單)

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory factory, IApplicationLifetime applicationLifetime)
{
    // configuration...

    var serviceBusLogger = factory.CreateLogger("Azure.Messaging.ServiceBus");

    IDisposable innerSubscription = null;
    IDisposable outerSubscription = DiagnosticListener.AllListeners.Subscribe(delegate (DiagnosticListener listener)
    {
        // subscribe to the Service Bus DiagnosticSource
        if (listener.Name == "Azure.Messaging.ServiceBus")
        {
            // receive event from Service Bus DiagnosticSource
            innerSubscription = listener.Subscribe(delegate (KeyValuePair<string, object> evnt)
            {
                // Log operation details once it's done
                if (evnt.Key.EndsWith("Stop"))
                {
                    Activity currentActivity = Activity.Current;
                    serviceBusLogger.LogInformation($"Operation {currentActivity.OperationName} is finished, Duration={currentActivity.Duration}, Id={currentActivity.Id}, StartTime={currentActivity.StartTimeUtc}");
                }
            });
        }
    });

    applicationLifetime.ApplicationStopping.Register(() =>
    {
        outerSubscription?.Dispose();
        innerSubscription?.Dispose();
    });
}

在此範例中,接聽程式會記錄每個服務匯流排作業的持續期間、結果、唯一識別碼,以及開始時間。

事件

所有事件都會有符合開放遙測規格的下列屬性:https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/api.md

  • message_bus.destination - 佇列/主題/訂用帳戶路徑
  • peer.address - 完整名稱
  • kind - 產生者、取用者或用戶端。 傳送訊息時會使用產生者、接收時會使用取用者,安置時會使用用戶端。
  • componentservicebus

所有事件也有 EntityEndpoint 屬性。

  • Entity - - 實體 (佇列、主題等) 的名稱。
  • Endpoint:服務匯流排端點 URL

檢測的作業

以下是檢測作業的完整清單:

作業名稱 追蹤的 API
ServiceBusSender.Send ServiceBusSender.SendMessageAsync
ServiceBusSender.SendMessagesAsync
ServiceBusSender.Schedule ServiceBusSender.ScheduleMessageAsync
ServiceBusSender.ScheduleMessagesAsync
ServiceBusSender.Cancel ServiceBusSender.CancelScheduledMessageAsync
ServiceBusSender.CancelScheduledMessagesAsync
ServiceBusReceiver.Receive ServiceBusReceiver.ReceiveMessageAsync
ServiceBusReceiver.ReceiveMessagesAsync
ServiceBusReceiver.ReceiveDeferred ServiceBusReceiver.ReceiveDeferredMessagesAsync
ServiceBusReceiver.Peek ServiceBusReceiver.PeekMessageAsync
ServiceBusReceiver.PeekMessagesAsync
ServiceBusReceiver.Abandon ServiceBusReceiver.AbandonMessagesAsync
ServiceBusReceiver.Complete ServiceBusReceiver.CompleteMessagesAsync
ServiceBusReceiver.DeadLetter ServiceBusReceiver.DeadLetterMessagesAsync
ServiceBusReceiver.Defer ServiceBusReceiver.DeferMessagesAsync
ServiceBusReceiver.RenewMessageLock ServiceBusReceiver.RenewMessageLockAsync
ServiceBusSessionReceiver.RenewSessionLock ServiceBusSessionReceiver.RenewSessionLockAsync
ServiceBusSessionReceiver.GetSessionState ServiceBusSessionReceiver.GetSessionStateAsync
ServiceBusSessionReceiver.SetSessionState ServiceBusSessionReceiver.SetSessionStateAsync
ServiceBusProcessor.ProcessMessage ServiceBusProcessor 上設定的處理器回呼。 ProcessMessageAsync 屬性
ServiceBusSessionProcessor.ProcessSessionMessage ServiceBusSessionProcessor 上設定的處理器回呼。 ProcessMessageAsync 屬性

篩選和取樣

在某些情況下,您可能只會想記錄部分的事件,以減少效能額外負荷或儲存空間耗用量。 您可以僅記錄 'Stop' 事件 (如上述範例所示),或是對特定百分比的事件進行取樣。 DiagnosticSource 提供搭配 IsEnabled 述詞來達成它的方式。 如需詳細資訊,請參閱 DiagnosticSource 中以內容為基礎的篩選 \(英文\)。

IsEnabled 可以針對單一作業呼叫數次,以將效能影響降至最低。

系統會於下列序列中呼叫 IsEnabled

  1. IsEnabled(<OperationName>, string entity, null) 為例,IsEnabled("ServiceBusSender.Send", "MyQueue1")。 請注意到末端並沒有 'Start' 或 'Stop'。 請使用它來將特定作業或佇列篩選出來。 若回呼方法傳回 false,代表針對作業的事件並沒有傳送。

    • 針對 'Process' 和 'ProcessSession' 作業,您也會接收到 IsEnabled(<OperationName>, string entity, Activity activity) 回呼。 請使用它來根據 activity.Id 或 Tags 屬性篩選事件。
  2. IsEnabled(<OperationName>.Start) 為例,IsEnabled("ServiceBusSender.Send.Start")。 檢查是否應該引發 'Start' 事件。 結果只會影響 'Start' 事件,但進一步的檢測並不會與其相依。

'Stop' 事件沒有 IsEnabled

若某個作業結果為例外狀況,系統便會呼叫 IsEnabled("ServiceBusSender.Send.Exception")。 您只能訂閱 'Exception' 事件並避免剩下的檢測。 在此情況下,您仍然必須處理這類例外狀況。 由於其他檢測皆已停用,您不應預期追蹤內容會有從取用者傳送至產生者的訊息。

您也可以使用 IsEnabled 來實作取樣策略。 以 Activity.IdActivity.RootId 為基礎的取樣,能確保在所有嘗試上皆取得一致的取樣 (只要是由追蹤系統或您自己的程式碼所傳播)。

在存在針對相同來源多個 DiagnosticSource 接聽程式的情況下,只要其中一個接聽程式接受事件便已足夠,因此並無法保證會呼叫 IsEnabled

下一步