Sdílet prostřednictvím


Vytvořte aplikaci pro získání dat pomocí frontovaného příjmu

Přepněte služby pomocí rozevíracího seznamu Verze . Přečtěte si další informace o navigaci.
Platí pro: ✅ Microsoft Fabric ✅ Azure Data Explorer

Kusto dokáže zpracovat hromadný příjem dat optimalizací a dávkováním přijatých dat prostřednictvím svého správce dávkování. Manažer dávkování agreguje přijatá data předtím, než dosáhnou cílové tabulky, což umožňuje efektivnější zpracování a zvýšený výkon. Dávkování se obvykle provádí ve skupinách po 1 GB nezpracovaných dat, po 1 000 jednotlivých souborech, nebo je ve výchozím nastavení časový limit 5 minut. Zásady dávkování je možné aktualizovat na úrovni databáze a tabulky, což obvykle snižuje dobu dávkování a snižuje latenci. Další informace o dávkování příjmu najdete v tématu Zásady dávkového příjmu a Změnit zásady dávkového příjmu na úrovni tabulky programově.

Poznámka

Dávkování také bere v úvahu různé faktory, jako je cílová databáze a tabulka, uživatel, který spouští příjem dat, a různé vlastnosti spojené s příjmem dat, jako jsou speciální značky.

V tomto článku se naučíte:

Důležité

Rozhraní API Ingestu má nyní dvě verze: V1 a V2. Rozhraní API V1 je původní rozhraní API, zatímco rozhraní API V2 je imaginovaná verze, která zjednodušuje příjem rozhraní API a současně nabízí další přizpůsobení.

Ingestování verze 2 je ve verzi Preview a je k dispozici v následujících jazycích: C#

Požadavky

Než začnete

  • Pomocí jedné z následujících metod vytvořte MyStormEvents tabulku a vzhledem k tomu, že se ingestuje jenom malé množství dat, nastavte časový limit zásad dávkování příjmu na 10 sekund:

    1. Vytvořte cílovou tabulku s názvem MyStormEvents ve vaší databázi spuštěním první aplikace v příkazů pro správu.
    2. Nastavte časový limit zásady dávkování ingesce na 10 sekund tak, že spustíte druhou aplikaci v rámci příkazů pro správu. Před spuštěním aplikace změňte hodnotu časového limitu na 00:00:10.
  • Stáhněte si ukázkový datový soubor stormevent.csv. Soubor obsahuje 1 000 záznamů bouřkových událostí.

    Poznámka

    Následující příklady předpokládají triviální shodu mezi sloupci přijatých dat a schématem cílové tabulky.

    Pokud ingestované data triviálně neodpovídají schématu tabulky, musíte k zarovnání sloupců dat se schématem tabulky použít mapování příjmu dat.

Zařadíte soubor do fronty pro příjem dat a odešlete dotaz na výsledky.

Ve vámi preferovaném integrovaném vývojovém prostředí (IDE) nebo textovém editoru vytvořte projekt nebo soubor s názvem základní příjem dat pomocí konvence vhodné pro váš preferovaný jazyk. Umístěte stormevent.csv soubor do stejného umístění jako vaše aplikace.

Poznámka

V následujících příkladech používáte dva klienty, jeden na dotazování vašeho clusteru a druhý na vložení dat do vašeho clusteru. Pro jazyky, ve kterých ji klientská knihovna podporuje, sdílejí oba klienti stejný ověřovací program výzvy uživatele, což vede k zobrazení výzvy jednoho uživatele místo jednoho pro každého klienta.

