Configurare l'inserimento in streaming nel cluster Esplora dati di Azure
Articolo
L’inserimento in streaming è utile per il caricamento dei dati quando è necessaria una bassa latenza tra inserimento e query. Si consideri l'uso di un inserimento in streaming negli scenari seguenti:
È obbligatoria una latenza inferiore a un secondo.
Per ottimizzare l'elaborazione operativa di molte tabelle in cui il flusso di dati in ogni tabella è relativamente piccolo (pochi record al secondo), ma il volume di inserimento dati complessivo è elevato (migliaia di record al secondo).
Se il flusso di dati in ogni tabella è elevato (oltre 4 GB all'ora), è consigliabile usare l'inserimento in coda.
Per esempi di codice basati sulle versioni precedenti dell'SDK, vedere l'articolo archiviato.
Scegliere il tipo di inserimento in streaming appropriato
Sono supportati due tipi di inserimento in streaming:
Tipo di inserimento
Descrizione
Connessione dati
Hub eventi, hub IoT e connessioni dati di Griglia di eventi possono usare l'inserimento in streaming, purché sia abilitato a livello di cluster. La decisione di usare l'inserimento in streaming viene eseguita in base ai criteri di inserimento di streaming configurati nella tabella di destinazione. Per informazioni sulla gestione delle connessioni dati, vedere Hub eventi, hub IoT e Griglia di eventi.
Usare la tabella seguente per scegliere il tipo di inserimento appropriato per l'ambiente di utilizzo:
Criterio
Connessione dati
Inserimento personalizzato
Ritardo dei dati tra l'avvio dell'inserimento e la disponibilità dei dati per la query
Ritardo più lungo
Ritardo più breve
Sovraccarico di sviluppo
Configurazione rapida e semplice, senza sovraccarico di sviluppo
Sovraccarico elevato di sviluppo per creare un'applicazione per inserire i dati, gestire gli errori e garantire la coerenza dei dati
Nota
È possibile gestire il processo per abilitare e disabilitare l'inserimento in streaming nel cluster usando il portale di Azure o a livello di codice in C#. Se si usa C# per l'applicazione personalizzata, è possibile che sia più utile usare l'approccio programmatico.
Considerazioni sulle prestazioni e sulle operazioni
I collaboratori principali che possono influire sull'inserimento in streaming sono:
Dimensioni della macchina virtuale e del cluster: le prestazioni e la capacità di inserimento di streaming aumentano le dimensioni delle macchine virtuali e del cluster. Il numero di richieste di inserimento simultanee è limitato a sei per core. Ad esempio, per 16 SKU core, ad esempio D14 e L16, il carico massimo supportato è 96 richieste di inserimento simultanee. Per due SKU core, ad esempio D11, il carico massimo supportato è 12 richieste di inserimento simultanee.
Limite di dimensioni dei dati: il limite di dimensioni dei dati per una richiesta di inserimento in streaming è di 4 MB. Sono inclusi tutti i dati creati per i criteri di aggiornamento durante l'inserimento.
Aggiornamenti dello schema: gli aggiornamenti dello schema, ad esempio la creazione e la modifica di tabelle e mapping di inserimento, possono impiegare fino a cinque minuti per il servizio di inserimento in streaming. Per altre informazioni, vedere Inserimento in streaming e modifiche allo schema.
Capacità SSD: abilitando l'inserimento di streaming in un cluster, anche quando i dati non vengono inseriti tramite streaming, usa parte del disco SSD locale dei computer cluster per lo streaming dei dati di inserimento e riduce lo spazio di archiviazione disponibile per la cache ad accesso frequente.
Abilitare l'inserimento in streaming nel cluster
Prima di poter usare l'inserimento in streaming, è necessario abilitare la funzionalità nel cluster e definire un criterio di inserimento di streaming. È possibile abilitare la funzionalità durante la creazione del cluster o aggiungerla a un cluster esistente.
Avviso
Esaminare le limitazioni prima di abilitare l'inserimento in streaming.
Abilitare l'inserimento in streaming durante la creazione di un nuovo cluster
È possibile abilitare l'inserimento in streaming durante la creazione di un nuovo cluster usando il portale di Azure o a livello di codice in C#.
Durante la creazione di un cluster usando la procedura descritta in Creare un cluster e un database di Azure Esplora dati, nella scheda Configurazioni selezionare Inserimento in>streaming attivato.
Per abilitare l'inserimento in streaming durante la creazione di un nuovo cluster di Azure Esplora dati, eseguire il codice seguente:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Abilitare l'inserimento in streaming in un cluster esistente
Se si dispone di un cluster esistente, è possibile abilitare l'inserimento in streaming usando il portale di Azure o a livello di codice in C#.
Nel portale di Azure passare a cluster di Esplora dati di Azure.
In Impostazioniselezionare Configurazioni.
Nel riquadro Configurazioni selezionare On per abilitare l'inserimento in streaming.
Seleziona Salva.
È possibile abilitare l'inserimento di streaming durante l'aggiornamento di un cluster di Azure Esplora dati esistente.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Creare una tabella di destinazione e definire i criteri
Creare una tabella per ricevere i dati di inserimento in streaming e definirne i criteri correlati usando il portale di Azure o a livello di codice in C#.
Copiare uno dei comandi seguenti nel riquadro Query e selezionare Esegui. In questo modo vengono definiti i criteri di inserimento in streaming nella tabella creata o nel database che contiene la tabella.
Suggerimento
Un criterio definito a livello di database si applica a tutte le tabelle esistenti e future presenti nel database. Quando si abilitano i criteri a livello di database, non è necessario abilitarlo per ogni tabella.
Per definire i criteri nella tabella creata, usare:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Disabilitare l'inserimento in streaming nel cluster
Avviso
La disabilitazione dell'inserimento in streaming può richiedere alcune ore.
Prima di disabilitare l'inserimento in streaming nel cluster di Azure Esplora dati, eliminare i criteri di inserimento di streaming da tutte le tabelle e i database pertinenti. La rimozione dei criteri di inserimento del flusso attiva la ridisistribuzione dei dati all'interno del cluster di Azure Esplora dati. I dati di inserimento in streaming sono spostati dalla risorsa di archiviazione iniziale a quella di archiviazione permanente nell'archivio colonne (extent o partizioni). Questo processo può richiedere da alcuni secondi a poche ore, a seconda della quantità di dati presenti nella risorsa di archiviazione iniziale.
Rimuovere i criteri di inserimento in streaming
È possibile eliminare i criteri di inserimento di streaming usando il portale di Azure o a livello di codice in C#.
Nella portale di Azure passare al cluster Esplora dati di Azure e selezionare Query.
Per eliminare i criteri di inserimento in streaming dalla tabella, copiare il comando seguente nel riquadro Query e selezionare Esegui.
.delete table TestTable policy streamingingestion
In Impostazioniselezionare Configurazioni.
Nel riquadro Configurazioni selezionare Off per disabilitare l'inserimento in streaming.
Seleziona Salva.
Per eliminare i criteri di inserimento di streaming dalla tabella, eseguire il codice seguente:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Per disabilitare l'inserimento in streaming nel cluster, eseguire il codice seguente:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Limiti
I mapping dei dati devono essere pre-creati per l'uso nell'inserimento in streaming. Le singole richieste di inserimento in streaming non supportano i mapping dei dati inline.
Non è possibile impostare i tag extent nei dati di inserimento in streaming.
Aggiorna criteri. I criteri di aggiornamento possono fare riferimento solo ai dati appena inseriti nella tabella di origine e non ad altri dati o tabelle nel database.
Quando un criterio di aggiornamento con un criterio transazionale ha esito negativo, i tentativi eseguiranno il fallback all'inserimento in batch.
Se l'inserimento in streaming è abilitato in un cluster usato come leader per i database follower, l'inserimento di streaming deve essere abilitato nei cluster seguenti per seguire i dati di inserimento in streaming. Lo stesso vale se i dati del cluster vengono condivisi tramite Condivisione dati.