Pemrosesan aliran dengan Azure Databricks
Arsitektur referensi ini menunjukkan alur pemrosesan aliran end-to-end. Empat tahap alur ini adalah menyerap, memproses, menyimpan, dan menganalisis dan melaporkan. Untuk arsitektur referensi ini, alur menyerap data dari dua sumber, melakukan penggabungan pada catatan terkait dari setiap aliran, memperkaya hasil, dan menghitung rata-rata secara real time. Hasilnya kemudian disimpan untuk analisis lebih lanjut.
Sistem
Unduh file Visio dari arsitektur ini.
Alur kerja
Aliran data berikut sesuai dengan diagram sebelumnya:
Dalam arsitektur ini, ada dua sumber data yang menghasilkan aliran secara real time. Aliran pertama berisi informasi perjalanan, dan aliran kedua berisi informasi tarif. Arsitektur referensi mencakup generator data simulasi yang dibaca dari sekumpulan file statis dan mendorong data ke Azure Event Hubs. Sumber data dalam aplikasi nyata adalah perangkat yang dipasang di taksi.
azure Event Hubs adalah layanan penyerapan peristiwa. Arsitektur ini menggunakan dua instans event hub, satu untuk setiap sumber data. Setiap sumber data mengirimkan aliran data ke hub peristiwa terkait.
Azure Databricks adalah platform analitik berbasis Apache Spark yang dioptimalkan untuk platform layanan cloud Microsoft Azure. Azure Databricks digunakan untuk menghubungkan data perjalanan dan tarif taksi dan untuk memperkaya data yang berkorelasi dengan data lingkungan yang disimpan dalam sistem file Azure Databricks.
azure Cosmos DB adalah layanan database multimodel yang dikelola sepenuhnya. Output dari pekerjaan Azure Databricks adalah serangkaian rekaman, yang ditulis ke Azure Cosmos DB untuk Apache Cassandra. Azure Cosmos DB for Apache Cassandra digunakan karena mendukung pemodelan data rangkaian waktu.
Azure Synapse Link for Azure Cosmos DB memungkinkan Anda menjalankan analitik mendekati real-time pada data operasional di Azure Cosmos DB, tanpa efek performa atau biaya pada beban kerja transaksional Anda. Anda dapat mencapai hasil ini dengan menggunakan kumpulan SQL tanpa server dan kumpulan Spark . Mesin analitik ini tersedia dari ruang kerja Azure Synapse Analytics Anda.
Mencerminkan Azure Cosmos DB for NoSQL di Microsoft Fabric memungkinkan Anda mengintegrasikan data Azure Cosmos DB dengan data Lainnya di Microsoft Fabric.
Log Analytics adalah alat dalam Azure Monitor yang memungkinkan Anda mengkueri dan menganalisis data log dari berbagai sumber. Data log aplikasi yang dikumpulkan azure Monitor disimpan di ruang kerja analitik log . Anda dapat menggunakan kueri Analitik Log untuk menganalisis dan memvisualisasikan metrik dan memeriksa pesan log untuk mengidentifikasi masalah dalam aplikasi.
Detail skenario
Perusahaan taksi mengumpulkan data tentang setiap perjalanan taksi. Untuk skenario ini, kami berasumsi bahwa dua perangkat terpisah mengirim data. Taksi memiliki meteran yang mengirimkan informasi tentang setiap perjalanan, termasuk durasi, jarak, dan lokasi penjemputan dan pengantaran. Perangkat yang terpisah menerima pembayaran dari pelanggan dan mengirimkan data tentang tarif. Untuk melihat tren pengendara, perusahaan taksi ingin menghitung tip rata-rata per mil yang didorong untuk setiap lingkungan, secara real time.
Penyerapan data
Untuk mensimulasikan sumber data, arsitektur referensi ini menggunakan himpunan data taksi New York City1. Himpunan data ini berisi data tentang perjalanan taksi di New York City dari 2010 hingga 2013. Ini berisi catatan data perjalanan dan tarif. Data perjalanan mencakup durasi perjalanan, jarak perjalanan, dan lokasi penjemputan dan pengantaran. Data tarif mencakup tarif, pajak, dan jumlah tip. Bidang di kedua jenis rekaman mencakup nomor medali, lisensi hack, dan ID vendor. Kombinasi ketiga bidang ini secara unik mengidentifikasi taksi dan pengemudi. Data disimpan dalam format CSV.
[1] Donovan, Brian; Work, Dan (2016): Data Perjalanan Taksi Kota New York (2010-2013). Universitas Illinois di Urbana-Champaign. https://doi.org/10.13012/J8PN93H8
Generator data adalah aplikasi .NET Core yang membaca rekaman dan mengirimkannya ke Azure Event Hubs. Generator tersebut mengirimkan data perjalanan dalam format JSON dan data tarif dalam format CSV.
Azure Event Hubs menggunakan partisi untuk melakukan segmentasi data. Partisi memungkinkan konsumen membaca setiap partisi secara paralel. Saat mengirim data ke Azure Event Hubs, Anda dapat menentukan kunci partisi secara langsung. Jika tidak, catatan ditetapkan ke partisi secara round-robin.
Dalam skenario ini, data perjalanan dan data tarif harus diberi ID partisi yang sama untuk taksi tertentu. Penugasan ini memungkinkan Databricks untuk menerapkan tingkat paralelisme ketika berkorelasi dengan dua aliran. Misalnya, rekaman dalam partisi n data perjalanan cocok dengan rekaman di partisi n data tarif.
Unduh file Visio arsitektur ini.
Di generator data, model data umum untuk kedua jenis catatan memiliki properti PartitionKey yang merupakan perangkaian dari Medallion, HackLicense, dan VendorId.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Properti ini digunakan untuk menyediakan kunci partisi eksplisit saat mengirim data ke Azure Event Hubs.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Pusat Aktivitas
Kapasitas throughput Azure Event Hubs diukur dalam unit throughput. Anda dapat menskalakan otomatis pusat aktivitas dengan mengaktifkan inflate otomatis . Fitur ini secara otomatis menskalakan unit throughput berdasarkan lalu lintas, hingga maksimum yang dikonfigurasi.
Pemrosesan aliran
Di Azure Databricks, pekerjaan melakukan pemrosesan data. Pekerjaan ditetapkan ke kluster dan kemudian berjalan di atasnya. Pekerjaan dapat berupa kode kustom yang ditulis dalam Java atau buku catatan Spark .
Dalam arsitektur referensi ini, pekerjaan adalah arsip Java yang memiliki kelas yang ditulis dalam Java dan Scala. Saat Anda menentukan arsip Java untuk pekerjaan Databricks, kluster Databricks menentukan kelas untuk operasi. Di sini, main metode com.microsoft.pnp.TaxiCabReader kelas berisi logika pemrosesan data.
Membaca aliran dari dua instans pusat aktivitas
Logika pemrosesan data menggunakan aliran terstruktur Spark untuk membaca dari dua instans Azure event hub:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Memperkaya data dengan informasi lingkungan
Data perjalanan mencakup koordinat garis lintang dan bujur lokasi penjemputan dan pengantaran. Koordinat ini berguna tetapi tidak mudah digunakan untuk analisis. Oleh karena itu, data ini diperkaya dengan data lingkungan yang dibaca dari shapefile .
Format shapefile adalah biner dan tidak mudah diurai. Tetapi pustaka GeoTools menyediakan alat untuk data geospasial yang menggunakan format shapefile. Pustaka ini digunakan di kelas com.microsoft.pnp.GeoFinder untuk menentukan nama lingkungan berdasarkan koordinat untuk lokasi penjemputan dan pengantaran.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Menggabungkan data perjalanan dan tarif
Pertama, data perjalanan dan tarif diubah:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Kemudian data perjalanan digabungkan dengan data tarif:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Memproses data dan menyisipkannya ke Azure Cosmos DB
Jumlah tarif rata-rata untuk setiap lingkungan dihitung untuk interval waktu tertentu:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Jumlah tarif rata-rata kemudian dimasukkan ke dalam Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.queryName("maxAvgFarePerNeighborhood_cassandra_insert")
.outputMode(OutputMode.Append())
.foreach(new CassandraSinkForeach(connector))
.start()
.awaitTermination()
Pertimbangan
Pertimbangan ini mengimplementasikan pilar Azure Well-Architected Framework, yang merupakan serangkaian tenet panduan yang dapat Anda gunakan untuk meningkatkan kualitas beban kerja. Untuk informasi selengkapnya, lihat Microsoft Azure Well-Architected Framework.
Keamanan
Keamanan memberikan jaminan terhadap serangan yang disukai dan penyalahgunaan data dan sistem berharga Anda. Untuk informasi selengkapnya, lihat daftar periksa tinjauan desain untuk Keamanan.
Akses ke ruang kerja Azure Databricks dikontrol dengan menggunakan konsol administrator . Konsol administrator menyertakan fungsionalitas untuk menambahkan pengguna, mengelola izin pengguna, dan menyiapkan akses menyeluruh. Kontrol akses untuk ruang kerja, kluster, pekerjaan, dan tabel juga dapat diatur melalui konsol administrator.
Mengelola rahasia
Azure Databricks menyertakan penyimpanan rahasia yang digunakan untuk menyimpan kredensial dan mereferensikannya dalam buku catatan dan pekerjaan. Cakupan rahasia partisi dalam penyimpanan rahasia Azure Databricks:
databricks secrets create-scope --scope "azure-databricks-job"
Rahasia ditambahkan pada tingkat cakupan:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Catatan
Gunakan cakupan yang didukung Azure Key Vault alih-alih cakupan Azure Databricks asli.
Dalam kode, rahasia diakses melalui utilitas rahasia Azure Databricks.
Pengoptimalan Biaya
Pengoptimalan Biaya berfokus pada cara untuk mengurangi pengeluaran yang tidak perlu dan meningkatkan efisiensi operasional. Untuk informasi selengkapnya, lihat daftar periksa Design review untuk Pengoptimalan Biaya.
Gunakan kalkulator harga Azure untuk memperkirakan biaya. Pertimbangkan layanan berikut yang digunakan dalam arsitektur referensi ini.
Pertimbangan biaya Azure Event Hubs
Arsitektur referensi ini menyebarkan Azure Event Hubs di tingkat Standar. Model penetapan harga didasarkan pada unit throughput, kejadian masuk, dan kejadian penangkapan. Peristiwa ingress adalah unit data berukuran 64 KB atau kurang. Pesan dengan ukuran yang lebih besar ditagih dalam kelipatan 64 KB. Anda menentukan unit throughput baik melalui API manajemen portal Azure atau Azure Event Hubs.
Jika Anda memerlukan lebih banyak hari retensi, pertimbangkan tingkat Khusus. Tingkat ini menyediakan penyebaran penyewa tunggal yang memiliki persyaratan yang ketat. Penawaran ini membangun kluster yang didasarkan pada unit kapasitas dan tidak bergantung pada unit throughput. Tingkat Standar juga ditagih berdasarkan peristiwa masuk dan unit throughput.
Untuk informasi selengkapnya, lihat harga Azure Event Hubs.
Pertimbangan biaya Azure Databricks
Azure Databricks menyediakan tingkat Standar dan tingkat Premium, yang keduanya mendukung tiga beban kerja. Arsitektur referensi ini menyebarkan ruang kerja Azure Databricks di tingkat Premium.
Beban kerja rekayasa data harus berjalan pada kluster pekerjaan. Teknisi data menggunakan kluster untuk membangun dan melakukan pekerjaan. Beban kerja analitik data harus berjalan pada kluster serba guna dan ditujukan bagi ilmuwan data untuk menjelajahi, memvisualisasikan, memanipulasi, dan berbagi data dan wawasan secara interaktif.
Azure Databricks menyediakan beberapa model harga.
paket prabayar
Anda ditagih untuk komputer virtual (VM) yang disediakan dalam kluster dan unit Azure Databricks (DBA) berdasarkan instans VM yang dipilih. DBU adalah unit kemampuan pemrosesan yang ditagih berdasarkan penggunaan per detik. Konsumsi DBU tergantung pada ukuran dan jenis instans yang berjalan di Azure Databricks. Harga tergantung pada beban kerja dan tingkat yang dipilih.
paket pra-pembelian
Anda berkomitmen pada DBA sebagai unit penerapan Azure Databricks selama satu atau tiga tahun untuk mengurangi total biaya kepemilikan selama periode waktu tersebut jika dibandingkan dengan model bayar sesuai penggunaan.
Untuk informasi selengkapnya, lihat harga Azure Databricks.
Pertimbangan biaya Azure Cosmos DB
Dalam arsitektur ini, pekerjaan Azure Databricks menulis serangkaian rekaman ke Azure Cosmos DB. Anda dikenakan biaya untuk kapasitas yang Anda pesan, yang diukur dalam Unit Permintaan per detik (RU/s). Kapasitas ini digunakan untuk melakukan operasi penyisipan. Unit untuk penagihan adalah 100 RU/dtk per jam. Misalnya, biaya menulis item berukuran 100 KB adalah 50 RU/d.
Untuk operasi penulisan, sediakan kapasitas yang cukup untuk mendukung jumlah penulisan yang dibutuhkan per detik. Anda dapat meningkatkan throughput yang disediakan dengan menggunakan portal atau Azure CLI sebelum melakukan operasi tulis lalu mengurangi throughput setelah operasi tersebut selesai. Throughput Anda untuk periode tulis adalah jumlah throughput minimum yang diperlukan untuk data tertentu dan throughput yang diperlukan untuk operasi penyisipan. Perhitungan ini mengasumsikan bahwa tidak ada beban kerja lain yang berjalan.
Contoh analisis biaya
Misalkan Anda mengonfigurasi nilai throughput 1.000 RU/dtk pada kontainer. Ini disebarkan selama 24 jam selama 30 hari, selama total 720 jam.
Kontainer ditagih sebesar 10 unit 100 RU/dtk per jam untuk setiap jam. Sepuluh unit seharga $0,008 (per 100 RU/dtk per jam) dikenakan biaya sebesar $0,08 per jam.
Selama 720 jam atau 7.200 unit (dari 100 RU), Anda ditagih $57,60 untuk bulan tersebut.
Penyimpanan juga ditagih untuk setiap GB yang digunakan untuk data dan indeks tersimpan Anda. Untuk informasi selengkapnya, lihat Model harga Azure Cosmos DB.
Gunakan kalkulator kapasitas Azure Cosmos DB untuk perkiraan cepat biaya beban kerja.
Keunggulan Operasional
Keunggulan Operasional mencakup proses operasi yang menyebarkan aplikasi dan membuatnya tetap berjalan dalam produksi. Untuk informasi selengkapnya, lihat daftar periksa tinjauan desain untukKeunggulan Operasional.
Pemantauan
Azure Databricks didasarkan pada Apache Spark. Azure Databricks dan Apache Spark menggunakan Apache Log4j sebagai pustaka standar untuk pengelogan. Selain pengelogan default yang disediakan Apache Spark, Anda dapat menerapkan pengelogan di Log Analytics. Untuk informasi selengkapnya, lihat Memantau Azure Databricks.
Karena kelas com.microsoft.pnp.TaxiCabReader memproses pesan naik dan tarif, pesan mungkin salah bentuk dan karenanya tidak valid. Dalam lingkungan produksi, penting untuk menganalisis pesan cacat ini untuk mengidentifikasi masalah dengan sumber data sehingga dapat diperbaiki dengan cepat untuk mencegah kehilangan data. Kelas com.microsoft.pnp.TaxiCabReader mendaftarkan Apache Spark Accumulator yang melacak jumlah catatan tarif salah format dan catatan perjalanan:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark menggunakan pustaka Dropwizard untuk mengirim metrik. Beberapa bidang metrik Dropwizard asli tidak kompatibel dengan Analitik Log, itulah sebabnya arsitektur referensi ini mencakup sink dan reporter Dropwizard kustom. Ini memformat metrik dalam format yang diharapkan Analitik Log. Saat Apache Spark melaporkan metrik, metrik kustom untuk data perjalanan dan tarif dengan format yang salah juga dikirim.
Anda dapat menggunakan contoh kueri berikut di ruang kerja Analitik Log Anda untuk memantau pengoperasian pekerjaan streaming. Argumen ago(1d) di setiap kueri mengembalikan semua rekaman yang dihasilkan di hari terakhir. Anda dapat menyesuaikan parameter ini untuk melihat periode waktu yang berbeda.
Pengecualian yang dicatat selama operasi kueri aliran
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Akumulasi data tarif dan perjalanan yang salah format
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Operasi pekerjaan dari waktu ke waktu
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Organisasi dan penyebaran sumber daya
Buat grup sumber daya terpisah untuk lingkungan produksi, pengembangan, dan pengujian. Grup sumber daya yang terpisah akan mempermudah pengelolaan penyebaran, penghapusan penyebaran pengujian, dan penetapan hak akses.
Gunakan templat Azure Resource Manager untuk menyebarkan sumber daya Azure sesuai dengan proses infrastruktur sebagai kode. Dengan menggunakan templat, Anda dapat mengotomatiskan penyebaran dengan layanan Azure DevOps atau solusi integrasi berkelanjutan dan pengiriman berkelanjutan (CI/CD) lainnya.
Tempatkan setiap beban kerja dalam template penyebaran terpisah dan simpan sumber daya dalam sistem kontrol sumber. Anda dapat menyebarkan templat bersama-sama atau secara individual sebagai bagian dari proses CI / CD. Pendekatan ini menyederhanakan proses otomatisasi.
Dalam arsitektur ini, Azure Event Hubs, Log Analytics, dan Azure Cosmos DB diidentifikasi sebagai satu beban kerja. Sumber daya ini disertakan dalam satu templat Azure Resource Manager.
Pertimbangkan untuk melakukan pentahapan beban kerja. Sebarkan ke berbagai tahap dan jalankan pemeriksaan validasi di setiap tahap sebelum Anda pindah ke tahap berikutnya. Dengan begitu Anda dapat mengontrol bagaimana Anda mendorong pembaruan ke lingkungan produksi Anda dan meminimalkan masalah penyebaran yang tidak tertandingi.
Dalam arsitektur ini, ada beberapa tahap penyebaran. Pertimbangkan untuk membuat alur Azure DevOps dan menambahkan tahapan tersebut. Anda dapat mengotomatiskan tahapan berikut:
- Mulai kluster Databricks.
- Mengonfigurasi Databricks CLI.
- Instal alat Scala.
- Tambahkan rahasia Databricks.
Pertimbangkan untuk menulis pengujian integrasi otomatis untuk meningkatkan kualitas dan keandalan kode Databricks dan siklus hidupnya.