Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato referenční architektura ukazuje kompletní kanál zpracování datových proudů. Mezi čtyři fáze tohoto potrubí patří ingestování, zpracování, ukládání a analýza a reportování. V této referenční architektuře kanál ingestuje data ze dvou zdrojů, provádí spojení souvisejících záznamů z každého datového proudu, rozšiřuje výsledek a vypočítá průměr v reálném čase. Výsledky se pak uloží pro další analýzu.
Architektura
Stáhněte si soubor Visio této architektury.
Tok dat
Následující tok dat odpovídá předchozímu diagramu:
Ingest
Dva datové proudy provozu v reálném čase dodávají systému data o jízdném a data o cestách. Zařízení nainstalovaná v taxislužbách slouží jako zdroje dat a publikují události do služby Azure Event Hubs. Každý datový proud přejde do vlastní instance centra událostí, která poskytuje nezávislé cesty příjmu dat.
Proces
Azure Databricks využívá streamy služby Event Hubs a spouští následující operace:
- Koreluje záznamy jízdného se záznamy o jízdách.
- Rozšiřuje data pomocí třetí datové sady, která obsahuje vyhledávací data sousedství uložená v systému souborů Azure Databricks.
Tento proces vytvoří jednotnou a rozšířenou datovou sadu, která je vhodná pro podřízené analýzy a úložiště.
Obchod
Výstup úloh Azure Databricks je řada záznamů. Zpracované záznamy se zapisují do služby Azure Cosmos DB for NoSQL.
Analýza/sestava
Prostředky infrastruktury zrcadlí provozní data ze služby Azure Cosmos DB for NoSQL a umožňují analytické dotazy bez ovlivnění transakčního výkonu. Tento přístup poskytuje cestu bez ETL pro analýzy. V této architektuře můžete použít zrcadlení pro následující účely:
- Zrcadlení dat Azure Cosmos DB (nebo Delta‑formátovaných dat) do platformy Fabric.
- Zachování synchronizace datových sad s operačním systémem
- Povolte analýzu pomocí následujících nástrojů:
- Koncové body analýzy SQL Fabric pro jezera a sklady
- Poznámkové bloky Apache Spark
- Analýzy v reálném čase pomocí dotazovacího jazyka Kusto (KQL) pro zkoumání časových řad a stylů protokolů
Monitor
Azure Monitor shromažďuje telemetrii z kanálu zpracování Azure Databricks. Pracovní prostor služby Log Analytics ukládá protokoly a metriky aplikací. Můžete provést následující akce:
- Dotazování provozních protokolů
- Vizualizace metrik
- Kontrola selhání, anomálií a problémů s výkonem
- Vytváření řídicích panelů
Components
Azure Databricks je analytická platforma založená na Sparku optimalizovaná pro platformu Azure. V této architektuře úlohy Azure Databricks vylepšují data jízdy taxíkem a jízdného a ukládají výsledky ve službě Azure Cosmos DB.
Event Hubs je spravovaná distribuovaná služba pro příjem dat, která může škálovat na ingestování velkých objemů událostí. Tato architektura používá dvě instance centra událostí k příjmu dat z taxislužby.
Azure Cosmos DB for NoSQL je spravovaná databázová služba s více modely. V této architektuře ukládá výstup úloh rozšiřování Azure Databricks. Fabric zrcadlí provozní data služby Azure Cosmos DB, což umožňuje vykonávat analytické dotazy.
Log Analytics je nástroj ve službě Azure Monitor, který pomáhá dotazovat a analyzovat data protokolů z různých zdrojů. V této architektuře nakonfigurují všechny prostředky Azure Diagnostics tak, aby ukládaly protokoly platformy v tomto pracovním prostoru. Pracovní prostor slouží také jako jímka dat pro metriky úloh Sparku generované z kanálů zpracování Azure Databricks.
Podrobnosti scénáře
Taxislužba shromažďuje data o každé jízdě taxíkem. V tomto scénáři předpokládáme, že dvě samostatná zařízení odesílají data. Taxi má měřič, který odesílá informace o každé jízdě, včetně doby trvání, vzdálenosti a vyzvednutí a odkládacích míst. Samostatné zařízení přijímá platby od zákazníků a odesílá data o jízdné. Pokud chce taxislužba zjistit trendy ridershipu, chce vypočítat průměrný tip na míle řízený pro každou čtvrť v reálném čase.
Příjem dat
K simulaci zdroje dat tato referenční architektura používá datovou sadu dat taxislužby v New Yorku. Tato datová sada obsahuje data o jízdách taxíkem v New Yorku od roku 2010 do roku 2013. Obsahuje záznamy dat o jízdě i jízdě. Data o jízdě zahrnují dobu jízdy, vzdálenost jízdy a vyzvednutí a odkládací místa. Údaje o jízdné zahrnují ceny jízdné, daně a tipové částky. Pole v obou typech záznamů zahrnují číslo medailiónu, licenci hacku a ID dodavatele. Kombinace těchto tří polí jednoznačně identifikuje taxi a řidiče. Data se ukládají ve formátu CSV.
Generátor dat je aplikace .NET Core, která čte záznamy a odesílá je do služby Event Hubs. Generátor odesílá data jízdy ve formátu JSON a data jízdného ve formátu CSV.
Služba Event Hubs používá k segmentování dat oddíly . Oddíly umožňují spotřebiteli číst každý přečtený datový záznam paralelně. Když odesíláte data do služby Event Hubs, můžete klíč oddílu zadat přímo. V opačném případě se záznamy přiřazují k oddílům způsobem kruhového dotazování.
V tomto scénáři by data jízdy a jízdné měly být přiřazeny stejnému ID oddílu pro konkrétní taxislužba. Toto přiřazení umožňuje Databricks použít stupeň paralelismu, když koreluje tyto dva datové proudy. Například záznam v oddílu n dat jízdy odpovídá záznamu v oddílu n dat jízdy.
Stáhněte si soubor Visia této architektury.
V generátoru dat má společný datový model pro oba typy PartitionKey záznamů vlastnost, která je zřetězením Medallion, HackLicensea VendorId.
public abstract class TaxiData
{
public TaxiData()
{
}
[JsonProperty]
public long Medallion { get; set; }
[JsonProperty]
public long HackLicense { get; set; }
[JsonProperty]
public string VendorId { get; set; }
[JsonProperty]
public DateTimeOffset PickupTime { get; set; }
[JsonIgnore]
public string PartitionKey
{
get => $"{Medallion}_{HackLicense}_{VendorId}";
}
Tato vlastnost poskytuje explicitní klíč oddílu při odesílání dat do služby Event Hubs.
using (var client = pool.GetObject())
{
return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
t.GetData(dataFormat))), t.PartitionKey);
}
Centra událostí
Kapacita propustnosti služby Event Hubs se měří v jednotkách propustnosti. Centrum událostí můžete automaticky škálovat povolením automatického nafouknutí. Tato funkce automaticky škáluje jednotky propustnosti na základě provozu až na nakonfigurované maximum.
Zpracování datových proudů
V Azure Databricks provádí úloha zpracování dat. Úloha se přiřadí ke clusteru a pak na něm běží. Úloha může být vlastní kód napsaný v Javě nebo poznámkový blok Spark .
V této referenční architektuře je úloha archiv Jazyka Java, který obsahuje třídy napsané v Javě a Scala. Když zadáte archiv Java pro úlohu Azure Databricks, cluster Azure Databricks určuje třídu pro operaci.
main Zde metoda com.microsoft.pnp.TaxiCabReader třídy obsahuje logiku zpracování dat.
Čtení datového proudu ze dvou instancí centra událostí
Logika zpracování dat používá strukturované streamování Sparku ke čtení ze dvou instancí centra událostí Azure:
// Create a token credential using Managed Identity
val credential = new DefaultAzureCredentialBuilder().build()
val rideEventHubOptions = EventHubsConf(rideEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiRideConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val rideEvents = spark.readStream
.format("eventhubs")
.options(rideEventHubOptions.toMap)
.load
val fareEventHubOptions = EventHubsConf(fareEventHubEntraIdAuthConnectionString)
.setTokenProvider(EventHubsUtils.buildTokenProvider(..., credential))
.setConsumerGroup(conf.taxiFareConsumerGroup())
.setStartingPosition(EventPosition.fromStartOfStream)
val fareEvents = spark.readStream
.format("eventhubs")
.options(fareEventHubOptions.toMap)
.load
Obohacení dat informacemi o sousedství
Data o jízdě zahrnují souřadnice zeměpisné šířky a délky vyzvednutí a odkládacích míst. Tyto souřadnice jsou užitečné, ale nejsou snadno využité k analýze. Kanál tedy tato data rozšiřuje o data sousedství načtená ze souboru shapefile.
Formát souboru shapefile je binární a není snadno parsován. Knihovna GeoTools ale poskytuje nástroje pro geoprostorová data, která používají formát souboru shapefile. Tato knihovna se používá v com.microsoft.pnp.GeoFinder třídě k určení názvu sousedství na základě souřadnic pro vyzvednutí a odkládací umístění.
val neighborhoodFinder = (lon: Double, lat: Double) => {
NeighborhoodFinder.getNeighborhood(lon, lat).get()
}
Připojte se k jízdě a datům o jízdě a jízdě
Nejprve se transformují data jízdy a jízdy:
val rides = transformedRides
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedRides.add(1)
false
}
})
.select(
$"ride.*",
to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
.as("pickupNeighborhood"),
to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
.as("dropoffNeighborhood")
)
.withWatermark("pickupTime", conf.taxiRideWatermarkInterval())
val fares = transformedFares
.filter(r => {
if (r.isNullAt(r.fieldIndex("errorMessage"))) {
true
}
else {
malformedFares.add(1)
false
}
})
.select(
$"fare.*",
$"pickupTime"
)
.withWatermark("pickupTime", conf.taxiFareWatermarkInterval())
Data jízdy se pak spojí s daty jízdného:
val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))
Zpracování dat a jejich vložení do služby Azure Cosmos DB
Průměrná částka jízdného pro každou čtvrť se vypočítá pro konkrétní časový interval:
val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
.groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
.agg(
count("*").as("rideCount"),
sum($"fareAmount").as("totalFareAmount"),
sum($"tipAmount").as("totalTipAmount"),
(sum($"fareAmount")/count("*")).as("averageFareAmount"),
(sum($"tipAmount")/count("*")).as("averageTipAmount")
)
.select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")
Průměrná částka jízdy se pak vloží do služby Azure Cosmos DB:
maxAvgFarePerNeighborhood
.writeStream
.format("cosmos.oltp")
.option("spark.cosmos.accountEndpoint", "<your-cosmos-endpoint>")
.option("spark.cosmos.accountKey", "<your-cosmos-key>")
.option("spark.cosmos.database", "<your-database-name>")
.option("spark.cosmos.container", "<your-container-name>")
.option("checkpointLocation", "/mnt/checkpoints/maxAvgFarePerNeighborhood")
.outputMode("append")
.start()
.awaitTermination()
Důležité informace
Tyto aspekty implementují pilíře dobře architektuře Azure, což je sada hlavních principů, které můžete použít ke zlepšení kvality úlohy. Další informace najdete v tématu Well-Architected Framework.
Zabezpečení
Zabezpečení poskytuje záruky proti záměrným útokům a zneužití cenných dat a systémů. Další informace naleznete v tématu Kontrolní seznam pro kontrolu návrhu prozabezpečení .
Přístup k pracovnímu prostoru Azure Databricks se řídí pomocí konzoly správce . Konzola správce obsahuje funkce pro přidání uživatelů, správu uživatelských oprávnění a nastavení jednotného přihlašování. Řízení přístupu pro pracovní prostory, clustery, úlohy a tabulky je také možné nastavit prostřednictvím konzoly správce.
Správa tajných kódů
Azure Databricks obsahuje úložiště tajných kódů, které slouží k ukládání přihlašovacích údajů a odkazování na ně v poznámkových blocích a úlohách. Rozsahy tajných kódů oddílů v úložišti tajných kódů Azure Databricks:
databricks secrets create-scope --scope "azure-databricks-job"
Tajné kódy se přidají na úrovni oboru:
databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"
Poznámka:
Místo nativního oboru Azure Databricks použijte rozsah založený na službě Azure Key Vault.
Kód přistupuje k tajným kódům prostřednictvím nástrojů pro tajné kódy Azure Databricks.
Optimalizace nákladů
Optimalizace nákladů se zaměřuje na způsoby, jak snížit zbytečné výdaje a zlepšit efektivitu provozu. Další informace naleznete v tématu Kontrolní seznam pro kontrolu návrhu proOptimalizace nákladů .
K odhadu nákladů použijte cenovou kalkulačku Azure. Vezměte v úvahu následující služby používané v této referenční architektuře.
Důležité informace o nákladech služby Event Hubs
Tato referenční architektura nasadí službu Event Hubs na úrovni Standard. Cenový model je založený na jednotkách propustnosti, událostech příchozího přenosu dat a zachytávání událostí. Událost příchozího přenosu dat je jednotka dat, která jsou 64 kB nebo menší. Větší zprávy se účtují v násobcích po 64 kB. Jednotky propustnosti zadáte prostřednictvím webu Azure Portal nebo rozhraní API pro správu služby Event Hubs.
Pokud potřebujete více dnů uchovávání, zvažte úroveň Dedicated. Tato úroveň poskytuje nasazení s jedním tenantem, která mají přísné požadavky. Tato nabídka vytvoří cluster založený na jednotkách kapacity a není závislý na jednotkách propustnosti. Úroveň Standard se také účtuje na základě událostí příchozího přenosu dat a jednotek propustnosti.
Další informace najdete v tématu ceny služby Event Hubs.
Důležité informace o nákladech na Azure Databricks
Azure Databricks poskytuje úroveň Standard a úroveň Premium, z nichž obě podporují tři úlohy. Tato referenční architektura nasadí pracovní prostor Azure Databricks na úrovni Premium.
Úlohy přípravy dat by se měly spouštět v clusteru úloh. Datoví inženýři používají clustery k vytváření a provádění úloh. Úlohy analýzy dat by se měly spouštět v clusteru pro všechny účely a jsou určené pro datové vědce, aby mohli interaktivně zkoumat, vizualizovat, manipulovat s nimi a sdílet data a přehledy.
Azure Databricks poskytuje více cenových modelů.
plán průběžných plateb
Účtují se vám virtuální počítače zřízené v clusterech a jednotkách Azure Databricks (DBU) na základě zvolené instance virtuálního počítače. DBU je jednotka zpracování, kterou Azure účtuje podle využití za sekundu. Spotřeba DBU závisí na velikosti a typu instance, která běží v Azure Databricks. Ceny závisí na zvolené úloze a úrovni.
předkupní plán
Do jednotek potvrzení Azure Databricks se zavazujete po dobu jednoho nebo tří let, abyste snížili celkové náklady na vlastnictví v daném časovém období v porovnání s modelem průběžných plateb.
Další informace najdete v tématu cenovýchAzure Databricks .
Důležité informace o nákladech služby Azure Cosmos DB
V této architektuře úloha Azure Databricks zapisuje do služby Azure Cosmos DB řadu záznamů. Účtuje se vám kapacita, kterou si rezervujete, která se měří v jednotkách žádostí za sekundu (RU/s). Tato kapacita se používá k provádění operací vložení. Jednotka pro fakturaci je 100 RU/s za hodinu. Například náklady na zápis 100 kB položek jsou 50 RU/s.
Pro operace zápisu nastavte dostatečnou kapacitu pro podporu počtu zápisů potřebných za sekundu. Zřízenou propustnost můžete zvýšit pomocí portálu nebo Azure CLI před provedením operací zápisu a následným snížením propustnosti po dokončení těchto operací. Propustnost pro období zápisu je součet minimální propustnosti potřebné pro konkrétní data a propustnost potřebnou pro operaci vložení. Tento výpočet předpokládá, že není spuštěná žádná jiná úloha.
Příklad analýzy nákladů
Předpokládejme, že v kontejneru nakonfigurujete hodnotu propustnosti 1 000 RU/s a spustíte ji nepřetržitě po dobu 30 dnů, což se rovná 720 hodinám.
Kontejner se účtuje za 10 jednotek 100 RU/s za hodinu za každou hodinu. Deset jednotek v hodnotě 0,008 USD (za 100 RU/s za hodinu) se účtují v hodnotě 0,08 USD za hodinu.
Za 720 hodin nebo 7 200 jednotek (100 RU) se vám účtuje 57,60 USD za měsíc.
Úložiště se také účtuje za každou GB, která se používá pro uložená data a index. Další informace najdete v cenovém modelu služby Azure Cosmos DB.
K rychlému odhadu nákladů na úlohy použijte kalkulačku kapacity služby Azure Cosmos DB.
Efektivita provozu
Efektivita provozu se zabývá provozními procesy, které nasazují aplikaci a udržují ji spuštěnou v produkčním prostředí. Další informace naleznete v tématu kontrolní seznam pro kontrolu efektivity provozu.
Sledování
Azure Databricks je založen na Apache Sparku. Azure Databricks i Apache Spark používají Apache Log4j jako standardní knihovnu pro protokolování. Kromě výchozího protokolování, které Poskytuje Apache Spark, můžete implementovat protokolování v Log Analytics. Další informace najdete v tématu Monitorování Azure Databricks.
Protože třída com.microsoft.pnp.TaxiCabReader zpracovává zprávy jízdy a jízdy, může být zpráva poškozena, a proto není platná. V produkčním prostředí je důležité analyzovat tyto poškozené zprávy a identifikovat problém se zdroji dat, aby bylo možné je rychle opravit, aby se zabránilo ztrátě dat. Třída com.microsoft.pnp.TaxiCabReader zaregistruje akumulátor Apache Sparku, který sleduje počet poškozených záznamů jízdy a záznamů jízdy:
@transient val appMetrics = new AppMetrics(spark.sparkContext)
appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
SparkEnv.get.metricsSystem.registerSource(appMetrics)
Apache Spark používá k odesílání metrik knihovnu Dropwizard. Některá nativní pole metrik Dropwizard nejsou kompatibilní s Log Analytics, což je důvod, proč tato referenční architektura zahrnuje vlastní jímku Dropwizard a reporter. Formátuje metriky ve formátu, který Log Analytics očekává. Když Apache Spark hlásí metriky, posílají se také vlastní metriky pro špatně zformulovaná data jízdy a jízdného.
Pomocí následujících ukázkových dotazů v pracovním prostoru služby Log Analytics můžete monitorovat provoz úlohy streamování. Argument ago(1d) v každém dotazu vrátí všechny záznamy, které byly vygenerovány za poslední den. Tento parametr můžete upravit, aby se zobrazilo jiné časové období.
Výjimky zaprotokolované během operace dotazu streamu
SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"
Akumulace poškozených jízdných dat a dat o jízdě
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Operace úlohy v průběhu času
SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart
Organizace prostředků a nasazení
Vytvořte samostatné skupiny prostředků pro produkční, vývojové a testovací prostředí. Samostatné skupiny prostředků usnadňují správu nasazení, odstraňování testovacích nasazení a přiřazování přístupových práv.
Pomocí šablony Azure Resource Manageru nasaďte prostředky Azure podle procesu infrastruktury jako kódu. Pomocí šablon můžete automatizovat nasazení pomocí služeb Azure DevOps nebo jiných řešení kontinuální integrace a průběžného doručování (CI/CD).
Každou úlohu umístěte do samostatné šablony nasazení a uložte prostředky do systémů správy zdrojového kódu. Šablony můžete nasadit společně nebo jednotlivě jako součást procesu CI/CD. Tento přístup zjednodušuje proces automatizace.
V této architektuře jsou služba Event Hubs, Log Analytics a Azure Cosmos DB identifikovány jako jedna úloha. Tyto prostředky jsou součástí jedné šablony Azure Resource Manageru.
Zvažte přípravu úloh. Nasaďte je do různých fází a před přechodem do další fáze spusťte kontroly ověřování v každé fázi. Díky tomu můžete řídit způsob nabízení aktualizací do produkčních prostředí a minimalizovat neočekávané problémy s nasazením.
V této architektuře existuje několik fází nasazení. Zvažte vytvoření kanálu Azure DevOps a přidání těchto fází. Můžete automatizovat následující fáze:
- Spusťte cluster Azure Databricks.
- Nakonfigurujte Azure Databricks CLI.
- Nainstalujte nástroje Scala.
- Přidejte tajné kódy Azure Databricks.
Zvažte psaní automatizovaných integračních testů za účelem zlepšení kvality a spolehlivosti kódu Azure Databricks a jeho životního cyklu.