Strömbearbetning med Azure Stream Analytics

Cosmos DB
Event Hubs
Monitor
Stream Analytics

Den här referensarkitekturen visar en dataströmbearbetningspipeline från slutpunkt till slutpunkt. Pipelinen matar in data från två källor, korrelerar poster i de två strömmarna och beräknar ett rullande medelvärde över ett tidsfönster. Resultaten lagras för ytterligare analys.

GitHub-logotyp En referensimplementering för den här arkitekturen finns på GitHub.

Arkitektur

Diagram som visar referensarkitektur för att skapa en pipeline för dataströmbearbetning med Azure Stream Analytics.

Ladda ned en Visio-fil med den här arkitekturen.

Arbetsflöde

Arkitekturen består av följande komponenter:

Datakällor. I den här arkitekturen finns det två datakällor som genererar dataströmmar i realtid. Den första strömmen innehåller körinformation och den andra innehåller prisinformation. Referensarkitekturen innehåller en simulerad datagenerator som läser från en uppsättning statiska filer och skickar data till Event Hubs. I ett verkligt program skulle datakällorna vara enheter installerade i taxibilarna.

Azure Event Hubs. Event Hubs är en tjänst för händelseinmatning. Den här arkitekturen använder två event hub-instanser, en för varje datakälla. Varje datakälla skickar en dataström till den associerade händelsehubben.

Azure Stream Analytics. Stream Analytics är en motor för händelsebearbetning. Ett Stream Analytics-jobb läser dataströmmarna från de två händelsehubbarna och utför dataströmbearbetning.

Azure Cosmos DB. Utdata från Stream Analytics-jobbet är en serie poster som skrivs som JSON-dokument till en Azure Cosmos DB-dokumentdatabas.

Microsoft Power BI. Power BI är en uppsättning affärsanalysverktyg för att analysera data för affärsinsikter. I den här arkitekturen läser den in data från Azure Cosmos DB. På så sätt kan användare analysera den fullständiga uppsättningen historiska data som har samlats in. Du kan också strömma resultaten direkt från Stream Analytics till Power BI för en realtidsvy av data. Mer information finns i Realtidsströmning i Power BI.

Azure Monitor. Azure Monitor samlar in prestandamått om de Azure-tjänster som distribueras i lösningen. Genom att visualisera dessa på en instrumentpanel kan du få insikter om lösningens hälsotillstånd.

Scenarioinformation

Scenario: Ett taxiföretag samlar in data om varje taxiresa. I det här scenariot förutsätter vi att det finns två separata enheter som skickar data. Taxin har en mätare som skickar information om varje resa – varaktighet, avstånd och upphämtnings- och avlämningsplatser. En separat enhet accepterar betalningar från kunder och skickar data om priser. Taxibolaget vill beräkna det genomsnittliga tipset per körd mil i realtid för att upptäcka trender.

Potentiella användningsfall

Den här lösningen är optimerad för detaljhandelsscenariot.

Datainhämtning

För att simulera en datakälla använder den här referensarkitekturen datauppsättningen New York City Taxi Data[1]. Den här datamängden innehåller data om taxiresor i New York under en fyraårsperiod (2010–2013). Den innehåller två typer av poster: kördata och prisdata. Resedata omfattar resans varaktighet, reseavstånd och upphämtnings- och avlämningsplats. Prisdata omfattar belopp för biljettpriser, skatter och tips. Vanliga fält i båda posttyperna är medaljongnummer, hacklicens och leverantörs-ID. Tillsammans identifierar dessa tre fält unikt en taxi plus en förare. Data lagras i CSV-format.

[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). University of Illinois vid Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Datageneratorn är ett .NET Core-program som läser posterna och skickar dem till Azure Event Hubs. Generatorn skickar kördata i JSON-format och prisdata i CSV-format.

Event Hubs använder partitioner för att segmentera data. Med partitioner kan en konsument läsa varje partition parallellt. När du skickar data till Event Hubs kan du uttryckligen ange partitionsnyckeln. Annars tilldelas poster till partitioner med resursallokering.

I det här scenariot bör resedata och prisdata få samma partitions-ID för en viss taxi. Detta gör att Stream Analytics kan tillämpa en grad av parallellitet när det korrelerar de två strömmarna. En post i partition n av kördata matchar en post i partition n av prisdata.

Diagram över dataströmbearbetning med Azure Stream Analytics och Event Hubs

I datageneratorn har den gemensamma datamodellen för båda posttyperna en PartitionKey egenskap som är sammanfogningen av Medallion, HackLicenseoch VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Den här egenskapen används för att tillhandahålla en explicit partitionsnyckel när du skickar till Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Strömbearbetning

Dataströmbearbetningsjobbet definieras med hjälp av en SQL-fråga med flera olika steg. De första två stegen väljer helt enkelt poster från de två indataströmmarna.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