Přidejte následující kód:

  1. Vytvořte klientskou aplikaci, která se připojí ke clusteru, a vytiskne počet řádků v tabulce MyStormEvents. Tento počet použijete jako základní hodnotu pro porovnání s počtem řádků po každé metodě ingestu. Nahraďte zástupné symboly <your_cluster_uri> a <your_database> identifikátorem URI clusteru a názvem databáze.

    using System.Data;
    
    using Kusto.Data;
    using Kusto.Data.Common;
    using Kusto.Data.Net.Client;
    
    using Azure.Identity;
    
    namespace BatchIngest;
    
    class BatchIngest
    {
      static async Task Main()
      {
        var tokenCredential = new InteractiveBrowserCredential();
        var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
    
        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
    
        var database = "<your_database>";
        var table = "MyStormEvents";
    
        var query = table + " | count";
    
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }
      }
    
    static void PrintResultsAsValueList(IDataReader response)
    {
      while (response.Read())
      {
          for (var i = 0; i < response.FieldCount; i++)
          {
              object val = response.GetValue(i);
              string value = val.ToString() ?? "None";
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
          }
      }
      }
    }
    
  2. Vytvořte objekt tvůrce připojovacích řetězců, který definuje identifikátor URI příjmu dat, pokud je to možné, pomocí sdílení stejných přihlašovacích údajů pro ověření jako identifikátor URI clusteru. Nahraďte zástupný symbol <your_ingestion_uri> adresou URI pro příjem dat.

    using Kusto.Ingest; // Add this import
    
    // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
    
    using Kusto.Ingest.V2; // Add this import
    
    // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
    
  3. Přidejte soubor stormevent.csv do dávkové fronty pro ingestování.

    Použijete následující objekty a vlastnosti:

    • QueuedIngestClient a vytvořte klienta ingestování.

    • IngestionProperties a nastavte vlastnosti příjmu dat.

    • DataFormat a zadejte formát souboru jako CSV.

    • ignore_first_record chcete-li určit, zda je první řádek ve formátu CSV a podobných typů souborů ignorován, pomocí následující logiky:

      • True: První řádek se ignoruje. Tato možnost slouží k přetažení řádku záhlaví z tabulkových textových dat.
      • False: První řádek se ingestuje jako běžný řádek.

      Poznámka

      Příjem dat podporuje maximální velikost souboru 6 GB. Doporučujeme ingestovat soubory mezi 100 MB a 1 GB.

    using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);
    
    string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
    Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
    var ingestProps = new KustoIngestionProperties(database, table) {
      Format = DataSourceFormat.csv,
      AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
    };
    await ingestClient.IngestFromStorageAsync(filePath, ingestProps);
    

    Použijete následující objekty a vlastnosti:

    • QueuedIngestClientBuilder a vytvořte klienta ingestování.
    • IngestProperties je ve většině případů nepovinný, ale zde se používá k nastavení IgnoreFirstRecord.
    • DataFormat určit formát souboru jako DataSourceFormat.csv.
    • IgnoreFirstRecord chcete-li určit, zda je první řádek ve formátu CSV a podobných typů souborů ignorován, pomocí následující logiky:
      • True: První řádek se ignoruje. Tato možnost slouží k přetažení řádku záhlaví z tabulkových textových dat.
      • False: První řádek se ingestuje jako běžný řádek.

    Poznámka

    Příjem dat podporuje maximální velikost souboru 6 GB. Doporučujeme ingestovat soubory mezi 100 MB a 1 GB.

    using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
    
    string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
    var fileSource = new FileSource(filePath, DataSourceFormat.csv);
    var props = new IngestProperties() { IgnoreFirstRecord = true };
    
    Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
    
    await ingestClient.IngestAsync(fileSource, database, table, props);
    
  4. Po ingestování souboru zadejte dotaz na počet řádků v tabulce a zobrazte poslední přijatý řádek.

    Poznámka

    Pokud chcete umožnit dokončení příjmu dat, počkejte 30 sekund před dotazováním tabulky. Počkejte 60 sekund v jazyce C#, abyste umožnili asynchronní přidání souboru do fronty příjmu dat.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    await Task.Delay(TimeSpan.FromSeconds(60));
    
    using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

Celý kód by měl vypadat takto:

using System.Data;

using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadUserPromptAuthentication();

        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);

        var database = "<your database>";
        var table = "MyStormEvents";

        var query = table + " | count";

        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }

        using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);

        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
            Format = DataSourceFormat.csv,
            AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        await ingestClient.IngestFromStorageAsync(filePath, ingestProps);

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        await Task.Delay(TimeSpan.FromSeconds(60));

        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
            Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
            PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
            Console.WriteLine("\nLast ingested row:");
            PrintResultsAsValueList(response);
        }
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        while (response.Read())
        {
            for (var i = 0; i < response.FieldCount; i++)
            {
                object val = response.GetValue(i);
                string value = val.ToString() ?? "None";
                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
            }
        }
    }
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;

namespace BatchIngest;

class BatchIngest
{
   static async Task Main()
   {
       var tokenCredential = new InteractiveBrowserCredential();
       var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.windows.net"
       var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);

       using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);

       var database = "<your_database>";
       var table = "MyStormEvents";

       var query = table + " | count";

       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
       {
           Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
           PrintResultsAsValueList(response);
       }

       using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();

       string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

       var fileSource = new FileSource(filePath, DataSourceFormat.csv);
       var props = new IngestProperties() { IgnoreFirstRecord = true };

       Console.WriteLine("\nIngesting data from file: \n\t " + filePath);

       await ingestClient.IngestAsync(fileSource, database, table, props);

       Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
       await Task.Delay(TimeSpan.FromSeconds(60));

       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
           Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
           PrintResultsAsValueList(response);
       }

       query = table + " | top 1 by ingestion_time()";
       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
           Console.WriteLine("\nLast ingested row:");
           PrintResultsAsValueList(response);
       }
   }

   static void PrintResultsAsValueList(IDataReader response)
   {
       while (response.Read())
       {
           for (var i = 0; i < response.FieldCount; i++)
           {
               object val = response.GetValue(i);
               string value = val.ToString() ?? "None";
               Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
           }
       }
   }
}

Spuštění aplikace

V příkazovém prostředí spusťte aplikaci pomocí následujícího příkazu:

# Change directory to the folder that contains the management commands project
dotnet run .

Měl by se zobrazit výsledek podobný následujícímu:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Fronta dat v paměti pro příjem dat a dotazování výsledků

Data z paměti můžete ingestovat vytvořením datového proudu obsahujícího data a následným řazením do fronty pro příjem dat.

Například můžete upravit aplikaci tak, že nahradíte importování z kódu souboru takto:

  1. Přidejte balíček popisovače streamu do importů v horní části souboru.

    Nejsou vyžadovány žádné další balíčky.

  2. Přidejte do paměti řetězec s daty pro zpracování.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Nastavte vlastnosti příjmu dat tak, aby neignorovaly první záznam, protože řetězec v paměti neobsahuje řádek záhlaví.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
    // Remove the IngestionProperties object `props`
    
  4. Ingestování dat v paměti jejich přidáním do dávkové fronty Pokud je to možné, zadejte velikost nezpracovaných dat.

  5. _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
    
        var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);
    
        await ingestClient.IngestAsync(streamSource, database, table);
    

Přehled aktualizovaného kódu by měl vypadat takto:

using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
        var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
        ...

        _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
        var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
        ...

        var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);

        await ingestClient.IngestAsync(streamSource, database, table);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}

Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o jeden.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Vytvoření fronty objektu blob pro příjem dat a dotazování výsledků

Můžete ingestovat data z objektů blob služby Azure Storage, souborů Azure Data Lake a souborů Amazon S3.

Aplikaci můžete například upravit tak, že ingestujete z paměti kód následujícím kódem:

  1. Začněte tím, že nahrajete soubor stormevent.csv do účtu úložiště a vygenerujete identifikátor URI s oprávněními ke čtení, například pomocí tokenu SAS pro objekty blob Azure.

  2. Přidejte balíček popisovače objektů blob do importů v horní části souboru.

    Nejsou vyžadovány žádné další balíčky.

  3. Pomocí identifikátoru URI objektu blob vytvořte popisovač objektů blob, nastavte vlastnosti příjmu dat a pak ingestujte data z objektu blob. Nahraďte zástupný symbol <your_blob_uri> identifikátorem URI blobu.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    
    var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
    
    await ingestClient.IngestAsync(blobSource, database, table);
    

Přehled aktualizovaného kódu by měl vypadat takto:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string blobUri = "<your_blob_uri>";
        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        await ingestClient.IngestFromStorageAsync(blobUri, ingestProps);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string blobUri = "<your_blob_uri>";
        ...

        Console.WriteLine("\nIngesting data from memory:");
        var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
        var props = new IngestProperties() { IgnoreFirstRecord = true };

        await ingestClient.IngestAsync(blobSource, database, table, props);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}

Při spuštění aplikace by se měl zobrazit výsledek podobný následujícímu. Všimněte si, že po příjmu dat se počet řádků v tabulce zvýšil o 1 000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Další krok