Kusto.Ingest 状態レポート

この記事では、IKustoQueuedIngestClient 機能を使用して取り込み要求の状態を追跡する方法について説明します。

説明クラス

これらの説明クラスには、取り込まれるソース データに関する重要な詳細が含まれており、取り込み操作で使用する必要があります。

  • SourceDescription
  • DataReaderDescription
  • StreamDescription
  • FileDescription
  • BlobDescription

クラスはすべて抽象クラス SourceDescription から派生し、各データ ソースの一意識別子をインスタンス化するために使用されます。 その後、各識別子は状態の追跡に使用され、関連する操作に関するすべてのレポート、トレース、および例外で表示されます。

クラス SourceDescription

大規模なデータセットは 1 GB のチャンクに分割され、各部分は個別に取り込まれます。 その後、同じ SourceId が、同じデータセットからのすべての取り込み操作に適用されます。

public abstract class SourceDescription
{
    public Guid? SourceId { get; set; }
}

クラス DataReaderDescription

public class DataReaderDescription : SourceDescription
{
    public  IDataReader DataReader { get; set; }
}

クラス StreamDescription

public class StreamDescription : SourceDescription
{
    public Stream Stream { get; set; }
}

クラス FileDescription

public class FileDescription : SourceDescription
{
    public string FilePath { get; set; }
}

クラス BlobDescription

public class BlobDescription : SourceDescription
{
    public string BlobUri { get; set; }
    // Setting the Blob size here is important, as this saves the client the trouble of probing the blob for size
    public long? BlobSize { get; set; }
}

取り込み結果の表現

インターフェイス IKustoIngestionResult

このインターフェイスでは、単一のキューに入れられた取り込み操作の結果を取り込み、SourceId で取得できます。

public interface IKustoIngestionResult
{
    // Retrieves the detailed ingestion status of the ingestion source with the given sourceId.
    IngestionStatus GetIngestionStatusBySourceId(Guid sourceId);

    // Retrieves the detailed ingestion status of all data ingestion operations into Kusto associated with this IKustoIngestionResult instance.
    IEnumerable<IngestionStatus> GetIngestionStatusCollection();
}

クラス IngestionStatus

IngestionStatus には、単一の取り込み操作の完全な状態が含まれています。

public class IngestionStatus
{
    // The ingestion status returns from the service. Status remains 'Pending' during the ingestion process and
    // is updated by the service once the ingestion completes. When <see cref="IngestionReportMethod"/> is set to 'Queue' the ingestion status
    // will always be 'Queued' and the caller needs to query the report queues for ingestion status, as configured. To query statuses that were
    // reported to queue, see: <see href="https://learn.microsoft.com/azure/kusto/api/netfx/kusto-ingest-client-status#ingestion-status-in-azure-queue"/>.
    // When <see cref="IngestionReportMethod"/> is set to 'Table', call <see cref="IKustoIngestionResult.GetIngestionStatusBySourceId"/> or
    // <see cref="IKustoIngestionResult.GetIngestionStatusCollection"/> to retrieve the most recent ingestion status.
    public Status Status { get; set; }
    // A unique identifier representing the ingested source. Can be supplied during the ingestion execution.
    public Guid IngestionSourceId { get; set; }
    // The URI of the blob, potentially including the secret needed to access the blob.
    // This can be a filesystem URI (on-premises deployments only), or an Azure Blob Storage URI (including a SAS key or a semicolon followed by the account key)
    public string IngestionSourcePath { get; set; }
    // The name of the database holding the target table.
    public string Database { get; set; }
    // The name of the target table into which the data will be ingested.
    public string Table { get; set; }
    // The last updated time of the ingestion status.
    public DateTime UpdatedOn { get; set; }
    // The ingestion's operation Id.
    public Guid OperationId { get; set; }
    // The ingestion operation activity Id.
    public Guid ActivityId { get; set; }
    // In case of a failure - indicates the failure's error code.
    public IngestionErrorCode ErrorCode { get; set; }
    // In case of a failure - indicates the failure's status.
    public FailureStatus FailureStatus { get; set; }
    // In case of a failure - indicates the failure's details.
    public string Details { get; set; }
    // In case of a failure - indicates whether or not the failures originate from an Update Policy.
    public bool OriginatesFromUpdatePolicy { get; set; }
}

状態の列挙

説明 一時/永続
保留中 取り込み操作の結果に基づき、値は取り込みの過程で変わる可能性があります 一時
成功 データが正常に取り込まれました 永続的
失敗 取り込みに失敗しました 永続的
キューに登録済み データが取り込みのためにキューに入れられました 永続的
スキップ データが指定されませんでした。取り込み操作はスキップされました 永続的
PartiallySucceeded データの一部は正常に取り込まれましたが、失敗したものもあります 永続的

取り込み状態の追跡 (KustoQueuedIngestClient)

IKustoQueuedIngestClient は 'ファイア アンド フォーゲット' クライアントです。 クライアント側での取り込み操作は、Azure キューにメッセージを投稿することによって終了します。 投稿後に、クライアント ジョブが完了します。 クライアント ユーザーの便宜上、KustoQueuedIngestClient では個々の取り込み状態を追跡するためのメカニズムが提供されます。 このメカニズムは、高スループットの取り込みパイプラインでの大量使用のためのものではありません。 このメカニズムは、その率が比較的低く、追跡要件が厳しい場合の精密な取り込み用です。

警告

大量のデータ ストリームの場合、すべての取り込み要求に対して肯定通知を有効にすることは避ける必要があります。これにより、基になる xStore リソースに極端な負荷がかかり、取り込みの待機時間が長くなり、クラスターの応答性が完全になくなる可能性があるためです。

次のプロパティ (KustoQueuedIngestionProperties で設定) では、取り込みの成功または失敗の通知のレベルとトランスポートを制御します。

IngestionReportLevel の列挙

public enum IngestionReportLevel
{
    FailuresOnly = 0,
    None,
    FailuresAndSuccesses
}

IngestionReportMethod の列挙

public enum IngestionReportMethod
{
    Queue = 0,
    Table,
    QueueAndTable
}

取り込みの状態を追跡するには、取り込み操作を行う IKustoQueuedIngestClient に次の情報を指定します。

  1. IngestionReportLevel プロパティをレポートの必要なレベルに設定します。 FailuresOnly (既定値) または FailuresAndSuccessesNone に設定すると、取り込みの終了時に何も報告されません。
  2. IngestionReportMethod - QueueTable、または QueueAndTable を指定します。

使用例については、Kusto.Ingest 例に関するページを参照してください。

Azure テーブルの取り込みの状態

各取り込み操作から返される IKustoIngestionResult インターフェイスには、取り込みの状態を照会するために使用できる関数が含まれています。 返される IngestionStatus オブジェクトの Status プロパティに特に注意してください。

  • Pending は、ソースが取り込みのためにキューに入れられており、まだ更新されていないことを示します。 関数を再度使用して、ソースの状態を照会します
  • Succeeded は、ソースが正常に取り込まれたされたことを示します
  • Failed は、ソースの取り込みに失敗したことを示します

Note

Queued 状態は、IngestionReportMethod が既定値の 'Queue' のままになっていることを示します。 これは永続的な状態であり、GetIngestionStatusBySourceId または GetIngestionStatusCollection 関数を再度呼び出しても、常に同じ 'Queued' 状態になります。 Azure テーブルの取り込み状態を確認するには、取り込む前に、KustoQueuedIngestionPropertiesIngestionReportMethod プロパティが Table に設定されていることを確認します。 また、取り込み状態をキューに報告する場合は、状態を QueueAndTable に設定します。

Azure キューの取り込み状態

IKustoIngestionResult メソッドは、Azure テーブルの状態を確認する場合にのみ関係します。 Azure キューに報告された状態を照会するには、次の IKustoQueuedIngestClient メソッドを使用します。

メソッド 目的
PeekTopIngestionFailures 要求されたメッセージの制限により、まだ破棄されていない最も古い取り込みの失敗に関する情報を返す非同期メソッド
GetAndDiscardTopIngestionFailures 要求されたメッセージの制限により、まだ破棄されていない最も古い取り込みの失敗を返して破棄する非同期メソッド
GetAndDiscardTopIngestionSuccesses 要求されたメッセージの制限により、まだ破棄されていない最も古い取り込みの成功を返して破棄する非同期メソッド。 このメソッドは、IngestionReportLevelFailuresAndSuccesses に設定されている場合にのみ関係します

Azure キューから取得される取り込みの失敗

取り込みの失敗は、失敗に関する有用な情報を含む IngestionFailure オブジェクトによって表されます。

プロパティ 意味
データベース & テーブル 目的のデータベースとテーブルの名前
IngestionSourcePath 取り込まれた BLOB のパス。 ファイルが取り込まれた場合は元のファイル名が含まれます。 DataReader が取り込まれた場合はランダムになります
FailureStatus Permanent (再試行されない)、Transient (再試行される)、または Exhausted (数回の再試行も失敗)
OperationId & RootActivityId 取り込みの操作 ID と RootActivity ID (さらにトラブルシューティングを行うのに役立ちます)
FailedOn 失敗した時刻 (UTC)。 取り込みの実行前にデータが集計されるため、取り込みメソッドが呼び出された時刻値よりも大きくなります
詳細 失敗に関するその他の詳細 (存在する場合)
ErrorCode IngestionErrorCode 列挙。失敗した場合の取り込みエラー コードを表します