I nästa steg kopplas de två indataströmmarna till att välja matchande poster från varje dataström.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Den här frågan kopplar poster till en uppsättning fält som unikt identifierar matchande poster (PartitionId och PickupTime).

Anteckning

Vi vill att strömmarna TaxiRide och TaxiFare ska kopplas med den unika kombinationen av Medallion, HackLicenseVendorId och PickupTime. I det här fallet PartitionId omfattar fälten Medallion, HackLicense och VendorId , men detta bör inte tas som vanligt.

I Stream Analytics är kopplingar temporala, vilket innebär att poster kopplas inom en viss tidsperiod. Annars kan jobbet behöva vänta på obestämd tid på en matchning. Funktionen DATEDIFF anger hur långt två matchande poster kan avgränsas i tid för en matchning.

Det sista steget i jobbet beräknar det genomsnittliga tipset per mil, grupperat efter ett hoppande fönster på 5 minuter.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics har flera fönsterfunktioner. Ett hoppande fönster flyttas framåt i tiden med en fast period, i det här fallet 1 minut per hopp. Resultatet är att beräkna ett glidande medelvärde under de senaste 5 minuterna.

I arkitekturen som visas här sparas endast resultatet av Stream Analytics-jobbet i Azure Cosmos DB. I ett scenario med stordata kan du även använda Event Hubs Capture för att spara rådata i Azure Blob Storage. Om du behåller rådata kan du köra batchfrågor över dina historiska data vid ett senare tillfälle, för att härleda nya insikter från data.

Överväganden

Dessa överväganden implementerar grundpelarna i Azure Well-Architected Framework, som är en uppsättning vägledande grundsatser som kan användas för att förbättra kvaliteten på en arbetsbelastning. Mer information finns i Microsoft Azure Well-Architected Framework.

Skalbarhet

Event Hubs

Dataflödeskapaciteten för Event Hubs mäts i dataflödesenheter. Du kan autoskala en händelsehubb genom att aktivera automatisk ökning, vilket automatiskt skalar genomflödesenheterna baserat på trafik, upp till ett konfigurerat maxvärde.

Stream Analytics

För Stream Analytics mäts de beräkningsresurser som allokeras till ett jobb i strömningsenheter. Stream Analytics-jobb skalas bäst om jobbet kan parallelliseras. På så sätt kan Stream Analytics distribuera jobbet över flera beräkningsnoder.

För Event Hubs-indata använder du nyckelordet PARTITION BY för att partitioneras Stream Analytics-jobbet. Data delas upp i delmängder baserat på Event Hubs-partitionerna.

Fönsterfunktioner och tidsmässiga kopplingar kräver ytterligare SU. Använd när det är möjligt PARTITION BY så att varje partition bearbetas separat. Mer information finns i Förstå och justera strömningsenheter.

Om det inte går att parallellisera hela Stream Analytics-jobbet kan du försöka dela upp jobbet i flera steg, med början i ett eller flera parallella steg. På så sätt kan de första stegen köras parallellt. I den här referensarkitekturen kan du till exempel:

  • Steg 1 och 2 är enkla SELECT instruktioner som väljer poster i en enda partition.
  • Steg 3 utför en partitionerad koppling mellan två indataströmmar. Det här steget drar nytta av det faktum att matchande poster delar samma partitionsnyckel och därför garanterat har samma partitions-ID i varje indataström.
  • Steg 4 aggregeras över alla partitioner. Det går inte att parallellisera det här steget.

Använd Stream Analytics-jobbdiagrammet för att se hur många partitioner som tilldelas till varje steg i jobbet. Följande diagram visar jobbdiagrammet för den här referensarkitekturen:

Diagram som visar Stream Analytics-jobb.

Azure Cosmos DB

Dataflödeskapaciteten för Azure Cosmos DB mäts i enheter för programbegäran (RU). För att kunna skala en Azure Cosmos DB-container efter 10 000 RU måste du ange en partitionsnyckel när du skapar containern och inkludera partitionsnyckeln i varje dokument.

I den här referensarkitekturen skapas nya dokument bara en gång per minut (intervall för hoppande fönster), så dataflödeskraven är ganska låga. Därför behöver du inte tilldela en partitionsnyckel i det här scenariot.

Övervakning

Med alla dataströmbearbetningslösningar är det viktigt att övervaka systemets prestanda och hälsotillstånd. Azure Monitor samlar in mått och diagnostikloggar för de Azure-tjänster som används i arkitekturen. Azure Monitor är inbyggt i Azure-plattformen och kräver ingen ytterligare kod i ditt program.

Någon av följande varningssignaler indikerar att du bör skala ut relevant Azure-resurs:

  • Event Hubs begränsar begäranden eller ligger nära den dagliga meddelandekvoten.
  • Stream Analytics-jobbet använder konsekvent mer än 80 % av de allokerade direktuppspelningsenheterna (SU).
  • Azure Cosmos DB börjar begränsa begäranden.

