Поделиться через


Обработка потоков данных с помощью Azure Databricks

Azure Cosmos DB
Azure Databricks
Концентраторы событий Azure
Azure Log Analytics
Azure Monitor

На схеме эталонной архитектуры представлен сквозной конвейер обработки потоков данных. Четыре этапа этого конвейера включают прием, обработку, хранение и анализ и отчет. В этой эталонной архитектуре конвейер принимает данные из двух источников, объединяет связанные записи из каждого потока, дополняет результат и вычисляет среднее значение в реальном времени. Затем результаты сохраняются для дальнейшего анализа.

Архитектура

диаграмме, которая показывает эталонную архитектуру для потоковой обработки с помощью Azure Databricks.

Скачайте файл Visio данной архитектуры.

Поток данных

Следующий поток данных соответствует предыдущей схеме:

  1. Ingest

    Два потока операционных данных в режиме реального времени передают систему: данные о тарифах и данные о поездках . Устройства, установленные в такси, служат источниками данных и публикуют события в Azure Event Hubs. Каждый поток направляется в свой экземпляр концентратора событий, который предоставляет независимые пути обработки.

  2. Процесс

    Azure Databricks использует потоки центров событий и выполняет следующие операции:

    • Сопоставляет записи тарифа с записями поездки
    • Обогащение данных с помощью третьего набора данных, содержащего данные поиска по соседству, хранящиеся в файловой системе Azure Databricks

    Этот процесс создает унифицированный, обогащенный набор данных, подходящий для нисходящей аналитики и хранилища.

  3. Store

    Выходные данные заданий Azure Databricks — это серия записей. Обработанные записи записываются в Azure Cosmos DB для NoSQL.

  4. Анализ и отчет

    Структура зеркально отражает операционные данные из Azure Cosmos DB для NoSQL , чтобы включить аналитические запросы, не влияя на производительность транзакций. Этот подход предоставляет путь без ETL для аналитики. В этой архитектуре для следующих целей можно использовать зеркальное отображение:

    • Отражение данных Azure Cosmos DB (или данных в формате Delta) в Fabric
    • Синхронизация наборов данных с операционной системой
    • Включите анализ с помощью следующих средств:
      • Конечные точки аналитики SQL Fabric для озерных домов и складов
      • Записные книжки Apache Spark
      • Аналитика в режиме реального времени с помощью языка запросов Kusto (KQL) для изучения временных рядов и журналов
  5. Monitor

    Azure Monitor собирает данные телеметрии из конвейера обработки Azure Databricks. Рабочая область Log Analytics хранит журналы приложений и метрики. Вы можете выполнить следующие действия:

    • Запрос операционных журналов
    • Визуализация метрик
    • Проверка сбоев, аномалий и проблем с производительностью
    • Создание панелей мониторинга

Components

  • Azure Databricks — это платформа аналитики на основе Spark, оптимизированная для платформы Azure. В этой архитектуре задания Azure Databricks обогащают данные о поездках и тарифах такси и хранят результаты в Azure Cosmos DB.

  • Центры событий — это управляемая распределенная служба приема, которая может масштабироваться до приема больших объемов событий. Эта архитектура использует два экземпляра концентратора событий для получения данных от такси.

  • Azure Cosmos DB для NoSQL — это управляемая служба базы данных с несколькими моделями. В этой архитектуре он хранит выходные данные заданий обогащения Azure Databricks. Структура зеркально отражает операционные данные Azure Cosmos DB для включения аналитических запросов.

  • Log Analytics — это средство в Azure Monitor, которое помогает запрашивать и анализировать данные журнала из различных источников. В этой архитектуре все ресурсы настраивают диагностику Azure для хранения журналов платформ в этой рабочей области. Рабочая область также служит приемником данных для метрик задач Spark, передаваемых из обрабатывающих конвейеров Azure Databricks.

Подробности сценария

Компания такси собирает данные о каждой поездке на такси. В этом сценарии предполагается, что два отдельных устройства отправляют данные. Такси имеет метр, который отправляет информацию о каждой поездке, включая длительность, расстояние и посадку и раскрывающиеся места. Отдельное устройство принимает платежи от клиентов и отправляет данные о тарифах. Чтобы определить тенденции всадника, компания такси хочет вычислить средний чаевые на милю, управляемый для каждого района, в режиме реального времени.

Прием данных

Для имитации источника данных эта эталонная архитектура использует набор данных о такси в Нью-Йорке. Этот набор данных содержит данные о поездках по такси в Нью-Йорке с 2010 по 2013 год. Он содержит записи данных о поездках и тарифах. Данные о поездках включают длительность поездки, расстояние поездки, а также места сбора и удаления. Данные о тарифах включают сведения о тарифе, налоге и сумме чаевых. Поля в обоих типах записей включают номер медальона, лицензию взлома и идентификатор поставщика. Сочетание этих трех полей однозначно определяет такси и водителя. Данные хранятся в формате CSV.

Генератор данных — это приложение .NET Core, которое считывает записи и отправляет их в Центры событий. Генератор отправляет данные о поездке в формате JSON, а данные о тарифах — в формате CSV.

Для сегментации данных Центры событий используют секции. Разделы позволяют потребителю читать данные параллельно. При отправке данных в Центры событий можно указать ключ секции напрямую. В противном случае записи назначаются секциям методом циклического перебора.

В этом сценарии данные о поездках и тарифы должны быть назначены одному и тому же идентификатору секции для конкретного такси такси. Это назначение позволяет Databricks применять степень параллелизма при сопоставлении двух потоков. Например, запись в разделе n данных о поездке соответствует записи в разделе n данных тарифа.

Схема потоковой обработки с помощью Azure Databricks и Центров событий.

Скачайте файл Visio этой архитектуры.

В генераторе данных общая модель данных для обоих типов записей имеет свойство PartitionKey, в котором объединены Medallion, HackLicense и VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Это свойство предоставляет явный ключ секции при отправке данных в Центры событий.

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Центры событий

Пропускная способность Центров событий вычисляется в единицах пропускной способности. Вы можете автоматически масштабировать концентратор событий, включив автонадувание. Эта функция автоматически масштабирует единицы пропускной способности на основе трафика до заданного максимума.

Потоковая обработка

В Azure Databricks задание выполняет обработку данных. Задание назначается кластеру, а затем выполняется на нем. Задание может быть пользовательским кодом, написанным на Java или записной книжкой Spark.

В этой эталонной архитектуре задание — это архив Java, который содержит классы, написанные на Java и Scala. При указании архива Java для задания Azure Databricks кластер Azure Databricks указывает класс для операции. main Здесь метод com.microsoft.pnp.TaxiCabReader класса содержит логику обработки данных.

Чтение потока из двух экземпляров концентратора событий

Для считывания данных из двух экземпляров концентратора событий Azure в логике обработки данных используется Spark Structured Streaming.

// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()

val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiRideConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
  .format("eventhubs")
  .options(rideEventHubOptions.toMap)
  .load

val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
  .setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
  .setConsumerGroup(conf.taxiFareConsumerGroup())
  .setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
  .format("eventhubs")
  .options(fareEventHubOptions.toMap)
  .load

Обогащение данных с помощью сведений о районе

Данные о поездке включают координаты широты и долготы мест сбора и удаления. Эти координаты полезны, но не легко используются для анализа. Таким образом, поток обогатит эти данные данными о соседстве, считываемыми из shapefile.

Формат файла фигуры является двоичным и не легко анализируется. Но библиотека GeoTools предоставляет средства для геопространственных данных, использующих формат файла фигуры. Эта библиотека используется в классе com.microsoft.pnp.GeoFinder для определения имени района на основе координат для расположений сбора и удаления.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Присоединение к данным о поездках и тарифах

Сначала выполняется преобразование данных о поездках и тарифах:

val rides = transformedRides
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedRides.add(1)
      false
    }
  })
  .select(
    $"ride.*",
    to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
      .as("pickupNeighborhood"),
    to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
      .as("dropoffNeighborhood")
  )
  .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

val fares = transformedFares
  .filter(r => {
    if (r.isNullAt(r.fieldIndex("errorMessage"))) {
      true
    }
    else {
      malformedFares.add(1)
      false
    }
  })
  .select(
    $"fare.*",
    $"pickupTime"
  )
  .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Затем данные о поездках присоединяются к данным тарифа:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Обработка данных и его вставка в Azure Cosmos DB

Средняя сумма тарифа для каждого района рассчитывается для определенного интервала времени:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Затем средняя сумма тарифа вставляется в Azure Cosmos DB:

maxAvgFarePerNeighborhood
  .writeStream
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "<your-cosmos-endpoint>")
  .option("spark.cosmos.accountKey", "<your-cosmos-key>")
  .option("spark.cosmos.database", "<your-database-name>")
  .option("spark.cosmos.container", "<your-container-name>")
  .option("checkpointLocation", "/mnt/checkpoints/maxAvgFarePerNeighborhood")
  .outputMode("append")
  .start()
  .awaitTermination()

Рекомендации

Эти рекомендации реализуют основные принципы платформы Azure Well-Architected Framework, которая представляет собой набор руководящих принципов, которые можно использовать для улучшения качества рабочей нагрузки. Дополнительные сведения см. вWell-Architected Framework.

Безопасность

Безопасность обеспечивает гарантии от преднамеренного нападения и неправильного использования ценных данных и систем. Дополнительные сведения см. в контрольном списке конструктора длябезопасности.

Доступ к рабочей области Azure Databricks управляется с помощью консоли администрирования. Консоль администрирования включает функции для добавления пользователей, управления разрешениями пользователей и настройки единого входа. Также в ней можно настроить управление доступом для рабочих областей, кластеров, заданий и таблиц.

Управление секретами

Azure Databricks включает в себя секретное хранилище, которое используется для хранения учетных данных и ссылки на них в записных книжках и заданиях. Области секционирования секретов в хранилище секретов Azure Databricks:

databricks secrets create-scope --scope "azure-databricks-job"

Секреты добавляются на уровне области:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Примечание.

Используйте область, поддерживаемую Azure Key Vault, вместо собственной области Azure Databricks.

Код обращается к секретам с помощью утилит для управления секретами Azure Databricks.

Оптимизация затрат

Оптимизация затрат фокусируется на способах сокращения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в контрольном списке конструктора дляоптимизации затрат.

Для оценки затрат используйте калькулятор цен Azure. Рассмотрим следующие службы, используемые в этой эталонной архитектуре.

Рекомендации по затратам центров событий

Эта эталонная архитектура развертывает центры событий на уровне "Стандартный". Модель ценообразования основана на единицах пропускной способности, событиях входящего трафика и записи событий. Событие входящего трафика — это единица данных, которая составляет 64 КБ или меньше. Оплата зависит от размера сообщений (одно событие до 64 КБ). Единицы пропускной способности указываются через API управления портал Azure или Центрами событий.

Если требуется больше дней хранения, рассмотрите уровень "Выделенный". Этот уровень предоставляет развертывания с одним клиентом, которые имеют строгие требования. Это предложение создает кластер на основе единиц емкости и не зависит от единиц пропускной способности. Плата за уровень "Стандартный" также взимается на основе событий входящего трафика и единиц пропускной способности.

Дополнительные сведения см. в ценовыхЦентров событий.

Рекомендации по затратам Azure Databricks

Azure Databricks предоставляет уровень "Стандартный" и уровень "Премиум", оба из которых поддерживают три рабочих нагрузки. Эта эталонная архитектура развертывает рабочую область Azure Databricks на уровне "Премиум".

Рабочие нагрузки проектирования данных должны выполняться в кластере заданий. Инженеры данных используют кластеры для создания и выполнения заданий. Рабочие нагрузки аналитики данных должны выполняться в кластере всех целей и предназначены для специалистов по обработке и анализу данных для интерактивного изучения, визуализации, управления данными и обмена ими.

Azure Databricks предоставляет несколько моделей ценообразования.

  • плана оплаты по мере использования

    Плата за виртуальные машины, подготовленные в кластерах и единицах субД Azure Databricks, взимается на основе выбранного экземпляра виртуальной машины. DBU — это единица измерения вычислительных возможностей, по которой Azure выставляет счета на основе использования в секунду. Потребление DBU зависит от размера и типа экземпляра, работающего в Azure Databricks. Цены зависят от выбранной рабочей нагрузки и уровня.

  • план предварительной покупки

    Вы фиксируете базы данных в качестве единиц фиксации Azure Databricks в течение одного или трех лет, чтобы снизить общую стоимость владения в течение этого периода времени по сравнению с моделью оплаты по мере использования.

Дополнительные сведения см. в цен на Azure Databricks.

Рекомендации по затратам Azure Cosmos DB

В этой архитектуре задание Azure Databricks записывает ряд записей в Azure Cosmos DB. Плата взимается за резервную емкость, которая измеряется в единицах запросов в секунду (ЕЗ/с). Эта емкость используется для выполнения операций вставки. Единица выставления счетов составляет 100 ЕЗ/с в час. Например, стоимость записи элементов размером 100 КБ составляет 50 ЕЗ/с.

Для операций записи настройте достаточно емкости для поддержки количества операций записи, необходимых в секунду. Вы можете увеличить подготовленную пропускную способность с помощью портала или Azure CLI перед выполнением операций записи, а затем сократить пропускную способность после завершения этих операций. Пропускная способность для периода записи — это сумма минимальной пропускной способности, необходимой для конкретных данных, и пропускной способности, необходимой для операции вставки. В этом вычислении предполагается, что не выполняется другая рабочая нагрузка.

Пример анализа затрат

Предположим, что вы настраиваете значение пропускной способности 1000 RU/с в контейнере и используете его непрерывно в течение 30 дней, что равно 720 часам.

Контейнер выставляется по 10 единицам ез/с в час за каждый час. 10 единиц в 0,008 долл. США (за 100 ЕЗ/с в час) взимается по 0,08 долл. США в час.

В течение 720 часов или 7200 единиц (из 100 единиц), вы выставляете счета за 57,60 долл. США в течение месяца.

Плата за хранение также взимается за каждый ГБ, используемый для хранимых данных и индекса. Дополнительные сведения см. в модели ценообразования Azure Cosmos DB.

Используйте калькулятор емкости Azure Cosmos DB для быстрой оценки затрат на рабочую нагрузку.

Операционное превосходство

Операционное превосходство охватывает процессы, которые развертывают приложение и продолжают работать в рабочей среде. Дополнительные сведения см. в контрольном списке проверки конструктора дляоперационного превосходства.

Наблюдение

Azure Databricks основан на Apache Spark. Azure Databricks и Apache Spark используют Apache Log4j в качестве стандартной библиотеки для ведения журнала. Помимо ведения журнала по умолчанию, которое предоставляет Apache Spark, можно реализовать ведение журнала в Log Analytics. Дополнительные сведения см. в статье Мониторинг в Azure Databricks.

Поскольку com.microsoft.pnp.TaxiCabReader класс обрабатывает сообщения о поездках и тарифах, сообщение может быть неправильно сформировано и поэтому недопустимо. В рабочей среде важно проанализировать эти неправильные сообщения, чтобы определить проблему с источниками данных, чтобы ее можно было быстро устранить, чтобы предотвратить потерю данных. Класс com.microsoft.pnp.TaxiCabReader регистрирует накопитель Apache Spark, отслеживающий количество неправильно сформированных записей тарифа и записей о поездках:

@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark использует библиотеку Dropwizard для отправки метрик. Некоторые из собственных полей метрик Dropwizard несовместимы с Log Analytics, поэтому эта эталонная архитектура включает в себя настраиваемый приемник Dropwizard и репортер. Он форматирует метрики в формате, который ожидает Log Analytics. Apache Spark передает метрики вместе с пользовательскими метриками для данных о поездках и тарифах неправильного формата.

Чтобы отслеживать работу задания потоковой передачи, можно использовать следующие примеры запросов в рабочей области Log Analytics. Аргумент ago(1d) в каждом запросе возвращает все записи, созданные за последний день. Этот параметр можно настроить для просмотра другого периода времени.

Исключения, зарегистрированные во время операции потокового запроса

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Сбор данных неправильного формата о поездках и тарифах

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Операция задания с течением времени

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Организация ресурсов и развертывания

  • Создайте отдельные группы ресурсов для рабочей среды, сред разработки и тестирования. Так будет проще управлять развертываниями, удалять тестовые развертывания и назначать права доступа.

  • Используйте шаблон Azure Resource Manager для развертывания ресурсов Azure в соответствии с процессом инфраструктуры как кода. С помощью шаблонов можно автоматизировать развертывания с помощью служб Azure DevOps или других решений непрерывной интеграции и непрерывной доставки (CI/CD).

  • Поместите каждую рабочую нагрузку в отдельный шаблон развертывания и сохраните ресурсы в системах управления версиями. Можно развернуть шаблоны вместе или по отдельности в процессе непрерывной интеграции и непрерывного развертывания. Этот подход упрощает процесс автоматизации.

    В этой архитектуре Центры событий, Log Analytics и Azure Cosmos DB определяются как одна рабочая нагрузка. Эти ресурсы включены в один шаблон Azure Resource Manager.

  • Рассмотрите возможность промежуточного хранения рабочих нагрузок. Развернитесь на различных этапах и выполните проверки на каждом этапе перед переходом к следующему этапу. Таким образом вы можете управлять отправкой обновлений в рабочие среды и свести к минимуму непредвиденные проблемы с развертыванием.

    В этой архитектуре существует несколько этапов развертывания. Рассмотрите возможность создания конвейера Azure DevOps и добавления этих этапов. Вы можете автоматизировать следующие этапы:

    • Запустите кластер Azure Databricks.
    • Настройте интерфейс командной строки Azure Databricks.
    • Установите средства Scala.
    • Добавьте секреты Azure Databricks.

    Рассмотрите возможность написания автоматических тестов интеграции для повышения качества и надежности кода Azure Databricks и его жизненного цикла.

Следующий шаг