Konfigurera datainmatning för streaming i ditt Azure Data Explorer-kluster
Artikel
Strömningsinmatning är användbart för inläsning av data när du behöver korta svarstider mellan inmatning och fråga. Överväg att använda strömningsinmatning i följande scenarier:
Svarstid på mindre än en sekund krävs.
För att optimera den operativa bearbetningen av många tabeller där dataströmmen till varje tabell är relativt liten (några poster per sekund), men den totala datainmatningsvolymen är hög (tusentals poster per sekund).
Om dataströmmen till varje tabell är hög (över 4 GB per timme) bör du överväga att använda köad inmatning.
Kodexempel baserade på tidigare SDK-versioner finns i den arkiverade artikeln.
Välj lämplig typ av strömningsinmatning
Två typer av strömningsinmatning stöds:
Inmatningstyp
Description
Dataanslutning
Event Hubs-, IoT Hub- och Event Grid-dataanslutningar kan använda strömmande inmatning, förutsatt att den är aktiverad på klusternivå. Beslutet att använda strömningsinmatning görs enligt den princip för strömningsinmatning som konfigurerats i måltabellen. Information om hur du hanterar dataanslutningar finns i Event Hub, IoT Hub och Event Grid.
Använd följande tabell för att välja den inmatningstyp som är lämplig för din miljö:
Kriterium
Dataanslutning
Anpassad inmatning
Datafördröjning mellan inmatningsinitiering och tillgängliga data för fråga
Längre fördröjning
Kortare fördröjning
Omkostnader för utveckling
Snabb och enkel installation, inga utvecklingskostnader
Höga utvecklingskostnader för att skapa ett program som matar in data, hanterar fel och säkerställer datakonsekvens
Anteckning
Du kan hantera processen för att aktivera och inaktivera strömningsinmatning i klustret med hjälp av Azure Portal eller programmatiskt i C#. Om du använder C# för ditt anpassade program kan det vara enklare att använda den programmatiska metoden.
De viktigaste bidragsgivarna som kan påverka strömningsinmatningen är:
VM- och klusterstorlek: Prestanda för strömningsinmatning och kapacitetsskalor med ökade storlekar på virtuella datorer och kluster. Antalet samtidiga inmatningsbegäranden är begränsat till sex per kärna. För SKU:er med 16 kärnor, till exempel D14 och L16, är den maximala belastningen som stöds 96 samtidiga inmatningsbegäranden. För två kärn-SKU:er, till exempel D11, är den maximala belastningen som stöds 12 samtidiga inmatningsbegäranden.
Datastorleksgräns: Datastorleksgränsen för en begäran om strömningsinmatning är 4 MB. Detta inkluderar alla data som skapats för uppdateringsprinciper under inmatningen.
Schemauppdateringar: Schemauppdateringar, till exempel skapande och ändring av tabeller och inmatningsmappningar, kan ta upp till fem minuter för den strömmande inmatningstjänsten. Mer information finns i Strömmande inmatning och schemaändringar.
SSD-kapacitet: Om du aktiverar strömningsinmatning i ett kluster, även om data inte matas in via strömning, används en del av den lokala SSD-disken på klusterdatorerna för strömmande datainmatning och minskar lagringsutrymmet som är tillgängligt för frekvent cache.
Om du vill aktivera strömningsinmatning när du skapar ett nytt Azure Data Explorer-kluster kör du följande kod:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Aktivera strömningsinmatning i ett befintligt kluster
Om du har ett befintligt kluster kan du aktivera strömningsinmatning med hjälp av Azure Portal eller programmatiskt i C#.
I Azure Portal går du till ditt Azure Data Explorer-kluster.
I Inställningar väljer du Konfigurationer.
I fönstret Konfigurationer väljer du På för att aktivera strömningsinmatning.
Välj Spara.
Du kan aktivera strömningsinmatning när du uppdaterar ett befintligt Azure Data Explorer-kluster.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Skapa en måltabell och definiera principen
Skapa en tabell för att ta emot strömmande inmatningsdata och definiera dess relaterade princip med hjälp av Azure Portal eller programmatiskt i C#.
Kopiera något av följande kommandon till fönstret Fråga och välj Kör. Detta definierar principen för strömningsinmatning i den tabell som du skapade eller i databasen som innehåller tabellen.
Tips
En princip som definieras på databasnivå gäller för alla befintliga och framtida tabeller i databasen. När du aktiverar principen på databasnivå behöver du inte aktivera den per tabell.
Om du vill definiera principen i tabellen som du skapade använder du:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Inaktivera strömningsinmatning i klustret
Varning
Det kan ta några timmar att inaktivera strömningsinmatning.
Innan du inaktiverar strömningsinmatning i Azure Data Explorer-klustret släpper du principen för strömningsinmatning från alla relevanta tabeller och databaser. Borttagningen av den strömmande inmatningsprincipen utlöser omorganisering av data i ditt Azure Data Explorer-kluster. Strömmande inmatningsdata flyttas från den ursprungliga lagringen till permanent lagring i kolumnlagret (utrymmen eller shards). Den här processen kan ta mellan några sekunder och några timmar, beroende på mängden data i den ursprungliga lagringen.
Ta bort principen för strömningsinmatning
Du kan ta bort principen för strömningsinmatning med hjälp av Azure Portal eller programmatiskt i C#.
I Azure Portal går du till ditt Azure Data Explorer-kluster och väljer Fråga.
Om du vill ta bort den strömmande inmatningsprincipen från tabellen kopierar du följande kommando till frågefönstret och väljer Kör.
.delete table TestTable policy streamingingestion
I Inställningar väljer du Konfigurationer.
I fönstret Konfigurationer väljer du Av för att inaktivera strömningsinmatning.
Välj Spara.
Om du vill ta bort principen för strömningsinmatning från tabellen kör du följande kod:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Om du vill inaktivera strömningsinmatning i klustret kör du följande kod:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Begränsningar
Datamappningar måste skapas i förväg för användning vid strömningsinmatning. Enskilda begäranden om direktuppspelningsinmatning har inte plats för infogade datamappningar.
Det går inte att ange utrymmestaggar för strömmande inmatningsdata.
Uppdatera princip. Uppdateringsprincipen kan bara referera till nyligen inmatade data i källtabellen och inte till andra data eller tabeller i databasen.
Om strömningsinmatning är aktiverat på ett kluster som används som ledare för efterföljande databaser, måste strömmande inmatning aktiveras på följande kluster samt för att följa strömmande inmatningsdata. Samma sak gäller om klusterdata delas via Data Share.