Pemrosesan aliran dengan Azure Databricks

Azure Cosmos DB
Azure Databricks
Azure Event Hubs
Azure Log Analytics
Azure Monitor

Arsitektur referensi ini menunjukkan alur pemrosesan aliran end-to-end. Jenis alur ini memiliki empat tahap: penyerapan, pemrosesan, penyimpanan, serta analisis dan pelaporan. Untuk arsitektur referensi ini, alur menyerap data dari dua sumber, melakukan penggabungan pada catatan terkait dari setiap aliran, memperkaya hasil, dan menghitung rata-rata secara real time. Hasilnya disimpan untuk analisis selengkapnya.

GitHub logo Implementasi referensi untuk arsitektur ini tersedia di GitHub.

Arsitektur

Diagram showing a reference architecture for stream processing with Azure Databricks.

Unduh file Visio arsitektur ini.

Alur kerja

Arsitektur terdiri dari komponen-komponen berikut:

Sumber data. Dalam arsitektur ini, ada dua sumber data yang menghasilkan aliran secara real time. Aliran pertama berisi informasi perjalanan, dan aliran kedua berisi informasi tarif. Arsitektur referensi mencakup generator data yang disimulasikan, yang membaca dari sekumpulan file statis dan mendorong data ke Azure Event Hubs. Sumber data dalam aplikasi yang sebenarnya adalah perangkat yang dipasang di taksi.

Azure Event Hubs. Azure Event Hubs merupakan layanan penyerapan kejadian. Arsitektur ini menggunakan dua instans event hub, satu untuk setiap sumber data. Setiap sumber data mengirimkan aliran data ke hub peristiwa terkait.

Azure Databricks. Databricks adalah platform analitik berbasis Apache Spark yang dioptimalkan untuk platform layanan cloud Microsoft Azure. Databricks digunakan untuk menghubungkan data perjalanan dan tarif taksi, serta untuk memperkaya data yang berhubungan dengan data lingkungan yang disimpan dalam sistem file Databricks.

Azure Cosmos DB. Output dari pekerjaan Azure Databricks adalah serangkaian rekaman, yang ditulis ke Azure Cosmos DB untuk Apache Cassandra. Azure Cosmos DB for Apache Cassandra digunakan karena mendukung pemodelan data rangkaian waktu.

  • Azure Synapse Link untuk Azure Cosmos DB memungkinkan Anda menjalankan analisis mendekati real-time terhadap data operasional di Azure Cosmos DB, tanpa menimbulkan dampak performa atau biaya pada beban kerja transaksional, dengan menggunakan dua mesin analitik yang tersedia dari ruang kerja Azure Synapse Anda: SQL Serverless dan Spark Pools.

Azure Log Analytics. Data log aplikasi yang dikumpulkan oleh Azure Monitor disimpan di ruang kerja Log Analytics. Kueri Log Analytics dapat digunakan untuk menganalisis dan memvisualisasikan metrik dan memeriksa pesan log untuk mengidentifikasi masalah dalam aplikasi.

Alternatif

  • Synapse Link adalah solusi pilihan Microsoft untuk analitik di atas data Azure Cosmos DB.

Detail skenario

Skenario: Sebuah perusahaan taksi mengumpulkan data tentang setiap perjalanan taksi. Untuk skenario ini, kita menganggap ada dua perangkat yang terpisah yang mengirim data. Taksi memiliki meteran yang mengirimkan informasi tentang setiap perjalanan — yakni durasi, jarak, serta lokasi penjemputan dan pengantaran. Perangkat yang terpisah menerima pembayaran dari pelanggan dan mengirimkan data tentang tarif. Untuk mengetahui tren penumpang, perusahaan taksi ini ingin menghitung tip rata-rata per mil yang ditempuh, secara real time, untuk setiap lingkungan.

Kemungkinan kasus penggunaan

Solusi ini dioptimalkan untuk industri ritel.

Penyerapan data

Untuk menyimulasikan sumber data, arsitektur referensi ini menggunakan himpunan data Data Taksi Kota New York[1]. Himpunan data ini berisi data tentang perjalanan taksi di New York City selama periode empat tahun (2010 – 2013). Ini berisi dua jenis catatan: Data perjalanan dan data tarif. Data perjalanan meliputi durasi perjalanan, jarak perjalanan, serta lokasi penjemputan dan pengantaran. Data tarif mencakup tarif, pajak, dan jumlah tip. Bidang umum di kedua jenis catatan ini termasuk nomor medali, lisensi hack, dan ID vendor. Bersama-sama, ketiga bidang ini secara unik mengidentifikasi taksi serta pengemudi. Data disimpan dalam format CSV.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universitas Illinois di Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Generator data adalah aplikasi .NET Core yang membaca catatan dan mengirimkannya ke Azure Event Hubs. Generator tersebut mengirimkan data perjalanan dalam format JSON dan data tarif dalam format CSV.

Azure Event Hubs menggunakan partisi untuk melakukan segmentasi data. Partisi memungkinkan konsumen membaca setiap partisi secara paralel. Saat mengirim data ke Azure Event Hubs, Anda dapat menentukan kunci partisi secara eksplisit. Jika tidak, catatan ditetapkan ke partisi secara round-robin.

Dalam skenario ini, data perjalanan dan data tarif harus berakhir dengan ID partisi yang sama untuk taksi tertentu. Hal ini memungkinkan Databricks untuk menerapkan tingkat paralelisme ketika menghubungkan dua aliran. Catatan di partisi n data perjalanan akan cocok dengan catatan di partisi n data tarif.

Diagram of stream processing with Azure Databricks and Event Hubs.

Unduh file Visio arsitektur ini.

Di generator data, model data umum untuk kedua jenis catatan memiliki properti PartitionKey yang merupakan perangkaian dari Medallion, HackLicense, dan VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Properti ini digunakan untuk menyediakan kunci partisi eksplisit saat mengirim ke Azure Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Pusat Aktivitas

Kapasitas throughput Azure Event Hubs diukur dalam unit throughput. Anda dapat menskalakan hub peristiwa secara otomatis dengan mengaktifkan perluas otomatis, yang secara otomatis memperluas skala unit throughput berdasarkan lalu lintas, hingga jumlah maksimum yang dikonfigurasi.

Pemrosesan aliran

Di Azure Databricks, pemrosesan data dilakukan oleh pekerjaan. Pekerjaan ditugaskan pada dan berjalan di sebuah kluster. Pekerjaan dapat berupa kode kustom yang ditulis dalam Java, atau notebook Spark.

Dalam arsitektur referensi ini, pekerjaannya adalah arsip Java dengan kelas yang ditulis dalam Java dan Scala. Saat menentukan arsip Java untuk pekerjaan Databricks, kelas ditentukan untuk dieksekusi oleh kluster Databricks. Di sini, metode utama kelas com.microsoft.pnp.TaxiCabReader berisi logika pemrosesan data.

Membaca aliran dari dua instans event hub

Logika pemrosesan data menggunakan aliran terstruktur Spark untuk membaca dari dua instans Azure event hub:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Memperkaya data dengan informasi lingkungan

Data perjalanan mencakup koordinat lintang dan bujur dari lokasi penjemputan dan pengantaran. Meskipun koordinat ini berguna, tetapi koordinat ini tidak mudah digunakan untuk analisis. Oleh karena itu, data ini diperkaya dengan data lingkungan yang dibaca dari shapefile.

Format shapefile adalah biner dan tidak mudah diuraikan, tetapi pustaka GeoTools menyediakan alat untuk data geospasial yang menggunakan format shapefile. Pustaka ini digunakan di kelas com.microsoft.pnp.GeoFinder untuk menentukan nama lingkungan berdasarkan koordinat penjemputan dan pengantaran.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Menggabungkan data perjalanan dan tarif

Pertama, data perjalanan dan tarif diubah:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Kemudian data perjalanan digabungkan dengan data tarif:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Memproses data dan menyisipkan ke Azure Cosmos DB

Jumlah tarif rata-rata untuk setiap lingkungan dihitung untuk interval waktu tertentu:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Yang kemudian dimasukkan ke dalam Azure Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Pertimbangan

Pertimbangan ini mengimplementasikan pilar Azure Well-Architected Framework, yang merupakan serangkaian tenet panduan yang dapat digunakan untuk meningkatkan kualitas beban kerja. Untuk informasi selengkapnya, lihat Microsoft Azure Well-Architected Framework.

Keamanan

Keamanan memberikan jaminan terhadap serangan yang disukai dan penyalahgunaan data dan sistem berharga Anda. Untuk informasi selengkapnya, lihat Gambaran Umum pilar keamanan.

Akses ke ruang kerja Azure Databricks dikontrol menggunakan konsol administrator. Konsol administrator mencakup fungsionalitas untuk menambahkan pengguna, mengelola izin pengguna, dan menyiapkan akses menyeluruh. Kontrol akses untuk ruang kerja, kluster, pekerjaan, dan tabel juga dapat diatur melalui konsol administrator.

Mengelola rahasia

Azure Databricks menyertakan penyimpanan rahasia yang digunakan untuk menyimpan rahasia, termasuk string koneksi, kunci akses, nama pengguna, dan kata sandi. Rahasia dalam penyimpanan rahasia Azure Databricks dipartisi berdasarkan cakupan:

databricks secrets create-scope --scope "azure-databricks-job"

Rahasia ditambahkan pada tingkat cakupan:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Catatan

Cakupan yang didukung Azure Key Vault dapat digunakan sebagai ganti dari cakupan Azure Databricks native. Untuk mempelajari selengkapnya, lihat cakupan yang didukung Azure Key Vault.

Dalam kode, rahasia diakses melalui utilitas rahasia Azure Databricks.

Pemantauan

Azure Databricks didasarkan pada Apache Spark, dan keduanya menggunakan log4j sebagai pustaka standar untuk pengelogan. Selain pengelogan default yang disediakan oleh Apache Spark, Anda dapat mengimplementasikan pengelogan ke Azure Log Analytics dengan mengikuti artikel Memantau Azure Databricks.

Karena kelas com.microsoft.pnp.TaxiCabReader memproses pesan perjalanan dan tarif, mungkin salah satu formatnya salah sehingga tidak valid. Di lingkungan produksi, penting untuk menganalisis pesan yang salah format ini untuk mengidentifikasi masalah pada sumber data, sehingga masalah tersebut dapat diperbaiki dengan cepat untuk mencegah kehilangan data. Kelas com.microsoft.pnp.TaxiCabReader mendaftarkan Apache Spark Accumulator yang melacak jumlah catatan tarif dan perjalanan yang salah format:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark menggunakan pustaka Dropwizard untuk mengirim metrik, dan beberapa bidang metrik Dropwizard native tidak kompatibel dengan Azure Log Analytics. Oleh karena itu, arsitektur referensi ini menyertakan sink dan reporter Dropwizard kustom. Ini memformat metrik dalam format yang diharapkan oleh Azure Log Analytics. Saat Apache Spark melaporkan metrik, metrik kustom untuk data perjalanan dan tarif dengan format yang salah juga dikirim.

Berikut ini adalah contoh kueri yang dapat Anda gunakan di ruang kerja Azure Log Analytics untuk memantau eksekusi pekerjaan aliran. Argumen ago(1d) di setiap kueri akan mengembalikan semua catatan yang dihasilkan pada hari terakhir, dan dapat disesuaikan untuk melihat periode waktu yang berbeda.

Pengecualian dicatat selama eksekusi kueri streaming

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Akumulasi data tarif dan perjalanan yang salah format

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Eksekusi pekerjaan dari waktu ke waktu

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Untuk informasi selengkapnya, lihat Memantau Azure Databricks.

DevOps

  • Buat grup sumber daya terpisah untuk lingkungan produksi, pengembangan, dan pengujian. Grup sumber daya yang terpisah akan mempermudah pengelolaan penyebaran, penghapusan penyebaran pengujian, dan penetapan hak akses.

  • Gunakan template Azure Resource Manager untuk menyebarkan sumber daya Azure dengan mengikuti Proses Infrastruktur sebagai Kode (IaC). Dengan template, melakukan otomatisasi penyebaran menggunakan Azure DevOps, atau solusi CI/CD lainnya menjadi lebih mudah.

  • Tempatkan setiap beban kerja dalam template penyebaran terpisah dan simpan sumber daya dalam sistem kontrol sumber. Anda dapat menerapkan template secara bersama-sama atau sendiri-sendiri sebagai bagian dari proses CI/CD, sehingga proses otomatisasi menjadi lebih mudah.

    Dalam arsitektur ini, Azure Event Hubs, Log Analytics, dan Azure Cosmos DB diidentifikasi sebagai satu beban kerja. Sumber daya ini disertakan dalam satu templat ARM.

  • Pertimbangkan untuk melakukan pentahapan beban kerja. Sebarkan ke berbagai tahap dan jalankan pemeriksaan validasi di setiap tahap sebelum melanjutkan ke tahap berikutnya. Dengan begitu, Anda dapat menerapkan pembaruan ke lingkungan produksi dengan cara yang sangat terkontrol dan meminimalkan masalah penyebaran yang tidak terduga.

    Dalam arsitektur ini ada beberapa tahap penyebaran. Pertimbangkan untuk membuat Azure DevOps Pipeline dan menambahkan tahapan tersebut. Berikut adalah beberapa contoh tahapan yang dapat Anda automasi:

    • Memulai Kluster Databricks
    • Mengonfigurasi Databricks CLI
    • Memasang Alat Scala
    • Menambahkan rahasia Databricks

    Selain itu, pertimbangkan untuk menulis pengujian integrasi otomatis untuk meningkatkan kualitas dan keandalan kode Databricks dan siklus hidupnya.

  • Pertimbangkan untuk menggunakan Azure Monitor untuk menganalisis performa alur pemrosesan aliran Anda. Untuk informasi selengkapnya, lihat Memantau Azure Databricks.

Untuk informasi selengkapnya, lihat bagian DevOps di Microsoft Azure Well-Architected Framework.

Pengoptimalan biaya

Optimalisasi biaya adalah tentang mencari cara untuk mengurangi pengeluaran yang tidak perlu dan meningkatkan efisiensi operasional. Untuk informasi selengkapnya, lihat Gambaran umum pilar pengoptimalan biaya.

Gunakan kalkulator harga Azure untuk memperkirakan biaya. Berikut adalah beberapa pertimbangan untuk layanan yang digunakan dalam arsitektur referensi ini.

Pusat Aktivitas

Arsitektur referensi ini menyebarkan Azure Event Hubs di tingkat Standar. Model penetapan harga didasarkan pada unit throughput, kejadian masuk, dan kejadian penangkapan. Peristiwa ingress adalah unit data berukuran 64 KB atau kurang. Pesan dengan ukuran yang lebih besar ditagih dalam kelipatan 64 KB. Anda menentukan unit throughput baik melalui API manajemen portal Azure atau Azure Event Hubs.

Jika Anda membutuhkan lebih banyak hari retensi, pertimbangkan tingkat Khusus. Tingkat ini menawarkan penyebaran penyewa tunggal dengan kebutuhan yang paling menuntut. Penawaran ini membangun kluster berdasarkan unit kapasitas (CU) yang tidak terikat oleh unit throughput.

Tingkat Standar juga ditagih berdasarkan kejadian masuk dan unit throughput.

Untuk informasi tentang harga Azure Event Hubs, lihat Harga Azure Event Hubs.

Azure Databricks

Azure Databricks menawarkan dua tingkat Standar dan Premium, yang masing-masing mendukung tiga beban kerja. Arsitektur referensi ini menyebarkan ruang kerja Azure Databricks di tingkat Premium.

Beban kerja Data Engineering dan Data Engineering Light ditujukan bagi para insinyur data untuk membangun dan mengeksekusi pekerjaan. Beban kerja Analisis Data ditujukan bagi ilmuwan data untuk menjelajahi, memvisualisasikan, memanipulasi, serta berbagi data dan insight secara interaktif.

Azure Databricks menawarkan banyak model penetapan harga.

  • Paket tagihan prabayar

    Anda ditagih untuk mesin virtual (VM) yang tersedia dalam kluster dan Databricks Units (DBU) berdasarkan instans VM yang dipilih. DBU adalah unit kemampuan pemrosesan, yang ditagih berdasarkan penggunaan per detik. Konsumsi DBU bergantung pada ukuran dan jenis instans yang menjalankan Azure Databricks. Harga akan tergantung pada beban kerja dan tingkat yang dipilih.

  • Paket pra-pembelian

    Anda berkomitmen pada Azure Databricks Units (DBU) sebagai Databricks Commit Units (DBCU) selama satu atau tiga tahun. Jika dibandingkan dengan model PAYG, Anda dapat menghemat hingga 37%.

Untuk informasi selengkapnya, lihat Harga Azure Databricks.

Azure Cosmos DB

Dalam arsitektur ini, serangkaian rekaman ditulis ke Azure Cosmos DB oleh pekerjaan Azure Databricks. Anda dikenakan biaya untuk kapasitas yang Anda pesan, dinyatakan dalam RU per detik (RU/d), yang digunakan untuk melakukan operasi penyisipan. Unit untuk penagihan adalah 100 RU/detik per jam. Misalnya, biaya menulis item berukuran 100 KB adalah 50 RU/d.

Untuk operasi penulisan, sediakan kapasitas yang cukup untuk mendukung jumlah penulisan yang dibutuhkan per detik. Anda dapat meningkatkan throughput yang disediakan dengan menggunakan portal atau Azure CLI sebelum melakukan operasi penulisan dan kemudian mengurangi throughput setelah operasi tersebut selesai. Throughput Anda untuk periode penulisan adalah throughput minimum yang diperlukan untuk data tertentu ditambah throughput yang diperlukan untuk operasi penyisipan dengan asumsi tidak ada beban kerja lain yang berjalan.

Contoh analisis biaya

Misalkan Anda mengonfigurasi nilai throughput 1.000 RU/detik pada sebuah kontainer. Ini disebarkan selama 24 jam selama 30 hari, dengan total 720 jam.

Kontainer ditagih 10 unit 100 RU/detik per jam untuk setiap jam. 10 unit seharga $0,008 (per 100 RU/detik per jam) dikenai biaya $0,08 per jam.

Untuk 720 jam atau 7.200 unit (dari 100 RU), Anda akan ditagih $57,60 untuk bulan tersebut.

Penyimpanan juga ditagih, untuk setiap GB yang digunakan untuk data dan indeks yang Anda simpan. Untuk informasi selengkapnya, lihat Model harga Azure Cosmos DB.

Gunakan kalkulator kapasitas Azure Cosmos DB untuk mendapatkan perkiraan cepat biaya beban kerja.

Untuk informasi selengkapnya, lihat bagian biaya di Microsoft Azure Well-Architected Framework.

Menyebarkan skenario ini

Untuk menyebarkan dan menjalankan penerapan referensi, ikuti langkah-langkah dalam pembacaan GitHub.

Langkah berikutnya