Integrowanie Azure Functions z usługą Azure Data Explorer przy użyciu powiązań wejściowych i wyjściowych (wersja zapoznawcza)

Ważne

Ten łącznik może być używany w analizie w czasie rzeczywistym w usłudze Microsoft Fabric. Skorzystaj z instrukcji opisanych w tym artykule z następującymi wyjątkami:

Azure Functions umożliwia uruchamianie kodu bezserwerowego w chmurze zgodnie z harmonogramem lub w odpowiedzi na zdarzenie. Dzięki powiązaniom wejściowym i wyjściowym platformy Azure Data Explorer dla Azure Functions można zintegrować usługę Azure Data Explorer z przepływami pracy w celu pozyskiwania danych i uruchamiania zapytań względem klastra.

Wymagania wstępne

Wypróbuj integrację z naszym przykładowym projektem

Jak używać powiązań usługi Azure Data Explorer do Azure Functions

Aby uzyskać informacje na temat używania powiązań usługi Azure Data Explorer dla Azure Functions, zobacz następujące tematy:

Scenariusze korzystania z powiązań usługi Azure Data Explorer dla Azure Functions

W poniższych sekcjach opisano kilka typowych scenariuszy używania powiązań usługi Azure Data Explorer dla Azure Functions.

Powiązania wejściowe

Powiązania wejściowe uruchamiają zapytanie język zapytań Kusto (KQL) lub funkcję KQL, opcjonalnie z parametrami i zwracają dane wyjściowe do funkcji.

W poniższych sekcjach opisano sposób używania powiązań wejściowych w niektórych typowych scenariuszach.

Scenariusz 1. Punkt końcowy HTTP do wykonywania zapytań dotyczących danych z klastra

Użycie powiązań wejściowych ma zastosowanie w sytuacjach, w których musisz uwidocznić dane usługi Azure Data Explorer za pośrednictwem interfejsu API REST. W tym scenariuszu użyjesz wyzwalacza HTTP Azure Functions do wykonywania zapytań dotyczących danych w klastrze. Scenariusz jest szczególnie przydatny w sytuacjach, w których należy zapewnić programowy dostęp do danych usługi Azure Data Explorer dla zewnętrznych aplikacji lub usług. Uwidaczniając dane za pośrednictwem interfejsu API REST, aplikacje mogą łatwo korzystać z danych bez konieczności nawiązywania bezpośredniego połączenia z klastrem.

Kod definiuje funkcję z wyzwalaczem HTTP i powiązaniem wejściowym usługi Azure Data Explorer. Powiązanie wejściowe określa zapytanie uruchamiane względem tabeli Products w bazie danych productsdb . Funkcja używa kolumny productId jako predykatu przekazywanego jako parametru.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

Następnie można wywołać funkcję w następujący sposób:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Scenariusz 2. Zaplanowany wyzwalacz do eksportowania danych z klastra

Poniższy scenariusz ma zastosowanie w sytuacjach, w których dane muszą być eksportowane zgodnie z harmonogramem opartym na czasie.

Kod definiuje funkcję z wyzwalaczem czasomierza, który eksportuje agregację danych sprzedaży z bazy danych productsdb do pliku CSV w Azure Blob Storage.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Powiązania wyjściowe

Powiązania wyjściowe pobiera co najmniej jeden wiersz i wstawia je do tabeli usługi Azure Data Explorer.

W poniższych sekcjach opisano sposób używania powiązań wyjściowych w niektórych typowych scenariuszach.

Scenariusz 1. Punkt końcowy HTTP do pozyskiwania danych do klastra

Poniższy scenariusz ma zastosowanie w sytuacjach, w których przychodzące żądania HTTP muszą być przetwarzane i pozyskiwane do klastra. Za pomocą powiązania wyjściowego dane przychodzące z żądania można zapisywać w tabelach usługi Azure Data Explorer.

Kod definiuje funkcję z wyzwalaczem HTTP i powiązaniem wyjściowym usługi Azure Data Explorer. Ta funkcja pobiera ładunek JSON w treści żądania HTTP i zapisuje go w tabeli products w bazie danych productsdb .

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

Następnie można wywołać funkcję w następujący sposób:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Scenariusz 2. Pozyskiwanie danych z oprogramowania RabbitMQ lub innych systemów obsługi komunikatów obsługiwanych na platformie Azure

Poniższy scenariusz ma zastosowanie w sytuacjach, w których dane z systemu obsługi komunikatów muszą zostać pozyskane do klastra. Przy użyciu powiązania wyjściowego dane przychodzące z systemu obsługi komunikatów można pozyskiwać do tabel usługi Azure Data Explorer.

Kod definiuje funkcję z komunikatami, danymi w formacie JSON, przychodzącymi za pośrednictwem wyzwalacza RabbitMQ pozyskanego do tabeli products w bazie danych productsdb .

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Aby uzyskać więcej informacji na temat funkcji, zobacz dokumentację Azure Functions. Rozszerzenie azure Data Explorer jest dostępne w następujących miejscach: