Share via


Azure Functions integreren met Azure Data Explorer met behulp van invoer- en uitvoerbindingen (preview)

Belangrijk

Deze connector kan worden gebruikt in realtime analyses in Microsoft Fabric. Gebruik de instructies in dit artikel met de volgende uitzonderingen:

met Azure Functions kunt u serverloze code in de cloud uitvoeren volgens een schema of als reactie op een gebeurtenis. Met Azure Data Explorer invoer- en uitvoerbindingen voor Azure Functions kunt u Azure Data Explorer integreren in uw werkstromen om gegevens op te nemen en query's uit te voeren op uw cluster.

Vereisten

De integratie met ons voorbeeldproject uitproberen

Azure Data Explorer-bindingen gebruiken voor Azure Functions

Zie de volgende onderwerpen voor meer informatie over het gebruik van Azure Data Explorer-bindingen voor Azure Functions:

Scenario's voor het gebruik van Azure Data Explorer-bindingen voor Azure Functions

In de volgende secties worden enkele veelvoorkomende scenario's beschreven voor het gebruik van Azure Data Explorer-bindingen voor Azure Functions.

Invoerbindingen

Invoerbindingen voeren een KQL-query (Kusto-querytaal) of KQL-functie uit, optioneel met parameters, en retourneren de uitvoer naar de functie.

In de volgende secties wordt beschreven hoe u invoerbindingen gebruikt in enkele veelvoorkomende scenario's.

Scenario 1: Een HTTP-eindpunt om gegevens uit een cluster op te vragen

Het gebruik van invoerbindingen is van toepassing in situaties waarin u Azure Data Explorer gegevens beschikbaar moet maken via een REST API. In dit scenario gebruikt u een Azure Functions HTTP-trigger om gegevens in uw cluster op te vragen. Het scenario is met name handig in situaties waarin u programmatische toegang moet bieden tot Azure Data Explorer-gegevens voor externe toepassingen of services. Door uw gegevens beschikbaar te maken via een REST API, kunnen toepassingen de gegevens gemakkelijk gebruiken zonder dat ze rechtstreeks verbinding hoeven te maken met uw cluster.

De code definieert een functie met een HTTP-trigger en een Azure Data Explorer invoerbinding. De invoerbinding geeft de query op die moet worden uitgevoerd op de tabel Products in de database productsdb . De functie gebruikt de kolom productId als het predicaat dat als parameter is doorgegeven.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

De functie kan vervolgens als volgt worden aangeroepen:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Scenario 2: Een geplande trigger voor het exporteren van gegevens uit een cluster

Het volgende scenario is van toepassing in situaties waarin gegevens moeten worden geëxporteerd volgens een op tijd gebaseerd schema.

De code definieert een functie met een timertrigger waarmee een aggregatie van verkoopgegevens uit de database productsdb wordt geëxporteerd naar een CSV-bestand in Azure Blob Storage.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Uitvoerbindingen

Uitvoerbindingen neemt een of meer rijen en voegt deze in een Azure Data Explorer-tabel in.

In de volgende secties wordt beschreven hoe u uitvoerbindingen gebruikt in enkele veelvoorkomende scenario's.

Scenario 1: HTTP-eindpunt om gegevens op te nemen in een cluster

Het volgende scenario is van toepassing in situaties waarin binnenkomende HTTP-aanvragen moeten worden verwerkt en opgenomen in uw cluster. Met behulp van een uitvoerbinding kunnen binnenkomende gegevens van de aanvraag worden geschreven naar Azure Data Explorer-tabellen.

De code definieert een functie met een HTTP-trigger en een Azure Data Explorer uitvoerbinding. Deze functie neemt een JSON-nettolading in de HTTP-aanvraagbody en schrijft deze naar de tabel products in de database productsdb .

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

De functie kan vervolgens als volgt worden aangeroepen:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Scenario 2: Gegevens opnemen van RabbitMQ of andere berichtensystemen die worden ondersteund in Azure

Het volgende scenario is van toepassing in situaties waarin gegevens van een berichtensysteem moeten worden opgenomen in uw cluster. Met behulp van een uitvoerbinding kunnen binnenkomende gegevens van het berichtensysteem worden opgenomen in Azure Data Explorer-tabellen.

De code definieert een functie met berichten, gegevens in JSON-indeling, die binnenkomen via een RabbitMQ-trigger die worden opgenomen in de tabel products in de database productsdb .

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Zie Azure Functions documentatie voor meer informatie over functies. De Azure Data Explorer-extensie is beschikbaar op: