當您在擷取和查詢之間需要低延遲時,串流擷取對於載入數據很有用。 請考慮在下列案例中使用串流擷取:
- 需要少於一秒的延遲。
- 若要優化許多數據表的作業處理,其中每個數據表的數據串流相對較小(每秒記錄數筆),但整體數據擷取量很高(每秒數千筆記錄)。
如果每個數據表中的數據串流很高(每小時超過 4 GB),請考慮使用 佇列擷取。
若要深入瞭解不同的擷取方法,請參閱 數據擷取概觀。
如需以舊版 SDK 為基礎的程式代碼範例,請參閱 封存一文。
選擇適當的串流擷取類型
支援兩種串流擷取類型:
使用下表可協助您選擇適合您環境的擷取類型:
準則 |
資料連線 |
自定義擷取 |
擷取起始與可供查詢的數據之間的數據延遲 |
較長的延遲時間 |
較短的延遲 |
開發額外負荷 |
快速且容易設定,不需要開發額外負荷 |
建立應用程式擷取數據、處理錯誤並確保數據一致性的高開發額外負荷 |
注意
您可以使用 C# Azure 入口網站 或以程式設計方式,管理在叢集上啟用和停用串流擷取的程式。 如果您使用 C# 進行 自訂應用程式,您可能會發現使用程式設計方法更方便。
必要條件
可能會影響串流擷取的主要參與者包括:
-
VM 和叢集大小:隨著 VM 和叢集大小增加,串流擷取效能和容量調整。 並行擷取要求的數目限制為每個核心6個。 例如,對於16個核心SKU,例如 D14 和 L16,最大支援的負載是96個並行擷取要求。 對於兩個核心 SKU,例如 D11,最大支援的負載是 12 個並行擷取要求。
-
數據大小限制:串流擷取要求的數據大小限製為4 MB。 這包括在擷取期間針對更新原則建立的任何數據。
-
架構更新:串流擷取服務最多可能需要五分鐘的時間,例如建立和修改數據表和擷取對應等架構更新。 如需詳細資訊,請參閱 串流擷取和架構變更。
-
SSD 容量:在叢集上啟用串流擷取,即使數據未透過串流擷取,也會使用叢集機器的本機 SSD 磁碟部分來串流擷取數據,並減少可供經常性快取使用的記憶體。
在您的叢集上啟用串流擷取
您必須先在叢集上啟用功能,並定義 串流擷取原則,才能使用串流擷取。 您可以在建立叢集時啟用此功能,或將其新增至現有的叢集。
在建立新的叢集時啟用串流擷取
您可以使用 Azure 入口網站 或在 C# 中以程式設計方式建立新的叢集時,啟用串流擷取。
使用建立 Azure 數據總管叢集和資料庫中的步驟建立叢集時,請在 [組態] 索引標籤中,選取
若要在建立新的 Azure 數據總管叢集時啟用串流擷取,請執行下列程式代碼:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
在現有叢集上啟用串流擷取
如果您有現有的叢集,您可以使用 Azure 入口網站 或在 C# 中以程式設計方式啟用串流擷取。
在 Azure 入口網站 中,移至您的 Azure 數據總管叢集。
在 [設定] 中,選取 [組態]。
在 [ 組態 ] 窗格中,選取 [開啟 ] 以啟用 串流擷取。
選取儲存。
您可以在更新現有的 Azure 數據總管叢集時啟用串流擷取。
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
建立目標數據表並定義原則
建立數據表以接收串流擷取數據,並使用 C# Azure 入口網站 或以程式設計方式定義其相關原則。
在 Azure 入口網站中瀏覽至您的叢集。
選取查詢。
若要建立將透過串流擷取接收數據的數據表,請將下列命令 複製到 [查詢] 窗格中 ,然後選取 [ 執行]。
.create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
將下列其中一個命令複製到 [ 查詢] 窗格中 ,然後選取 [ 執行]。 這會定義 您所建立數據表或包含數據表的資料庫上的串流擷取 原則。
提示
在資料庫層級定義的原則會套用至資料庫中所有現有和未來的數據表。 當您在資料庫層級啟用原則時,不需要為每個數據表啟用它。
若要在您所建立的數據表上定義原則,請使用:
.alter table TestTable policy streamingingestion enable
若要在包含您所建立數據表的資料庫上定義原則,請使用:
.alter database StreamingTestDb policy streamingingestion enable
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
建立串流擷取應用程式以將數據內嵌至叢集
使用慣用的語言,建立應用程式以將數據內嵌至叢集。
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.gzip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
from azure.kusto.data import KustoConnectionStringBuilder, DataFormat
from azure.kusto.ingest import IngestionProperties, KustoStreamingIngestClient
clusterPath = "https://<clusterName>.<region>.kusto.windows.net"
appId = "<appId>"
appKey = "<appKey>"
appTenant = "<appTenant>"
dbName = "<dbName>"
tableName = "<tableName>"
csb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
clusterPath,
appId,
appKey,
appTenant
)
client = KustoStreamingIngestClient(csb)
ingestionProperties = IngestionProperties(
database=dbName,
table=tableName,
data_format=DataFormat.CSV
)
# Ingest from file
# Automatically detects gz format
client.ingest_from_file("MyFile.gz", ingestion_properties=ingestionProperties)
// Load modules using ES6 import statements:
import { DataFormat, IngestionProperties, StreamingIngestClient } from "azure-kusto-ingest";
import { KustoConnectionStringBuilder } from "azure-kusto-data";
// For earlier version, load modules using require statements:
// const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
// const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
// const {DataFormat} = require("azure-kusto-ingest").IngestionPropertiesEnums;
// const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
const appId = "<appId>";
const appKey = "<appKey>";
const appTenant = "<appTenant>";
const dbName = "<dbName>";
const tableName = "<tableName>";
const mappingName = "<mappingName>"; // Required for JSON formatted files
const ingestionProperties = new IngestionProperties({
database: dbName, // Your database
table: tableName, // Your table
format: DataFormat.JSON,
ingestionMappingReference: mappingName
});
// Initialize client with engine endpoint
const client = new StreamingIngestClient(
KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(
clusterPath,
appId,
appKey,
appTenant
),
ingestionProperties
);
// Automatically detects gz format
await client.ingestFromFile("MyFile.gz", ingestionProperties);
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
停用叢集上的串流擷取
在 Azure 數據總管叢集上停用串流擷取之前,請先從所有相關數據表和資料庫卸 載串流擷取原則 。 拿掉串流擷取原則會觸發 Azure 數據總管叢集內的數據重新排列。 串流擷取數據會從初始記憶體移至數據行存放區中的永久記憶體(範圍或分區)。 此程式可能需要幾秒鐘到幾個小時的時間,視初始記憶體中的數據量而定。
卸除串流擷取原則
您可以使用 Azure 入口網站 或以程序設計方式在 C# 中卸載串流擷取原則。
在 Azure 入口網站 中,移至您的 Azure 數據總管叢集,然後選取 [查詢]。
若要從數據表卸除串流擷取原則,請將下列命令 複製到 [查詢] 窗格中 ,然後選取 [ 執行]。
.delete table TestTable policy streamingingestion
在 [設定] 中,選取 [組態]。
在 [ 組態 ] 窗格中,選取 [ 關閉 ] 以停用 串流擷取。
選取儲存。
若要從數據表卸除串流擷取原則,請執行下列程序代碼:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
若要停用叢集上的串流擷取,請執行下列程序代碼:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
限制
相關內容