Aplica-se a: ✅Microsoft Fabric✅Azure Data Explorer
Kusto é capaz de lidar com a ingestão de dados em massa, otimizando e agrupando os dados ingeridos por meio de seu gerenciador de lotes. O gestor de lotes agrega os dados ingeridos antes de atingirem a tabela de destino, permitindo um processamento mais eficiente e um melhor desempenho. O processamento em lote geralmente é feito em volumes de 1 GB de dados brutos, mil ficheiros individuais ou, por padrão, um tempo de espera de 5 minutos. As políticas de processamento em lote podem ser atualizadas nos níveis de banco de dados e tabela, geralmente para diminuir o tempo de processamento em lote e reduzir a latência. Para obter mais informações sobre o processamento em lote de ingestão, consulte política IngestionBatching e Alterar a política de lote de ingestão de nível de tabela programaticamente.
Observação
O processamento em lote também leva em conta vários fatores, como o banco de dados e a tabela de destino, o usuário que executa a ingestão e várias propriedades associadas à ingestão, como tags especiais.
Neste artigo, você aprenderá a:
Pré-requisitos
Antes de começar
Use um dos seguintes métodos para criar a tabela MyStormEvents e, como apenas uma pequena quantidade de dados está sendo ingerida, defina o tempo limite da política de lote de ingestão para 10 segundos:
- Crie uma tabela de destino chamada MyStormEvents no seu banco de dados executando a primeira aplicação em comandos de gerenciamento.
- Defina o tempo limite da política de lote de ingestão para 10 segundos, executando a segunda aplicação nos comandos de gestão . Antes de executar o aplicativo, altere o valor de tempo limite para
00:00:10
.
Em seu ambiente de consulta, crie uma tabela de destino chamada MyStormEvents em seu banco de dados executando a seguinte consulta:
.create table MyStormEvents
(StartTime: datetime,
EndTime: datetime,
State: string,
DamageProperty: int,
DamageCrops: int,
Source: string,
StormSummary: dynamic)
Defina o tempo limite da política de lote de ingestão para 10 segundos executando a seguinte consulta:
.alter-merge table MyStormEvents policy ingestionbatching '{ "MaximumBatchingTimeSpan":"00:00:10" }'
Observação
Pode levar alguns minutos para que as novas configurações de política de lote se propaguem para o gerenciador de lotes.
Baixe o arquivo de dados de exemplo stormevent.csv. O arquivo contém 1.000 registros de eventos de tempestade.
Observação
Os exemplos a seguir assumem uma correspondência trivial entre as colunas dos dados ingeridos e o esquema da tabela de destino.
Se os dados ingeridos não corresponderem trivialmente ao esquema da tabela, você deverá usar um mapeamento de ingestão para alinhar as colunas dos dados com o esquema da tabela.
Enfileirar um ficheiro para ingestão e consultar os resultados
Em seu IDE ou editor de texto preferido, crie um projeto ou arquivo chamado ingestão básica usando a convenção apropriada para seu idioma preferido. Coloque o arquivo stormevent.csv no mesmo local do seu aplicativo.
Observação
Nos exemplos a seguir, você usa dois clientes, um para consultar seu cluster e outro para ingerir dados em seu cluster. Para idiomas em que a biblioteca do cliente oferece suporte a ele, ambos os clientes compartilham o mesmo autenticador de prompt do usuário, resultando em um único prompt de usuário em vez de um para cada cliente.
Adicione o seguinte código:
Crie uma aplicação cliente que se ligue ao seu cluster e imprima o número de linhas na tabela MyStormEvents. Você usará essa contagem como uma linha de base para comparação com o número de linhas após cada método de ingestão. Substitua os marcadores de posição <your_cluster_uri>
e <your_database>
pelo URI do cluster e pelo nome do banco de dados, respectivamente.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Observação
Para aplicativos Node.js, use InteractiveBrowserCredentialNodeOptions
em vez de InteractiveBrowserCredentialInBrowserOptions
.
Observação
Atualmente, o Java SDK não suporta ambos os clientes que compartilham o mesmo autenticador de prompt de usuário, resultando em um prompt de usuário para cada cliente.
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
String clusterUri = "<your_cluster_uri>";
ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
String query = table + " | count";
KustoOperationResult results = kustoClient.execute(database, query);
KustoResultSetTable primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(primaryResults);
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
while (results.next()) {
KustoResultColumn[] columns = results.getColumns();
for (int i = 0; i < columns.length; i++) {
System.out.println("\t" + columns[i].getColumnName() + " - " + (results.getObject(i) == null ? "None" : results.getString(i)));
}
}
}
}
Crie um objeto construtor de cadeia de conexão que defina o URI de ingestão de dados, sempre que possível, usando o compartilhamento das mesmas credenciais de autenticação que o URI do cluster. Substitua o espaço reservado <your_ingestion_uri>
pelo URI de ingestão de dados.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
const ingestUri = "<your_ingestion_uri>";
const ingestKcsb = KustoConnectionStringBuilder.withTokenCredential(ingestUri, credentials);
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
String ingestUri = "<your_ingestion_uri>";
ConnectionStringBuilder ingestKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestUri);
Ingerir o arquivo stormevent.csv adicionando-o à fila de lotes. Você usa os seguintes objetos e propriedades:
-
QueuedIngestClient para criar o cliente de ingestão.
-
IngestionProperties para definir as propriedades de ingestão.
-
DataFormat especificar o formato de arquivo como CSV.
-
ignore_first_record para especificar se a primeira linha de um arquivo CSV e de tipos de ficheiros semelhantes é ignorada, usando a seguinte lógica:
-
True: A primeira linha é ignorada. Use esta opção para remover a linha de cabeçalho dos dados tabulares textuais.
-
Falso: A primeira linha é ingerida como uma linha regular.
Observação
A ingestão suporta um tamanho máximo de ficheiro de 6 GB. A recomendação é ingerir arquivos entre 100 MB e 1 GB.
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Consulte o número de linhas na tabela após a ingestão do arquivo e mostre a última linha ingerida.
Observação
Para dar tempo para que a ingestão seja concluída, aguarde 30 segundos antes de consultar a tabela. Para C#, aguarde 60 segundos para dar tempo de adicionar o arquivo à fila de ingestão de forma assíncrona.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
O código completo deve ter esta aparência:
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null))
{
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
if (response.GetDataTypeName(i) == "Int64")
Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
else
Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
}
}
}
}
}
import os
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
from azure.identity import InteractiveBrowserCredential
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withTokenCredential(clusterUri, credentials);
const ingestUri = "<your_ingestion_uri>";
const ingestKcsb = KustoConnectionStringBuilder.withTokenCredential(ingestUri, credentials);
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>";
const table = "MyStormEventsJS";
const filePath = path.join(__dirname, "stormevents.csv");
const query = table + ` | count`;
let response = await kustoClient.execute(database, query);
printResultsAsValueList(response);
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, queryCount);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
}
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
String clusterUri = "<your_cluster_uri>";
ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri);
String ingestUri = "<your_ingestion_uri>";
ConnectionStringBuilder ingestKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestUri);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
String query = table + " | count";
KustoOperationResult results = kustoClient.execute(database, query);
KustoResultSetTable primaryResults = results.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultsAsValueList(primaryResults);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
while (results.next()) {
KustoResultColumn[] columns = results.getColumns();
for (int i = 0; i < columns.length; i++) {
System.out.println("\t" + columns[i].getColumnName() + " - " + (results.getObject(i) == null ? "None" : results.getString(i)));
}
}
}
}
Executar seu aplicativo
Em um shell de comando, use o seguinte comando para executar seu aplicativo:
# Change directory to the folder that contains the management commands project
dotnet run .
python basic_ingestion.py
Num ambiente Node.js:
node basic-ingestion.js
Em um ambiente de navegador, use o comando apropriado para executar seu aplicativo. Por exemplo, para Vite-React:
npm run dev
Observação
Em um ambiente de navegador, abra o console de ferramentas de desenvolvedor para ver a saída.
mvn install exec:java -Dexec.mainClass="<groupId>.BatchIngestion"
Deverá ver um resultado semelhante ao seguinte:
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Enfileirar dados na memória para ingestão e consultar os resultados
Você pode ingerir dados da memória criando um fluxo contendo os dados e, em seguida, enfileirando-os para ingestão.
Por exemplo, você pode modificar o aplicativo substituindo a ingestão de do código de de arquivo, da seguinte maneira:
Adicione o pacote do descritor de fluxo às importações na parte superior do arquivo.
Não são necessários pacotes adicionais.
import io
from azure.kusto.ingest import StreamDescriptor
import { Readable } from "stream";
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
Adicione uma cadeia de caracteres na memória com os dados a serem ingeridos.
string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
string_stream = io.StringIO(single_line)
const singleLine = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"';
const stringStream = new Readable();
stringStream.push(singleLine);
stringStream.push(null);
String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
InputStream stream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(singleLine).array());
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream);
Defina as propriedades de ingestão para não ignorar o primeiro registro, pois a cadeia de caracteres na memória não tem uma linha de cabeçalho.
ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=False)
ingestProps.ignoreFirstRecord = false;
ingestProps.setIgnoreFirstRecord(false);
Ingerir os dados na memória adicionando-os à fila de lotes. Sempre que possível, forneça o tamanho dos dados brutos.
_= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
ingest_client.ingest_from_stream(stream_descriptor, ingest_props)
stringStream.size = singleLine.length;
await ingestClient.ingestFromStream(stringStream, ingestProps);
ingestClient.ingestFromStream(streamSourceInfo, ingestProps);
Um esboço do código atualizado deve ter esta aparência:
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
...
string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
...
Console.WriteLine("\nIngesting data from memory:");
ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
_= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
...
}
}
static void PrintResultsAsValueList(IDataReader response) {
...
}
}
}
import io
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, StreamDescriptor
from azure.identity import InteractiveBrowserCredential
def main():
...
single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
string_stream = io.StringIO(single_line)
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
...
print("\nIngesting data from memory:")
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=False)
stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
ingest_client.ingest_from_stream(stream_descriptor, ingest_props)
...
def print_result_as_value_list(response):
...
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
import { Readable } from "stream";
async function main() {
...
const singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
const stringStream = Readable.from(singleLine);
stringStream.push(singleLine);
stringStream.push(null);
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>"
const table = "MyStormEvents"
...
console.log("\nIngesting data from memory:");
stringStream.size = singleLine.length;
ingestProps.ignoreFirstRecord = false;
await ingestClient.ingestFromStream(stringStream, ingestProps);
...
}
function sleep(time) {
...
}
function printResultsAsValueList(response) {
...
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
...
String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
InputStream stream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(singleLine).array());
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream);
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
...
System.out.println("\nIngesting data from memory:");
ingestProps.setIgnoreFirstRecord(false);
ingestClient.ingestFromStream(streamSourceInfo, ingestProps);
...
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
...
}
}
Ao executar o aplicativo, você verá um resultado semelhante ao seguinte. Observe que, após a ingestão, o número de linhas na tabela aumentou em um.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Colocar um blob em fila para processamento e consultar os resultados
Você pode ingerir dados de blobs de Armazenamento do Azure, arquivos do Azure Data Lake e arquivos do Amazon S3.
Por exemplo, poderá modificar a aplicação substituindo o componente de ingestão do código de memória pelo seguinte:
Comece carregando o arquivo stormevent.csv em sua conta de armazenamento e gere um URI com permissões de leitura, por exemplo, usando um token SAS para blobs do Azure.
Adicione o pacote descritor de blob às importações na parte superior do arquivo.
Não são necessários pacotes adicionais.
from azure.kusto.ingest import BlobDescriptor
No additional packages are required.
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
Crie um descritor de blob usando o URI de blob, defina as propriedades de ingestão e, em seguida, ingira dados do blob. Substitua o marcador <your_blob_uri>
pelo URI do blob.
string blobUri = "<your_blob_uri>";
ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
_= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
blob_uri = "<your_blob_uri>"
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
blob_descriptor = BlobDescriptor(blob_uri)
ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
const blobUri = "<your_blob_uri>";
ingestProps.ignoreFirstRecord = true;
await ingestClient.ingestFromBlob(blobUri, ingestProps);
String blobUri = "<your_blob_uri>";
ingestProps.setIgnoreFirstRecord(true);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUri, 100);
ingestClient.ingestFromBlob(blobSourceInfo, ingestProps);
Um esboço do código atualizado deve ter esta aparência:
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
...
string blobUri = "<your_blob_uri>";
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
...
Console.WriteLine("\nIngesting data from memory:");
ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
_=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
...
}
}
static void PrintResultsAsValueList(IDataReader response) {
...
}
}
}
import time
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, BlobDescriptor
from azure.identity import InteractiveBrowserCredential
def main():
...
blob_uri = "<your_blob_uri>"
with KustoClient(cluster_kcsb) as kusto_client:
with QueuedIngestClient(ingest_kcsb) as ingest_client:
database = "<your_database>"
table = "MyStormEvents"
...
print("\nIngesting data from a blob:")
blob_descriptor = BlobDescriptor(blob_uri)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
...
def print_result_as_value_list(response):
...
if __name__ == "__main__":
main()
import path from 'path';
import { Client, KustoConnectionStringBuilder } from "azure-kusto-data";
import { IngestClient, IngestionProperties, DataFormat } from "azure-kusto-ingest";
import { InteractiveBrowserCredential } from "@azure/identity";
import { Readable } from "stream";
async function main() {
...
const blobUri = "<your_blob_uri>";
const kustoClient = new Client(clusterKcsb);
const ingestClient = new IngestClient(ingestKcsb);
const database = "<your_database>"
const table = "MyStormEvents"
...
console.log("\nIngesting data from a blob:");
ingestProps.ignoreFirstRecord = true;
await ingestClient.ingestFromBlob(blobUri, ingestProps);
...
}
function sleep(time) {
...
}
function printResultsAsValueList(response) {
...
}
main();
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.KustoResultColumn;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
public class BatchIngestion {
public static void main(String[] args) throws Exception {
...
String blobUri = "<your_blob_uri>";
try (Client kustoClient = ClientFactory.createClient(clusterKcsb);
QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
String database = "<your_database>";
String table = "MyStormEvents";
...
System.out.println("\nIngesting data from a blob:");
ingestProps.setIgnoreFirstRecord(true);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobUri, 100);
ingestClient.ingestFromBlob(blobSourceInfo, ingestProps);
...
}
}
public static void printResultsAsValueList(KustoResultSetTable results) {
...
}
}
Ao executar o aplicativo, você verá um resultado semelhante ao seguinte. Observe que, após a ingestão, o número de linhas na tabela aumentou em 1.000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Próximo passo