Přepněte služby pomocí rozevíracího seznamu Verze .
Přečtěte si další informace o navigaci.
Platí pro: ✅ Microsoft Fabric ✅ Azure Data Explorer
Kusto dokáže zpracovat hromadný příjem dat optimalizací a dávkováním přijatých dat prostřednictvím svého správce dávkování. Manažer dávkování agreguje přijatá data předtím, než dosáhnou cílové tabulky, což umožňuje efektivnější zpracování a zvýšený výkon. Dávkování se obvykle provádí ve skupinách po 1 GB nezpracovaných dat, po 1 000 jednotlivých souborech, nebo je ve výchozím nastavení časový limit 5 minut. Zásady dávkování je možné aktualizovat na úrovni databáze a tabulky, což obvykle snižuje dobu dávkování a snižuje latenci. Další informace o dávkování příjmu najdete v tématu Zásady dávkového příjmu a Změnit zásady dávkového příjmu na úrovni tabulky programově.
Poznámka
Dávkování také bere v úvahu různé faktory, jako je cílová databáze a tabulka, uživatel, který spouští příjem dat, a různé vlastnosti spojené s příjmem dat, jako jsou speciální značky.
V tomto článku se naučíte:
Důležité
Rozhraní API Ingestu má nyní dvě verze: V1 a V2. Rozhraní API V1 je původní rozhraní API, zatímco rozhraní API V2 je imaginovaná verze, která zjednodušuje příjem rozhraní API a současně nabízí další přizpůsobení.
Ingestování verze 2 je ve verzi Preview a je k dispozici v následujících jazycích: C#
Požadavky
Než začnete
Pomocí jedné z následujících metod vytvořte MyStormEvents tabulku a vzhledem k tomu, že se ingestuje jenom malé množství dat, nastavte časový limit zásad dávkování příjmu na 10 sekund:
- Vytvořte cílovou tabulku s názvem MyStormEvents ve vaší databázi spuštěním první aplikace v příkazů pro správu.
- Nastavte časový limit zásady dávkování ingesce na 10 sekund tak, že spustíte druhou aplikaci v rámci příkazů pro správu. Před spuštěním aplikace změňte hodnotu časového limitu na
00:00:10.
V prostředí dotazu vytvořte cílovou tabulku s názvem MyStormEvents v databázi spuštěním následujícího dotazu:
.create table MyStormEvents
(StartTime: datetime,
EndTime: datetime,
State: string,
DamageProperty: int,
DamageCrops: int,
Source: string,
StormSummary: dynamic)
Spuštěním následujícího dotazu nastavte časový limit zásad dávkování příjmu dat na 10 sekund:
.alter-merge table MyStormEvents policy ingestionbatching '{ "MaximumBatchingTimeSpan":"00:00:10" }'
Poznámka
Rozšíření nového nastavení zásad dávkování do správce dávek může trvat několik minut.
Stáhněte si ukázkový datový soubor stormevent.csv. Soubor obsahuje 1 000 záznamů bouřkových událostí.
Poznámka
Následující příklady předpokládají triviální shodu mezi sloupci přijatých dat a schématem cílové tabulky.
Pokud ingestované data triviálně neodpovídají schématu tabulky, musíte k zarovnání sloupců dat se schématem tabulky použít mapování příjmu dat.
Zařadíte soubor do fronty pro příjem dat a odešlete dotaz na výsledky.
Ve vámi preferovaném integrovaném vývojovém prostředí (IDE) nebo textovém editoru vytvořte projekt nebo soubor s názvem základní příjem dat pomocí konvence vhodné pro váš preferovaný jazyk. Umístěte stormevent.csv soubor do stejného umístění jako vaše aplikace.
Poznámka
V následujících příkladech používáte dva klienty, jeden na dotazování vašeho clusteru a druhý na vložení dat do vašeho clusteru. Pro jazyky, ve kterých ji klientská knihovna podporuje, sdílejí oba klienti stejný ověřovací program výzvy uživatele, což vede k zobrazení výzvy jednoho uživatele místo jednoho pro každého klienta.
Přidejte následující kód:
Vytvořte klientskou aplikaci, která se připojí ke clusteru, a vytiskne počet řádků v tabulce MyStormEvents. Tento počet použijete jako základní hodnotu pro porovnání s počtem řádků po každé metodě ingestu. Nahraďte zástupné symboly <your_cluster_uri> a <your_database> identifikátorem URI clusteru a názvem databáze.
using System.Data;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Azure.Identity;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
var tokenCredential = new InteractiveBrowserCredential();
var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
var database = "<your_database>";
var table = "MyStormEvents";
var query = table + " | count";
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
{
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
static void PrintResultsAsValueList(IDataReader response)
{
while (response.Read())
{
for (var i = 0; i < response.FieldCount; i++)
{
object val = response.GetValue(i);
string value = val.ToString() ?? "None";
Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withTokenCredential(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Poznámka
Používejte pro Node.js aplikace InteractiveBrowserCredentialNodeOptions místo InteractiveBrowserCredentialInBrowserOptions.
Poznámka
Sada Java SDK v současné době nepodporuje sdílení ověřovacího nástroje pro uživatelskou výzvu mezi klienty, což vede k zobrazení výzvy pro každého klienta zvlášť.
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
String clusterUri = "<your_cluster_uri>";
ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
String query = table + " | count";
KustoOperationResult results = kustoClient.execute(database, query);
KustoResultSetTable primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(primaryResults);
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
while (results.next()) {
KustoResultColumn[] columns = results.getColumns();
for (int i = 0; i < columns.length; i++) {
System.out.println("\t" + columns[i].getColumnName() + " - " + (results.getObject(i) == null ? "None" : results.getString(i)));
}
}
}
}
Vytvořte objekt tvůrce připojovacích řetězců, který definuje identifikátor URI příjmu dat, pokud je to možné, pomocí sdílení stejných přihlašovacích údajů pro ověření jako identifikátor URI clusteru. Nahraďte zástupný symbol <your_ingestion_uri> adresou URI pro příjem dat.
using Kusto.Ingest; // Add this import
// No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
const ingestUri = "<your_ingestion_uri>";
const ingestKcsb = KustoConnectionStringBuilder.withTokenCredential(ingestUri, credentials);
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
String ingestUri = "<your_ingestion_uri>";
ConnectionStringBuilder ingestKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestUri);
using Kusto.Ingest.V2; // Add this import
// No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
Přidejte soubor stormevent.csv do dávkové fronty pro ingestování.
Použijete následující objekty a vlastnosti:
QueuedIngestClient a vytvořte klienta ingestování.
IngestionProperties a nastavte vlastnosti příjmu dat.
DataFormat a zadejte formát souboru jako CSV.
ignore_first_record chcete-li určit, zda je první řádek ve formátu CSV a podobných typů souborů ignorován, pomocí následující logiky:
-
True: První řádek se ignoruje. Tato možnost slouží k přetažení řádku záhlaví z tabulkových textových dat.
-
False: První řádek se ingestuje jako běžný řádek.
Poznámka
Příjem dat podporuje maximální velikost souboru 6 GB. Doporučujeme ingestovat soubory mezi 100 MB a 1 GB.
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
await ingestClient.IngestFromStorageAsync(filePath, ingestProps);
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Použijete následující objekty a vlastnosti:
-
QueuedIngestClientBuilder a vytvořte klienta ingestování.
-
IngestProperties je ve většině případů nepovinný, ale zde se používá k nastavení IgnoreFirstRecord.
-
DataFormat určit formát souboru jako DataSourceFormat.csv.
-
IgnoreFirstRecord chcete-li určit, zda je první řádek ve formátu CSV a podobných typů souborů ignorován, pomocí následující logiky:
-
True: První řádek se ignoruje. Tato možnost slouží k přetažení řádku záhlaví z tabulkových textových dat.
-
False: První řádek se ingestuje jako běžný řádek.
Poznámka
Příjem dat podporuje maximální velikost souboru 6 GB. Doporučujeme ingestovat soubory mezi 100 MB a 1 GB.
using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
var fileSource = new FileSource(filePath, DataSourceFormat.csv);
var props = new IngestProperties() { IgnoreFirstRecord = true };
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
await ingestClient.IngestAsync(fileSource, database, table, props);
Po ingestování souboru zadejte dotaz na počet řádků v tabulce a zobrazte poslední přijatý řádek.
Poznámka
Pokud chcete umožnit dokončení příjmu dat, počkejte 30 sekund před dotazováním tabulky. Počkejte 60 sekund v jazyce C#, abyste umožnili asynchronní přidání souboru do fronty příjmu dat.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
await Task.Delay(TimeSpan.FromSeconds(60));
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Celý kód by měl vypadat takto:
using System.Data;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadUserPromptAuthentication();
using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
var database = "<your database>";
var table = "MyStormEvents";
var query = table + " | count";
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
{
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
await ingestClient.IngestFromStorageAsync(filePath, ingestProps);
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
await Task.Delay(TimeSpan.FromSeconds(60));
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
}
static void PrintResultsAsValueList(IDataReader response)
{
while (response.Read())
{
for (var i = 0; i < response.FieldCount; i++)
{
object val = response.GetValue(i);
string value = val.ToString() ?? "None";
Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
}
}
}
}
import os
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
from azure.identity import InteractiveBrowserCredential
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withTokenCredential(clusterUri, credentials);
const ingestUri = "<your_ingestion_uri>";
const ingestKcsb = KustoConnectionStringBuilder.withTokenCredential(ingestUri, credentials);
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>";
const table = "MyStormEventsJS";
const filePath = path.join(__dirname, "stormevents.csv");
const query = table + ` | count`;
let response = await kustoClient.execute(database, query);
printResultsAsValueList(response);
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, queryCount);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
}
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
String clusterUri = "<your_cluster_uri>";
ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri);
String ingestUri = "<your_ingestion_uri>";
ConnectionStringBuilder ingestKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestUri);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
String query = table + " | count";
KustoOperationResult results = kustoClient.execute(database, query);
KustoResultSetTable primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(primaryResults);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
while (results.next()) {
KustoResultColumn[] columns = results.getColumns();
for (int i = 0; i < columns.length; i++) {
System.out.println("\t" + columns[i].getColumnName() + " - " + (results.getObject(i) == null ? "None" : results.getString(i)));
}
}
}
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
var tokenCredential = new InteractiveBrowserCredential();
var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
var database = "<your_database>";
var table = "MyStormEvents";
var query = table + " | count";
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
{
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
var fileSource = new FileSource(filePath, DataSourceFormat.csv);
var props = new IngestProperties() { IgnoreFirstRecord = true };
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
await ingestClient.IngestAsync(fileSource, database, table, props);
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
await Task.Delay(TimeSpan.FromSeconds(60));
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
}
static void PrintResultsAsValueList(IDataReader response)
{
while (response.Read())
{
for (var i = 0; i < response.FieldCount; i++)
{
object val = response.GetValue(i);
string value = val.ToString() ?? "None";
Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
}
}
}
}
Spuštění aplikace
V příkazovém prostředí spusťte aplikaci pomocí následujícího příkazu:
# Change directory to the folder that contains the management commands project
dotnet run .
python basic_ingestion.py
V prostředí Node.js:
node basic-ingestion.js
V prostředí prohlížeče spusťte aplikaci pomocí příslušného příkazu. Například pro Vite-React:
npm run dev
mvn install exec:java -Dexec.mainClass="<groupId>.BatchIngestion"
Měl by se zobrazit výsledek podobný následujícímu:
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Fronta dat v paměti pro příjem dat a dotazování výsledků
Data z paměti můžete ingestovat vytvořením datového proudu obsahujícího data a následným řazením do fronty pro příjem dat.
Například můžete upravit aplikaci tak, že nahradíte importování z kódu souboru takto:
Přidejte balíček popisovače streamu do importů v horní části souboru.
Nejsou vyžadovány žádné další balíčky.
import io
from azure.kusto.ingest import StreamDescriptor
import { Readable } from "stream";
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
Přidejte do paměti řetězec s daty pro zpracování.
string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
string_stream = io.StringIO(single_line)
const singleLine = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"';
const stringStream = new Readable();
stringStream.push(singleLine);
stringStream.push(null);
String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
InputStream stream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(singleLine).array());
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream);
Nastavte vlastnosti příjmu dat tak, aby neignorovaly první záznam, protože řetězec v paměti neobsahuje řádek záhlaví.
ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=False)
ingestProps.ignoreFirstRecord = false;
ingestProps.setIgnoreFirstRecord(false);
// Remove the IngestionProperties object `props`
Ingestování dat v paměti jejich přidáním do dávkové fronty Pokud je to možné, zadejte velikost nezpracovaných dat.
_= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
ingest_client.ingest_from_stream(stream_descriptor, ingest_props)
stringStream.size = singleLine.length;
await ingestClient.ingestFromStream(stringStream, ingestProps);
ingestClient.ingestFromStream(streamSourceInfo, ingestProps);
var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);
await ingestClient.IngestAsync(streamSource, database, table);
Přehled aktualizovaného kódu by měl vypadat takto:
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
...
_= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
...
}
static void PrintResultsAsValueList(IDataReader response)
{
...
}
}
import io
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, StreamDescriptor
from azure.identity import InteractiveBrowserCredential
def main():
...
single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
string_stream = io.StringIO(single_line)
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
...
print("\nIngesting data from memory:")
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=False)
stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
ingest_client.ingest_from_stream(stream_descriptor, ingest_props)
...
def print_result_as_value_list(response):
...
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
import { Readable } from "stream";
async function main() {
...
const singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
const stringStream = Readable.from(singleLine);
stringStream.push(singleLine);
stringStream.push(null);
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>"
const table = "MyStormEvents"
...
console.log("\nIngesting data from memory:");
stringStream.size = singleLine.length;
ingestProps.ignoreFirstRecord = false;
await ingestClient.ingestFromStream(stringStream, ingestProps);
...
}
function sleep(time) {
...
}
function printResultsAsValueList(response) {
...
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
...
String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
InputStream stream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(singleLine).array());
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
...
System.out.println("\nIngesting data from memory:");
ingestProps.setIgnoreFirstRecord(false);
ingestClient.ingestFromStream(streamSourceInfo, ingestProps);
...
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
...
}
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
...
var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);
await ingestClient.IngestAsync(streamSource, database, table);
...
}
static void PrintResultsAsValueList(IDataReader response)
{
...
}
}
Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o jeden.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Vytvoření fronty objektu blob pro příjem dat a dotazování výsledků
Můžete ingestovat data z objektů blob služby Azure Storage, souborů Azure Data Lake a souborů Amazon S3.
Aplikaci můžete například upravit tak, že ingestujete z paměti kód následujícím kódem:
Začněte tím, že nahrajete soubor stormevent.csv do účtu úložiště a vygenerujete identifikátor URI s oprávněními ke čtení, například pomocí tokenu SAS pro objekty blob Azure.
Přidejte balíček popisovače objektů blob do importů v horní části souboru.
Nejsou vyžadovány žádné další balíčky.
from azure.kusto.ingest import BlobDescriptor
No additional packages are required.
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
Pomocí identifikátoru URI objektu blob vytvořte popisovač objektů blob, nastavte vlastnosti příjmu dat a pak ingestujte data z objektu blob. Nahraďte zástupný symbol <your_blob_uri> identifikátorem URI blobu.
string blobUri = "<your_blob_uri>";
ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
_= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
blob_uri = "<your_blob_uri>"
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
blob_descriptor = BlobDescriptor(blob_uri)
ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
const blobUri = "<your_blob_uri>";
ingestProps.ignoreFirstRecord = true;
await ingestClient.ingestFromBlob(blobUri, ingestProps);
String blobUri = "<your_blob_uri>";
ingestProps.setIgnoreFirstRecord(true);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUri, 100);
ingestClient.ingestFromBlob(blobSourceInfo, ingestProps);
var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
await ingestClient.IngestAsync(blobSource, database, table);
Přehled aktualizovaného kódu by měl vypadat takto:
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
string blobUri = "<your_blob_uri>";
...
Console.WriteLine("\nIngesting data from memory:");
ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
await ingestClient.IngestFromStorageAsync(blobUri, ingestProps);
...
}
static void PrintResultsAsValueList(IDataReader response)
{
...
}
}
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, BlobDescriptor
from azure.identity import InteractiveBrowserCredential
def main():
...
blob_uri = "<your_blob_uri>"
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
...
print("\nIngesting data from a blob:")
blob_descriptor = BlobDescriptor(blob_uri)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
...
def print_result_as_value_list(response):
...
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
import { Readable } from "stream";
async function main() {
...
const blobUri = "<your_blob_uri>";
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>"
const table = "MyStormEvents"
...
console.log("\nIngesting data from a blob:");
ingestProps.ignoreFirstRecord = true;
await ingestClient.ingestFromBlob(blobUri, ingestProps);
...
}
function sleep(time) {
...
}
function printResultsAsValueList(response) {
...
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
...
String blobUri = "<your_blob_uri>";
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
...
System.out.println("\nIngesting data from a blob:");
ingestProps.setIgnoreFirstRecord(true);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUri, 100);
ingestClient.ingestFromBlob(blobSourceInfo, ingestProps);
...
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
...
}
}
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
string blobUri = "<your_blob_uri>";
...
Console.WriteLine("\nIngesting data from memory:");
var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
var props = new IngestProperties() { IgnoreFirstRecord = true };
await ingestClient.IngestAsync(blobSource, database, table, props);
...
}
static void PrintResultsAsValueList(IDataReader response)
{
...
}
}
Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o 1 000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Další krok