Referensarkitekturen innehåller en anpassad instrumentpanel som distribueras till Azure Portal. När du har distribuerat arkitekturen kan du visa instrumentpanelen genom att öppna Azure Portal och välja TaxiRidesDashboard från listan över instrumentpaneler. Mer information om hur du skapar och distribuerar anpassade instrumentpaneler i Azure Portal finns i Skapa Azure-instrumentpaneler programmatiskt.

Följande bild visar instrumentpanelen efter att Stream Analytics-jobbet kördes i ungefär en timme.

Skärmbild av instrumentpanelen Taxi Rides

Panelen längst ned till vänster visar att SU-förbrukningen för Stream Analytics-jobbet ökar under de första 15 minuterna och sedan planar ut. Det här är ett typiskt mönster när jobbet når ett stabilt tillstånd.

Observera att Event Hubs begränsar begäranden, som visas i den övre högra panelen. En tillfällig begränsad begäran är inte ett problem, eftersom Event Hubs-klient-SDK:et automatiskt försöker igen när den får ett begränsningsfel. Men om du ser konsekventa begränsningsfel innebär det att händelsehubben behöver fler dataflödesenheter. I följande diagram visas en testkörning med funktionen Automatisk ökning av Event Hubs, som automatiskt skalar ut dataflödesenheterna efter behov.

Skärmbild av autoskalning av Event Hubs.

Automatisk ökning aktiverades vid ungefär 06:35-märket. Du kan se p-minskningen i begränsade begäranden, eftersom Event Hubs automatiskt skalas upp till 3 dataflödesenheter.

Intressant nog hade detta sidoeffekten att öka SU-användningen i Stream Analytics-jobbet. Genom begränsning minskade Event Hubs inmatningshastigheten för Stream Analytics-jobbet på ett artificiellt sätt. Det är faktiskt vanligt att lösa en prestandaflaskhals avslöjar en annan. I det här fallet löste allokeringen av ytterligare SU för Stream Analytics-jobbet problemet.

Kostnadsoptimering

Kostnadsoptimering handlar om att titta på sätt att minska onödiga utgifter och förbättra driftseffektiviteten. Mer information finns i Översikt över grundpelare för kostnadsoptimering.

Normalt beräknar du kostnader med hjälp av priskalkylatorn för Azure. Här följer några överväganden för tjänster som används i den här referensarkitekturen.

Azure Stream Analytics

Azure Stream Analytics prissätts med det antal strömningsenheter (0,11 USD/timme) som krävs för att bearbeta data till tjänsten.

Stream Analytics kan vara dyrt om du inte bearbetar data i realtid eller små mängder data. För dessa användningsfall bör du överväga att använda Azure Functions eller Logic Apps för att flytta data från Azure Event Hubs till ett datalager.

Azure Event Hubs och Azure Cosmos DB

Kostnadsöverväganden om Azure Event Hubs och Azure Cosmos DB finns i Kostnadsöverväganden i Referensarkitektur för Stream-bearbetning med Azure Databricks.

DevOps

  • Skapa separata resursgrupper för produktions-, utvecklings- och testmiljöer. Med separata resursgrupper blir det enklare att hantera distributioner, ta bort testdistributioner och tilldela åtkomsträttigheter.

  • Använd Azure Resource Manager-mall för att distribuera Azure-resurserna efter IaC-processen (infrastruktur som kod). Med mallar är det enklare att automatisera distributioner med Hjälp av Azure DevOps Services eller andra CI/CD-lösningar.

  • Placera varje arbetsbelastning i en separat distributionsmall och lagra resurserna i källkontrollsystemen. Du kan distribuera mallarna tillsammans eller individuellt som en del av en CI/CD-process, vilket gör automatiseringsprocessen enklare.

    I den här arkitekturen identifieras Azure Event Hubs, Log Analytics och Azure Cosmos DB som en enda arbetsbelastning. Dessa resurser ingår i en enda ARM-mall.

  • Överväg att mellanlagring av dina arbetsbelastningar. Distribuera till olika faser och kör valideringskontroller i varje steg innan du går vidare till nästa steg. På så sätt kan du push-överföra uppdateringar till dina produktionsmiljöer på ett mycket kontrollerat sätt och minimera oväntade distributionsproblem.

  • Överväg att använda Azure Monitor för att analysera prestanda för din pipeline för dataströmbearbetning. Mer information finns i Övervaka Azure Databricks.

Mer information finns i grundpelare för driftseffektivitet i Microsoft Azure Well-Architected Framework.

Distribuera det här scenariot

Om du vill distribuera och köra referensimplementeringen följer du stegen i GitHub-läsningen.

Du kanske vill granska följande Azure-exempelscenarier som demonstrerar specifika lösningar som använder några av samma tekniker: