Cómo ingerir datos con la API de REST

Se prefiere la biblioteca Kusto.Ingest para ingerir datos en el clúster. Sin embargo, puede lograr prácticamente la misma funcionalidad sin depender del paquete Kusto.Ingest. En este artículo se muestra cómo, mediante la ingesta en cola en el clúster para canalizaciones de nivel de producción.

Nota

El código siguiente se escribe en C#, y usa el SDK de Azure Storage, la biblioteca de autenticación de Microsoft (MSAL) y el paquete NewtonSoft.JSON para simplificar el código de ejemplo. Si es necesario, el código correspondiente se puede reemplazar por las llamadas a la API REST de Azure Storage adecuadas, non-.NET paquete MSAL y cualquier paquete de control de JSON disponible.

En este artículo se trata el modo recomendado de ingesta. Para la biblioteca Kusto.Ingest, su entidad correspondiente es la interfaz IKustoQueuedIngestClient . Aquí, el código de cliente interactúa con el clúster publicando mensajes de notificación de ingesta en una cola de Azure. Las referencias a los mensajes se obtienen del servicio kusto Administración de datos (también conocido como ingesta). La interacción con el servicio debe autenticarse con Microsoft Entra id.

En el código siguiente se muestra cómo el servicio kusto Administración de datos controla la ingesta de datos en cola sin usar la biblioteca Kusto.Ingest. Este ejemplo puede ser útil si la versión completa de .NET no es accesible o no está disponible debido al entorno u otras restricciones.

El código incluye los pasos para crear un cliente de Azure Storage y cargar los datos en un blob. Cada paso se describe con más detalle, después del código de ejemplo.

  1. Obtención de un token de autenticación para acceder al servicio de ingesta
  2. Consulte el servicio de ingesta para obtener:
  3. Carga de datos en un blob en uno de los contenedores de blobs obtenidos de Kusto en (2)
  4. Redactar un mensaje de ingesta que identifique la base de datos y la tabla de destino y que apunte al blob desde (3)
  5. Publicar el mensaje de ingesta que hemos compuesto en (4) en una cola de ingesta obtenida en (2)
  6. Recuperar cualquier error encontrado por el servicio durante la ingesta
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

Uso de la ingesta en cola para canalizaciones de nivel de producción

Obtención de evidencia de autenticación del identificador de Microsoft Entra

Aquí usamos la Biblioteca de autenticación de Microsoft (MSAL) para obtener un token de Microsoft Entra para acceder al servicio de Administración de datos kusto y solicitar sus colas de entrada. MSAL está disponible en varias plataformas.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

Recuperación de recursos de ingesta

Construya manualmente una solicitud HTTP POST al servicio Administración de datos, solicitando la devolución de los recursos de ingesta. Estos recursos incluyen colas en las que escucha el servicio DM y contenedores de blobs para la carga de datos. El servicio Administración de datos procesará los mensajes que contengan solicitudes de ingesta que lleguen a una de esas colas.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Obtención de un token de identidad de Kusto

Los mensajes de ingesta se entregan al clúster a través de un canal no directo (cola de Azure), lo que hace imposible realizar la validación de autorización en banda para acceder al servicio de ingesta. La solución consiste en adjuntar un token de identidad a cada mensaje de ingesta. El token habilita la validación de autorización en banda. A continuación, el servicio de ingesta puede validar este token firmado cuando recibe el mensaje de ingesta.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Carga de datos en el contenedor de blobs de Azure

Este paso consiste en cargar un archivo local en un blob de Azure que se entregará para la ingesta. Este código usa el SDK de Azure Storage. Si no es posible la dependencia, se puede lograr con la API REST de Azure Blob Service.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

Redacción del mensaje de ingesta

El paquete NewtonSoft.JSON volverá a componer una solicitud de ingesta válida para identificar la base de datos y la tabla de destino, y que apunta al blob. El mensaje se publicará en la cola de Azure en la que escucha el servicio kusto Administración de datos pertinente.

Estos son algunos puntos a tener en cuenta.

  • Esta solicitud es el mínimo para el mensaje de ingesta.

Nota

El token de identidad es obligatorio y debe formar parte del objeto JSON AdditionalProperties .

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

Publicar el mensaje de ingesta en la cola de ingesta

Por último, publique el mensaje que ha construido en la cola de ingesta seleccionada que obtuvo anteriormente.

Nota

Versiones de cliente de almacenamiento de .Net por debajo de v12, de forma predeterminada, codifican el mensaje en base64 Para obtener más información, consulte la documentación de almacenamiento. Si usa versiones de cliente de almacenamiento de .Net anteriores a v12, debe codificar correctamente el contenido del mensaje.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Comprobación de los mensajes de error de la cola de Azure

Después de la ingesta, se comprueban los mensajes de error de la cola correspondiente en la que escribe el Administración de datos. Para obtener más información sobre la estructura del mensaje de error, consulte Estructura de mensajes de error de ingesta.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

Mensajes de ingesta: formatos de documento JSON

Estructura interna del mensaje de ingesta

El mensaje que el servicio kusto Administración de datos espera leer de la cola de Azure de entrada es un documento JSON en el siguiente formato.

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Propiedad Descripción
Identificador Identificador de mensaje (GUID)
BlobPath Ruta de acceso (URI) al blob, incluida la clave SAS que concede permisos para leer, escribir o eliminarlo. Se requieren permisos para que el servicio de ingesta pueda eliminar el blob una vez que haya completado la ingesta de los datos.
RawDataSize Tamaño de los datos sin comprimir en bytes. Proporcionar este valor permite que el servicio de ingesta optimice la ingesta agregando potencialmente varios blobs. Esta propiedad es opcional, pero, si no se especifica, el servicio tendrá acceso al blob solo para recuperar el tamaño.
DatabaseName Nombre de la base de datos de destino
TableName Nombre de la tabla de destino
RetainBlobOnSuccess Si se establece trueen , el blob no se eliminará una vez completada correctamente la ingesta. Valor predeterminado: false
FlushImmediately Si se establece en true, se omitirá cualquier agregación. Valor predeterminado: false
ReportLevel Nivel de informes de errores o correctos: 0-Failures, 1-None, 2-All
ReportMethod Mecanismo de informes: 0-Queue, 1-Table
AdditionalProperties Otras propiedades, como format, tagsy creationTime. Para obtener más información, consulte propiedades de ingesta de datos.

Estructura de mensajes de error de ingesta

El mensaje que el Administración de datos espera leer de la cola de Azure de entrada es un documento JSON en el formato siguiente.

Propiedad Descripción
OperationId Identificador de operación (GUID) que se puede usar para realizar un seguimiento de la operación en el lado del servicio
Base de datos Nombre de la base de datos de destino
Tabla Nombre de la tabla de destino
FailedOn Marca de tiempo de error
IngestionSourceId GUID que identifica el fragmento de datos que no se pudo ingerir
IngestionSourcePath Ruta de acceso (URI) al fragmento de datos que no se pudo ingerir
Detalles Mensaje de error
ErrorCode Código de error. Para todos los códigos de error, consulte Códigos de error de ingesta.
FailureStatus Indica si el error es permanente o transitorio.
RootActivityId Identificador de correlación (GUID) que se puede usar para realizar un seguimiento de la operación en el lado del servicio.
OriginatesFromUpdatePolicy Indica si el error fue causado por una directiva de actualización transaccional errónea.
ShouldRetry Indica si la ingesta puede realizarse correctamente si se reintenta tal cual.