Поделиться через


Интеграция Функции Azure с Azure Data Explorer с помощью входных и выходных привязок (предварительная версия)

Внимание

Этот соединитель можно использовать в аналитике в режиме реального времени в Microsoft Fabric. Используйте инструкции в этой статье со следующими исключениями:

Функции Azure позволяют запускать бессерверный код в облаке по расписанию или в ответ на событие. С помощью входных и выходных привязок Azure Data Explorer для Функции Azure можно интегрировать Azure Data Explorer в рабочие процессы для приема данных и выполнения запросов к кластеру.

Необходимые компоненты

Попробуйте выполнить интеграцию с примером проекта

Использование привязок Azure Data Explorer для Функции Azure

Сведения об использовании привязок Azure Data Explorer для Функции Azure см. в следующих разделах:

Сценарии использования привязок Azure Data Explorer для Функции Azure

В следующих разделах описаны некоторые распространенные сценарии использования привязок Azure Data Explorer для Функции Azure.

Входные привязки

Входные привязки выполняют запрос язык запросов Kusto (KQL) или функцию KQL, при необходимости с параметрами и возвращают выходные данные функции.

В следующих разделах описывается использование входных привязок в некоторых распространенных сценариях.

Сценарий 1. Конечная точка HTTP для запроса данных из кластера

Использование входных привязок применимо в ситуациях, когда необходимо предоставить данные Azure Data Explorer через REST API. В этом сценарии для запроса данных в кластере используется триггер HTTP Функции Azure. Этот сценарий особенно полезен в ситуациях, когда необходимо предоставить программный доступ к данным Azure Data Explorer для внешних приложений или служб. Предоставляя данные через REST API, приложения могут легко использовать данные без необходимости подключаться непосредственно к кластеру.

Код определяет функцию с триггером HTTP и входной привязкой Azure Data Explorer. Входная привязка указывает запрос для выполнения в таблице Products в базе данных productsdb . Функция использует столбец productId в качестве предиката, передаваемого в качестве параметра.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

Затем функцию можно вызвать следующим образом:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Сценарий 2. Запланированный триггер для экспорта данных из кластера

Следующий сценарий применим в ситуациях, когда данные необходимо экспортировать в расписание на основе времени.

Код определяет функцию с триггером таймера, который экспортирует агрегирование данных о продажах из базы данных productsdb в CSV-файл в Хранилище BLOB-объектов Azure.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Выходные привязки

Выходные привязки принимают одну или несколько строк и вставляют их в таблицу Azure Data Explorer.

В следующих разделах описывается использование выходных привязок в некоторых распространенных сценариях.

Сценарий 1. Конечная точка HTTP для приема данных в кластер

Следующий сценарий применим в ситуациях, когда входящие HTTP-запросы должны обрабатываться и приема в кластер. Используя выходную привязку, входящие данные из запроса можно записать в таблицы Azure Data Explorer.

Код определяет функцию с триггером HTTP и выходной привязкой Azure Data Explorer. Эта функция принимает полезные данные JSON в тексте HTTP-запроса и записывает его в таблицу продуктов в базе данных productsdb .

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

Затем функцию можно вызвать следующим образом:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Сценарий 2. Прием данных из RabbitMQ или других систем обмена сообщениями, поддерживаемых в Azure

Следующий сценарий применим в ситуациях, когда данные из системы обмена сообщениями необходимо принять в кластер. Используя выходную привязку, входящие данные из системы обмена сообщениями можно получать в таблицы Azure Data Explorer.

Код определяет функцию с сообщениями, данными в формате JSON, входящей с помощью триггера RabbitMQ, которые передаются в таблицу продуктов в базе данных productsdb .

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Дополнительные сведения о функциях см. в Функции Azure документации. Расширение Azure Data Explorer доступно в следующих разделах: