A streambetöltés akkor hasznos az adatok betöltéséhez, ha alacsony késésre van szüksége a betöltés és a lekérdezés között. Fontolja meg a streambetöltés használatát a következő esetekben:
- Egy másodpercnél rövidebb késésre van szükség.
- Számos olyan tábla működési feldolgozásának optimalizálása érdekében, ahol az adatok adatfolyama az egyes táblákba viszonylag kicsi (másodpercenként néhány rekord), de a teljes adatbetöltési mennyiség magas (másodpercenként több ezer rekord).
Ha az adatok adatfolyama az egyes táblákba magas (óránként több mint 4 GB), fontolja meg várólistára helyezett betöltésihasználatát.
A különböző betöltési módszerekről további információt az adatbetöltés áttekintése című részben talál.
A korábbi SDK-verziókon alapuló kódmintákért lásd az archivált cikket.
Válassza ki a megfelelő streaming alapú betöltési típust
Két streamelési betöltési típus támogatott:
Betöltési típus |
Leírás |
Adatkapcsolat |
Az Event Hubs-, IoT Hub- és Event Grid-adatkapcsolatok használhatják a streameléses adatbevitelt, ha a fürtszintű engedélyezés megtörtént. A streamelés használatáról hozott döntés a céltáblán konfigurált streamelési szabályzatnak megfelelően történik. Az adatkapcsolatok kezeléséről további információt Event Hub, IoT Hub és Event Gridcímű témakörben talál. |
Egyéni adatbetöltés |
Az egyéni betöltéshez olyan alkalmazást kell írnia, amely az Azure Data Explorer ügyfélkódtárakegyikét használja. A cikkben található információk segítségével konfigurálhatja az egyéni adatbetöltést. A C?view=azure-data-explorer&preserve-view=true# streamelési mintaalkalmazás is hasznos lehet. |
Az alábbi táblázat segítségével kiválaszthatja a környezetének megfelelő betöltési típust:
Kritérium |
Adatkapcsolat |
Egyéni adatbetöltés |
Adatkésés a betöltési kezdeményezés és a lekérdezéshez rendelkezésre álló adatok között |
Hosszabb késleltetés |
Rövidebb késleltetés |
Fejlesztési többletterhelés |
Gyors és egyszerű beállítás, nincs fejlesztési többletterhelés |
Nagy fejlesztési költségek az adatok betöltését, a hibák kezelését és az adatkonzisztenciát biztosító alkalmazások létrehozásához |
Megjegyzés:
Kezelheti a folyamatot, hogy a alapján engedélyezze, és a szerint letiltsa a streamelő adatbetöltést a fürtön, akár az Azure Portál használatával, akár programozott módon C#-ban. Ha az egyéni alkalmazáshoz C#-ot használ, a programozott megközelítéssel kényelmesebbnek találhatja.
Előfeltételek
A streambetöltést befolyásoló fő közreműködők a következők:
-
virtuális gép és fürt mérete: A streamelési betöltési teljesítmény és a kapacitás nagyobb virtuális gép- és fürtméret esetén skálázható. Az egyidejű betöltési kérelmek száma magonként legfeljebb hat lehet. Például a 16 magos termékváltozatok esetében, mint amilyen a D14 és az L16, a maximálisan támogatott terhelés 96 egyidejű beolvasási kérelem. Két alapvető SKU, például a D11 esetében, a maximálisan támogatott terhelés 12 egyidejű adatigénylés kérés.
- **
Adatméret korlát: Az adatfolyam betöltési kérelem adatméret korlátja 4 MB. Ez magában foglalja a betöltési folyamat során létrehozott adatokat, amelyek a frissítési szabályzatokra vonatkoznak.
-
sémafrissítések: A sémafrissítések, például a táblák és a betöltési leképezések létrehozása és módosítása akár öt percet is igénybe vehet a streamelési betöltési szolgáltatás számára. További információért lásd: Adatfolyam-betöltés és sémamódosítások.
-
SSD-kapacitás: A streambetöltés engedélyezése fürtön akkor is, ha az adatokat nem streamelik, a fürtgépek helyi SSD-lemezének egy részét használja a streambetöltési adatok tárolására, és csökkenti a hot cache gyorsítótára számára elérhető tárhelyet.
Streaming engedélyezése a klaszteren
Mielőtt használhatná a streambetöltést, engedélyeznie kell a lehetőséget a fürtön, és meg kell határoznia egy streamelési betöltési szabályzatot. Engedélyezheti a funkciót, amikor a fürtöt létrehozza, vagy hozzáadja egy meglévő fürthöz.
Figyelmeztetés
A streambetöltés engedélyezése előtt tekintse át korlátozásait.
Új fürt létrehozásával történő streambetöltés engedélyezése
Engedélyezheti a streaming adatbetöltést új fürt létrehozásakor az Azure portálon vagy C# segítségével programozottan.
Ha engedélyezni szeretné a streambetöltést egy új Azure Data Explorer-fürt létrehozásakor, futtassa a következő kódot:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Folyamatos adatbetöltés engedélyezése meglévő fürtön
Ha már rendelkezik fürtel, engedélyezheti a streambetöltést az Azure Portalon vagy programozott módon a C#-ban.
Az Azure Portalon nyissa meg az Azure Data Explorer-fürtöt.
A Beállításokterületen válassza Konfigurációklehetőséget.
A Konfigurációk panelen válassza a Be lehetőséget a adatfolyam-bekérésengedélyezéséhez.
Válassza az Mentésgombot.
Egy meglévő Azure Data Explorer-fürt frissítése közben engedélyezheti a streambetöltést.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Céltábla létrehozása és a szabályzat definiálása
Hozzon létre egy táblát a streamelési adatok fogadásához és a kapcsolódó szabályzat meghatározásához az Azure Portalon vagy programozott módon a C#-ban.
Az Azure portálon navigáljon a fürtjéhez.
Válassza a lekérdezést.
Ha streambetöltéssel szeretné létrehozni az adatokat fogadó táblát, másolja a következő parancsot a Lekérdezés panelre, és válassza a Futtatáslehetőséget.
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
Másolja az alábbi parancsok egyikét a Lekérdezés panelre, és válassza a Futtatáslehetőséget. Ez határozza meg a streamelési adatbevitel politikáját a létrehozott táblához vagy a táblát tartalmazó adatbázishoz.
Jótanács
Az adatbázis szintjén definiált szabályzat az adatbázis összes meglévő és jövőbeli táblája esetében érvényes. Ha az adatbázis szintjén engedélyezi az irányelvet, nem kell táblánként engedélyeznie.
A létrehozott táblán lévő szabályzat meghatározásához használja a következőt:
.alter table TestTable policy streamingingestion enable
A létrehozott táblát tartalmazó adatbázis házirendjének meghatározásához használja a következőt:
.alter database StreamingTestDb policy streamingingestion enable
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Adatfolyami beillesztő alkalmazás létrehozása adatok fürtbe való beillesztéséhez
Hozza létre az alkalmazást az adatok fürtbe történő betöltésére az ön által preferált nyelven.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.gzip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import IngestionProperties, KustoStreamingIngestClient
clusterPath = "https://<clusterName>.<region>.kusto.windows.net"
appId = "<appId>"
appKey = "<appKey>"
appTenant = "<appTenant>"
dbName = "<dbName>"
tableName = "<tableName>"
csb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterPath,
appId,
appKey,
appTenant
)
client = KustoStreamingIngestClient(csb)
ingestionProperties = IngestionProperties(
database=dbName,
table=tableName,
data_format=DataFormat.CSV
)
# Ingest from file
# Automatically detects gz format
client.ingest_from_file("MyFile.gz", ingestion_properties=ingestionProperties)
// Load modules using ES6 import statements:
import { DataFormat, IngestionProperties, StreamingIngestClient } from "azure-kusto-ingest";
import { KustoConnectionStringBuilder } from "azure-kusto-data";
// For earlier version, load modules using require statements:
// const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
// const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
// const {DataFormat} = require("azure-kusto-ingest").IngestionPropertiesEnums;
// const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
const appId = "<appId>";
const appKey = "<appKey>";
const appTenant = "<appTenant>";
const dbName = "<dbName>";
const tableName = "<tableName>";
const mappingName = "<mappingName>"; // Required for JSON formatted files
const ingestionProperties = new IngestionProperties({
database: dbName, // Your database
table: tableName, // Your table
format: DataFormat.JSON,
ingestionMappingReference: mappingName
});
// Initialize client with engine endpoint
const client = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
clusterPath,
appId,
appKey,
appTenant
),
ingestionProperties
);
// Automatically detects gz format
await client.ingestFromFile("MyFile.gz", ingestionProperties);
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
A streamelés betöltésének letiltása a fürtön
Figyelmeztetés
A streambetöltés letiltása néhány órát is igénybe vehet.
Mielőtt letiltaná a streaming adatbetöltést az Azure Data Explorer-fürtön, távolítsa el a streaming adatbetöltési szabályzatot az összes releváns táblából és adatbázisból. Az Azure Data Explorer-fürtön belüli adatok átrendezését indítja el a streaming betöltési házirend eltávolítása. A streamelési betöltési folyamat során a kezdeti tárolóból az oszloptároló állandó tárolójába kerülnek az adatok (kiterjedések vagy szegmensek). Ez a folyamat a kezdeti tárolóban lévő adatok mennyiségétől függően néhány másodperctől néhány óráig is eltarthat.
Az adatfolyam-betöltési irányelv megszüntetése
A streamelési betöltési szabályzatot az Azure Portalon vagy programozott módon is elvetheti a C#-ban.
Az Azure portálon nyissa meg az Azure Data Explorer-fürtöt, és válassza a Lekérdezéslehetőséget.
A streamelési adatbetöltési házirend tábláról való eltávolításához másolja a következő parancsot a Lekérdezés panelre, majd válassza a Futtatáslehetőséget.
.delete table TestTable policy streamingingestion
A Beállításokterületen válassza Konfigurációklehetőséget.
A Konfigurációk panelen válassza a Kikapcsolás lehetőséget a bejövő adatfolyamletiltásához.
Válassza az Mentésgombot.
A streamelési betöltési szabályzat táblából való elvetéséhez futtassa a következő kódot:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Ha le szeretné tiltani a streambetöltést a fürtön, futtassa a következő kódot:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Korlátozások
-
előre létrehozott adatleképezéseket kell a streambetöltéshez használni. Az egyes streamelési kérelmek nem támogatják a beágyazott adatleképezéseket.
-
A kiterjedéscímkék nem állíthatók be a streambetöltési adatokon.
-
Házirend frissítése. A frissítési szabályzat csak a forrástábla újonnan betöltött adataira hivatkozhat, az adatbázisban lévő többi adatra vagy táblára nem.
- Amikor egy tranzakciós szabályzathoz tartozó frissítési szabályzat meghiúsul, az újrapróbálkozások kötegbetöltésre állnak vissza.
- Ha a streaming betöltés engedélyezve van egy olyan fürtön, amelyet vezetőként használnak a követő adatbázisok számára, akkor a streaming betöltést a következő fürtökön is engedélyezni kell, hogy képesek legyenek követni a streaming betöltési adatokat. Ugyanez vonatkozik arra is, hogy a klaszter adatai a adatmegosztáskeresztül legyenek megosztva.
Kapcsolódó tartalom