Configuración de la ingesta de streaming en el clúster de Azure Data Explorer
Artículo
La ingesta de streaming es útil para cargar datos cuando necesite una latencia baja entre la ingesta y la consulta. Considere la posibilidad de usar la ingesta de streaming en los escenarios siguientes:
Se requiere una latencia de menos de un segundo.
Para optimizar el procesamiento operativo de muchas tablas donde el flujo de datos a cada tabla es relativamente pequeño (pocos registros por segundo), pero el volumen de ingesta de datos global es alto (miles de registros por segundo).
Si el flujo de datos en cada tabla es alto (más de 4 GB por hora), considere la posibilidad de usar la ingesta en cola.
Para obtener ejemplos de código basados en versiones anteriores del SDK, consulte el artículo archivado.
Selección del tipo de ingesta de streaming adecuado
Se admiten dos tipos de ingesta de streaming:
Tipo de ingesta
Descripción
Conexión de datos
Las conexiones de datos de Event Hubs, IoT Hub y Event Grid pueden usar la ingesta de streaming, siempre que esté habilitada en el nivel de clúster. La decisión de usar la ingesta de streaming se realiza según la directiva de ingesta de streaming configurada en la tabla de destino. Para obtener información sobre cómo administrar conexiones de datos, consulte Centro de eventos, IoT Hub y Event Grid.
Use la tabla siguiente para ayudarle a elegir el tipo de ingesta adecuado para su entorno:
Criterio
Conexión de datos
Ingesta personalizada
Retraso de datos entre el inicio de la ingesta y los datos disponibles para la consulta
Retraso más largo
Retraso más corto
Sobrecarga de desarrollo
Configuración rápida y sencilla, sin sobrecarga de desarrollo
Alta sobrecarga de desarrollo para crear una aplicación que ingiera los datos, controle los errores y garantice la coherencia de los datos
Nota:
Puede administrar el proceso para habilitar y deshabilitar la ingesta de streaming en el clúster mediante Azure Portal o mediante programación en C#. Si usa C# para la aplicación personalizada, puede que le resulte más cómodo usar el enfoque mediante programación.
Los principales colaboradores que pueden afectar a la ingesta de streaming son:
Tamaños de máquina virtual y clúster: el rendimiento y la capacidad de la ingesta de streaming se escalan cuando aumentan los tamaños de las máquinas virtuales y los clústeres. El número de solicitudes de ingesta simultáneas está limitado a seis por núcleo. Por ejemplo, en el caso de las SKU de 16 núcleos, como las D14 y L16, la carga máxima admitida es las solicitudes de 96 ingestas simultáneas. En el caso de las SKU de dos núcleos, como la D11, la carga máxima admitida es las solicitudes de 12 ingestas simultáneas.
Límite de tamaño de los datos: el límite del tamaño de los datos para una solicitud de ingesta de streaming es de 4 MB. Esto incluye los datos creados para las directivas de actualización durante la ingesta.
Actualizaciones de esquema: las actualizaciones de esquema, como la creación y modificación de tablas y asignaciones de ingesta, pueden tardar hasta cinco minutos en el servicio de ingesta de streaming. Para más información, consulte Ingesta de streaming y cambios de esquema.
Capacidad de SSD: cuando se habilita la ingesta de streaming en un clúster, incluso cuando los datos no se ingieren a través de streaming, se usa parte del disco SSD local de las máquinas del clúster para los datos de ingesta de streaming y se reduce el almacenamiento disponible para la caché activa.
Habilitación de la ingesta de streaming en el clúster
Para habilitar la ingesta de streaming durante la creación de un nuevo clúster de Azure Data Explorer, ejecute el siguiente código:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Habilitación de la ingesta de streaming en un clúster existente
Si tiene un clúster ya existente, puede habilitar la ingesta de streaming mediante Azure Portal o mediante programación en C#.
En Azure Portal, vaya al clúster de Azure Data Explorer.
En Configuración, seleccione Configuraciones.
En el panel Configuraciones, seleccione Activado para habilitar la ingesta de streaming.
Seleccione Guardar.
Puede habilitar la ingesta de streaming al actualizar un clúster de Azure Data Explorer existente.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Creación de una tabla de destino y definición de la directiva
Cree una tabla para recibir los datos de ingesta de streaming y defina su directiva relacionada mediante Azure Portal o mediante programación en C#.
Copie uno de los siguientes comandos en el panel Consulta y seleccione Ejecutar. Esto define la directiva de ingesta de streaming en la tabla que ha creado o en la base de datos que contiene la tabla.
Sugerencia
Una directiva que se define en el nivel de base de datos se aplica a todas las tablas existentes y futuras de la base de datos. Al habilitar la directiva en el nivel de base de datos, no es necesario habilitarla por tabla.
Para definir la directiva en la tabla que ha creado, use:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Deshabilitación de la ingesta de streaming en el clúster
Advertencia
La deshabilitación de la ingesta de streaming puede tardar unas horas.
Antes de deshabilitar la ingesta de streaming en el clúster de Azure Data Explorer, quite la directiva de ingesta de streaming de todas las tablas y bases de datos pertinentes. La eliminación de la directiva de ingesta de streaming desencadena la reorganización de los datos dentro del clúster de Azure Data Explorer. Los datos de ingesta de streaming se trasladan desde el almacenamiento inicial hasta el almacenamiento permanente en el almacén de columnas (extensiones o particiones). Este proceso puede tardar entre unos segundos y algunas horas, en función de la cantidad de datos existentes en el almacenamiento inicial.
Eliminación de la directiva de ingesta de streaming
Puede eliminar la directiva de ingesta de streaming mediante Azure Portal o mediante programación en C#.
En Azure Portal, vaya al clúster de Azure Data Explorer y seleccione Consulta.
Para quitar la directiva de ingesta de streaming de la tabla, copie el siguiente comando en el panel Consulta y seleccione Ejecutar.
.delete table TestTable policy streamingingestion
En Configuración, seleccione Configuraciones.
En el panel Configuraciones, seleccione Desactivado para deshabilitar la ingesta de streaming.
Seleccione Guardar.
Para quitar la directiva de ingesta de streaming de la tabla, ejecute el siguiente código:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Para deshabilitar la ingesta de streaming en el clúster, ejecute el siguiente código:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Limitaciones
La asignación de datos debe haberse creado con anterioridad para su uso en la ingesta de streaming. Las solicitudes individuales de ingesta de streaming no acomodan asignaciones de datos insertadas.
Directiva de actualización. La directiva de actualización solo puede hacer referencia a los datos que se acaban de ingerir en la tabla de origen y no a otros datos o tablas de la base de datos.
Cuando se produce un error en una directiva de actualización con una directiva transaccional, los reintentos se revertirán a la ingesta por lotes.
Si la ingesta de streaming está habilitada en un clúster que se usa como clip inicial de las bases de datos de seguidores, la ingesta de streaming debe habilitarse también en los siguientes clústeres para seguir los datos de ingesta de streaming. Lo mismo es aplicable si los datos del clúster se comparten mediante Data Share.