Event Hubs, IoT Hub 및 Event Grid 데이터 연결은 클러스터 수준에서 사용하도록 설정된 경우 스트리밍 수집을 사용할 수 있습니다. 스트리밍 수집을 사용하는 결정은 대상 테이블에 구성된 스트리밍 수집 정책에 따라 수행됩니다. 데이터 연결 관리에 대한 자세한 내용은 Event Hub, IoT Hub 및 Event Grid를 참조하세요.
사용자 지정 수집
사용자 지정 수집을 사용하려면 Azure Data Explorer 클라이언트 라이브러리 중 하나를 사용하는 애플리케이션을 작성해야 합니다. 이 항목의 정보를 사용하여 사용자 지정 수집을 구성합니다. C# 스트리밍 수집 샘플 애플리케이션이 유용할 수도 있습니다.
다음 표를 사용하여 환경에 적합한 수집 유형을 선택합니다.
조건
데이터 연결
사용자 지정 수집
수집 시작과 쿼리에 사용할 수 있는 데이터 사이의 데이터 지연
더 긴 지연
더 짧은 지연
개발 오버헤드
빠르고 쉬운 설정, 개발 오버헤드 없음
데이터를 수집하고, 오류를 처리하고, 데이터 일관성을 보장하는 애플리케이션을 만들기 위한 개발 오버헤드 높음
참고
Azure Portal을 사용하거나 C#에서 프로그래밍 방식으로 클러스터에서 스트리밍 수집을 사용하거나사용하지 않도록 설정하는 프로세스를 관리할 수 있습니다. C#을 사용자 지정 애플리케이션에 사용하는 경우 프로그래밍 방식을 사용하는 것이 더 편리할 수 있습니다.
VM 및 클러스터 크기: 스트리밍 수집 성능 및 용량은 VM 및 클러스터 크기가 증가함에 따라 크기 조정됩니다. 동시 수집 요청 수는 코어당 6개로 제한됩니다. 예를 들어, D14, L16과 같은 16 코어 SKU의 경우 최대 지원되는 부하는 동시 수집 요청 96개입니다. D11과 같은 2 코어 SKU의 경우 최대 지원되는 부하는 동시 수집 요청 12개입니다.
데이터 크기 제한: 스트리밍 수집 요청에 대한 데이터 크기 제한은 4MB입니다. 여기에는 수집 중 업데이트 정책을 위해 만들어진 모든 데이터가 포함됩니다.
스키마 업데이트: 테이블 만들기/수정 및 수집 매핑과 같은 스키마 업데이트에는 스트리밍 수집 서비스의 경우 최대 5분이 걸릴 수 있습니다. 자세한 내용은 스트리밍 수집 및 스키마 변경을 참조하세요.
SSD 용량: 클러스터에서 스트리밍 수집을 사용하도록 설정하면 스트리밍을 통해 데이터가 수집되지 않는 경우에도 클러스터 컴퓨터의 로컬 SSD 디스크 일부를 수집 데이터 스트리밍에 사용하고 핫 캐시에 사용할 수 있는 스토리지를 줄입니다.
새 Azure Data Explorer 클러스터를 만드는 동안 스트리밍 수집을 사용하도록 설정하려면 다음 코드를 실행합니다.
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
기존 클러스터에서 스트리밍 수집 사용
기존 클러스터가 있는 경우 Azure Portal을 사용하거나 C#에서 프로그래밍 방식으로 스트리밍 수집을 사용하도록 설정할 수 있습니다.
기존 Azure Data Explorer 클러스터를 업데이트하는 동안 스트리밍 수집을 사용하도록 설정할 수 있습니다.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
대상 테이블 만들기 및 정책 정의
스트리밍 수집 데이터를 받는 테이블을 만들고, Azure Portal을 사용하거나 C#에서 프로그래밍 방식으로 관련 정책을 정의합니다.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
클러스터에서 스트리밍 수집 사용 안 함
Warning
스트리밍 수집을 사용하지 않도록 설정하는 데 몇 시간이 걸릴 수 있습니다.
Azure Data Explorer 클러스터에서 스트리밍 수집을 사용하지 않도록 설정하기 전에 모든 관련 테이블과 데이터베이스에서 스트리밍 수집 정책을 삭제합니다. 스트리밍 수집 정책을 제거하면 Azure Data Explorer 클러스터 내에서 데이터 다시 정렬이 트리거됩니다. 스트리밍 수집 데이터는 초기 스토리지에서 열 저장소(익스텐트 또는 분할)의 영구 스토리지로 이동됩니다. 이 프로세스는 초기 스토리지의 데이터 양에 따라 몇 초에서 몇 시간까지 걸릴 수 있습니다.
스트리밍 수집 정책 삭제
Azure Portal을 사용하거나 C#에서 프로그래밍 방식으로 스트리밍 수집 정책을 삭제할 수 있습니다.
Azure Portal에서 Azure Data Explorer 클러스터로 이동한 다음, 쿼리를 선택합니다.
테이블에서 스트리밍 수집 정책을 삭제하려면 다음 명령을 쿼리 창에 복사하고 실행을 선택합니다.
.delete table TestTable policy streamingingestion
설정에서 구성을 선택합니다.
구성 창에서 끄기를 선택하여 스트리밍 수집을 사용하지 않도록 설정합니다.
저장을 선택합니다.
테이블에서 스트리밍 수집 정책을 삭제하려면 다음 코드를 실행합니다.
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
클러스터에서 스트리밍 수집을 사용하지 않도록 설정하려면 다음 코드를 실행합니다.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
제한 사항
스트리밍 수집에 사용하려면 데이터 매핑을 미리 만들어야 합니다. 개별 스트리밍 수집 요청은 인라인 데이터 매핑을 수용하지 않습니다.