Streamelési adatbetöltés konfigurálása az Azure Data Explorer-fürtön
Cikk
A streamelési betöltés akkor hasznos az adatok betöltéséhez, ha kis késésre van szüksége a betöltés és a lekérdezés között. Fontolja meg a streambetöltés használatát a következő esetekben:
Egy másodpercnél rövidebb késésre van szükség.
Számos olyan tábla működési feldolgozásának optimalizálása érdekében, ahol az adatok adatfolyama az egyes táblákba viszonylag kicsi (másodpercenként néhány rekord), de a teljes adatbetöltési mennyiség magas (másodpercenként több ezer rekord).
Válassza ki a megfelelő streamelési betöltési típust
Két streamelési betöltési típus támogatott:
Betöltési típus
Leírás
Adatkapcsolat
Az Event Hub, a IoT Hub és az Event Grid-adatkapcsolatok streambetöltést használhatnak, feltéve, hogy engedélyezve van a fürt szintjén. A streamelési betöltés használatára vonatkozó döntés a céltáblán konfigurált streamelési betöltési szabályzatnak megfelelően történik. Az adatkapcsolatok kezelésével kapcsolatos információkért lásd: Event Hub, IoT Hub és Event Grid.
Egyéni betöltés
Az egyéni betöltéshez olyan alkalmazást kell írnia, amely az Azure Data Explorer ügyfélkódtárak egyikét használja. Az ebben a témakörben található információk alapján konfigurálhatja az egyéni betöltést. A C# streamelési mintaalkalmazás is hasznos lehet.
Az alábbi táblázat segítségével kiválaszthatja a környezetének megfelelő betöltési típust:
Kritérium
Adatkapcsolat
Egyéni betöltés
Adatbetöltés kezdeményezése és a lekérdezéshez rendelkezésre álló adatok közötti adatkésés
Hosszabb késleltetés
Rövidebb késleltetés
Fejlesztési többletterhelés
Gyors és egyszerű beállítás, nincs fejlesztési többletterhelés
Nagy fejlesztési többletterhelés egy olyan alkalmazás létrehozásához, amely betölti az adatokat, kezeli a hibákat, és biztosítja az adatkonzisztenciát
Megjegyzés
A streambetöltés engedélyezésére és letiltására szolgáló folyamatot a Azure Portal vagy programozott módon a C#-ban kezelheti. Ha az egyéni alkalmazáshoz C#-ot használ, a programozott megközelítéssel kényelmesebbnek találhatja.
Teljesítménnyel és működésével kapcsolatos szempontok
A streambetöltést befolyásoló fő közreműködők a következők:
Virtuális gép és fürt mérete: A streamelési betöltési teljesítmény és a kapacitás skálázása nagyobb virtuális gép és fürtméret mellett. Az egyidejű betöltési kérelmek száma magonként legfeljebb hat lehet. Például a 16 magos termékváltozatok(például D14 és L16) esetében a maximálisan támogatott terhelés 96 egyidejű betöltési kérelem. Két alapvető termékváltozat, például a D11 esetében a maximálisan támogatott terhelés 12 egyidejű betöltési kérelem.
Adatméretkorlát: A streamelési betöltési kérelmek adatméretkorlátja 4 MB. Ez magában foglalja a frissítési szabályzatokhoz a betöltés során létrehozott összes adatot.
Sémafrissítések: A sémafrissítések, például a táblák és a betöltési leképezések létrehozása és módosítása akár öt percet is igénybe vehet a streamelési szolgáltatás számára. További információ: Streambetöltés és sémamódosítások.
SSD-kapacitás: A streamelési betöltés engedélyezése a fürtön még akkor is, ha az adatokat nem streameli, a fürtgépek helyi SSD-lemezének egy részét használja a betöltési adatok streamelésére, és csökkenti a gyorsgyorsítótár számára rendelkezésre álló tárhelyet.
A Azure Portal lépjen az Azure Data Explorer-fürtre.
A Beállítások területen válassza a Konfigurációk lehetőséget.
A Konfigurációk panelen válassza a Be lehetőséget a Streambetöltés engedélyezéséhez.
Kattintson a Mentés gombra.
Új Azure-Data Explorer-fürt létrehozásakor engedélyezheti a streambetöltést.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Céltábla létrehozása és a szabályzat definiálása
Hozzon létre egy táblát a streamelési betöltési adatok fogadásához, és definiálja a kapcsolódó szabályzatot a Azure Portal vagy programozott módon a C#-ban.
Ha létre szeretné hozni azt a táblát, amely streambetöltéssel fogadja az adatokat, másolja a következő parancsot a Lekérdezés panelre , és válassza a Futtatás lehetőséget.
Az adatbázis szintjén definiált szabályzat az adatbázis összes meglévő és jövőbeli táblájára érvényes. Ha az adatbázis szintjén engedélyezi a szabályzatot, táblánként nem kell engedélyeznie.
A létrehozott táblán lévő szabályzat meghatározásához használja a következőt:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Streamelési alkalmazás létrehozása adatok fürtbe való betöltéséhez
Hozza létre az alkalmazást az adatok fürtbe való betöltéséhez az előnyben részesített nyelv használatával.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Streambetöltés letiltása a fürtön
Figyelmeztetés
A streambetöltés letiltása eltarthat néhány óráig.
Mielőtt letiltja a streambetöltést az Azure Data Explorer-fürtön, elvetheti a streamelési betöltési szabályzatot az összes releváns táblából és adatbázisból. A streamelési betöltési szabályzat eltávolítása adatátrendezést vált ki az Azure Data Explorer-fürtön belül. A streamelési betöltési adatok átkerülnek a kezdeti tárolóból az oszloptároló állandó tárolójába (kiterjedések vagy szegmensek). Ez a folyamat a kezdeti tárolóban lévő adatok mennyiségétől függően néhány másodperctől néhány óráig is eltarthat.
A streamelési betöltési szabályzat elvetése
A streamelési betöltési szabályzatot elvetheti a Azure Portal vagy programozott módon C# nyelven.
A Azure Portal lépjen az Azure Data Explorer-fürtre, és válassza a Lekérdezés lehetőséget.
Ha el szeretné dobni a streamelési betöltési szabályzatot a táblából, másolja a következő parancsot a Lekérdezés panelre , és válassza a Futtatás lehetőséget.
.delete table TestTable policy streamingingestion
A Beállítások területen válassza a Konfigurációk lehetőséget.
A Konfigurációk panelen válassza a Ki lehetőséget a streambetöltés letiltásához.
Kattintson a Mentés gombra.
A streamelési betöltési szabályzat táblából való elvetéséhez futtassa a következő kódot:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Ha le szeretné tiltani a streambetöltést a fürtön, futtassa a következő kódot:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
A bővítménycímkék nem állíthatók be a streamelési betöltési adatokon.
Szabályzat frissítése. A frissítési szabályzat csak a forrástáblában újonnan betöltött adatokra hivatkozhat, az adatbázisban lévő többi adatra vagy táblára nem.
Ha a streambetöltés engedélyezve van egy olyan fürtön, amely vezető szerepet tölt be a követő adatbázisokban, a streambetöltést a következő fürtökön is engedélyezni kell a streamelési betöltési adatok követéséhez. Ugyanez vonatkozik arra is, hogy a fürt adatai Data Share keresztül legyenek-e megosztva.