Realtidsanalys kan hjälpa dig att fatta snabba beslut och utföra automatiserade åtgärder baserat på aktuella insikter. Det kan också hjälpa dig att leverera förbättrade kundupplevelser. Den här lösningen beskriver hur du håller Azure Synapse Analytics-datapooler synkroniserade med ändringar av driftdata i MongoDB.
Arkitektur
Följande diagram visar hur du implementerar realtidssynkronisering från Atlas till Azure Synapse Analytics. Det här enkla flödet säkerställer att alla ändringar som sker i MongoDB Atlas-samlingen replikeras till standardlagringsplatsen för Azure Data Lake Storage i Azure Synapse Analytics-arbetsytan. När data finns i Data Lake Storage kan du använda Azure Synapse Analytics-pipelines för att skicka data till dedikerade SQL-pooler, Spark-pooler eller andra lösningar, beroende på dina analyskrav.
Ladda ned en PowerPoint-fil med den här arkitekturen.
Dataflöde
Realtidsändringar i MongoDB Atlas operational data store (ODS) samlas in och görs tillgängliga för Data Lake Storage på en Azure Synapse Analytics-arbetsyta för användningsfall i realtidsanalys, liverapporter och instrumentpaneler.
Dataändringar i MongoDB Atlas drift-/transaktionsdatalager samlas in av Atlas-utlösare.
När en Atlas-databasutlösare observerar en händelse skickar den ändringstypen och dokumentet som har ändrats (fullständig eller delta) till en Atlas-funktion.
Atlas-funktionen utlöser en Azure-funktion som skickar ändringshändelsen och ett JSON-dokument.
Azure Functions använder Azure Storage Files Data Lake-klientbiblioteket för att skriva det ändrade dokumentet till den konfigurerade Data Lake Storage på Azure Synapse Analytics-arbetsytan.
När data finns i Data Lake Storage kan de skickas till dedikerade SQL-pooler, Spark-pooler och andra lösningar. Du kan också konvertera data från JSON till Parquet- eller Delta-format med hjälp av Azure Synapse Analytics-dataflöden eller Kopiera pipelines för att köra ytterligare BI-rapportering eller AI/maskininlärning på aktuella data.
Komponenter
- Med MongoDB Atlas-ändringsströmmar kan du meddela program om ändringar i ett samlings-, databas- eller distributionskluster. Ändringsströmmar ger program åtkomst till dataändringar i realtid och gör det möjligt för dem att omedelbart reagera på ändringar. Den här funktionen är viktig i användningsfall som IoT-händelsespårning och ändringar av finansiella data, där larm måste aktiveras och dynamiska åtgärder måste vidtas omedelbart. Atlas-utlösare använder ändringsströmmar för att övervaka samlingar för ändringar och anropar automatiskt den associerade Atlas-funktionen som svar på utlösarhändelsen.
- Atlas-utlösare svarar på dokumentinfogningar, uppdateringar och borttagningar i en specifik samling och kan automatiskt anropa en Atlas-funktion som svar på ändringshändelsen.
- Atlas-funktioner är serverlösa JavaScript-kodimplementeringar på serversidan som kan utföra åtgärder baserat på de händelser som anropar en Atlas-utlösare. Genom att kombinera Atlas-utlösare med Atlas-funktioner förenklas implementeringen av händelsedrivna arkitekturer.
- Azure Functions är en händelsedriven, serverlös beräkningsplattform som du kan använda för att utveckla program effektivt med valfritt programmeringsspråk. Du kan också använda den för att ansluta sömlöst till andra Azure-tjänster. I det här scenariot samlar en Azure-funktion in en ändringshändelse och använder den för att skriva en blob som innehåller ändrade data till Data Lake Storage med hjälp av Azure Storage Files Data Lake-klientbiblioteket.
- Data Lake Storage är standardlösningen för lagring i Azure Synapse Analytics. Du kan använda serverlösa pooler för att fråga data direkt.
- Pipelines och dataflöden i Azure Synapse Analytics kan användas för att push-överföra bloben som innehåller MongoDB-ändrade data till dedikerade SQL-pooler eller Spark-pooler för ytterligare analys. Med pipelines kan du agera på ändrade datauppsättningar i Data Lake Storage genom att använda både utlösare för lagringshändelser och schemalagda utlösare för att skapa lösningar för användningsfall i både realtid och nära realtid. Den här integreringen påskyndar nedströmsförbrukningen av ändringsdatauppsättningar.
Alternativ
Den här lösningen använder Atlas-utlösare för att omsluta koden för att lyssna på Atlas ändringsströmmar och utlösa Azure Functions som svar på ändringshändelsen. Det är därför mycket enklare att implementera än den tidigare tillhandahållna alternativa lösningen. För den lösningen måste du skriva kod för att lyssna på ändringsströmmar i en Azure App Service-webbapp .
Ett annat alternativ är att använda MongoDB Spark Connector för att läsa MongoDB-dataströmmar och skriva dem till Delta-tabeller. Koden körs kontinuerligt i en Spark Notebook som ingår i en pipeline i Azure Synapse Analytics. Mer information om hur du implementerar den här lösningen finns i Synkronisera från Atlas till Azure Synapse Analytics med Spark-strömning.
Att använda Atlas-utlösare med Azure Functions är dock en helt serverlös lösning. Eftersom den är serverlös ger lösningen robust skalbarhet och kostnadsoptimering. Prissättningen baseras på en betala per användning-kostnadsmodell. Du kan spara mer pengar med hjälp av Atlas-funktionen för att kombinera några ändringshändelser innan du anropar Azure Functions-slutpunkten. Den här strategin kan vara användbar i scenarier med tung trafik.
Dessutom förenar Microsoft Fabric din dataegendom och gör det enklare att köra analys och AI över data, så att du snabbt får insikter. Azure Synapse Analytics-datateknik, datavetenskap, datalagerhantering och realtidsanalys i Fabric kan nu bättre använda MongoDB-data som skickas till OneLake. Du kan använda både Dataflow Gen2- och datapipelineanslutningar för Atlas för att läsa in Atlas-data direkt till OneLake. Den här mekanismen utan kod är ett kraftfullt sätt att mata in data från Atlas till OneLake.
I Infrastruktur kan du direkt referera till data som skickas till Data Lake Storage med hjälp av OneLake-genvägar, utan extrahering, transformering, inläsning (ETL).
Du kan skicka data till Power BI för att skapa rapporter och visualiseringar för BI-rapportering.
Information om scenario
MongoDB Atlas, det operativa dataskiktet i många företagsprogram, lagrar data från interna program, kundinriktade tjänster och API:er från tredje part från flera kanaler. Du kan använda datapipelines i Azure Synapse Analytics för att kombinera dessa data med relationsdata från andra traditionella program och med ostrukturerade data från källor som loggar, objektlager och klickströmmar.
Företag använder MongoDB-funktioner som sammansättningar, analysnoder, Atlas search, vektorsökning, Atlas Data Lake, Atlas SQL-gränssnitt, datafederation och diagram för att aktivera programdriven intelligens. Transaktionsdata i MongoDB extraheras, transformeras och läses in till dedikerade SQL-pooler i Azure Synapse Analytics eller Spark-pooler för batch-, AI-/maskininlärnings- och datalager-BI-analys och -intelligens.
Det finns två scenarier för dataflytt mellan Atlas och Azure Synapse Analytics: batchintegrering och realtidssynkronisering.
Batchintegration
Du kan använda batch- och mikrobatchintegrering för att flytta data från Atlas till Data Lake Storage i Azure Synapse Analytics. Du kan hämta hela historiska data samtidigt eller hämta inkrementella data baserat på filtervillkor.
Lokala MongoDB-instanser och MongoDB Atlas kan integreras som en källa eller en mottagarresurs i Azure Synapse Analytics. Information om anslutningsappar finns i Kopiera data från eller till MongoDB eller Kopiera data från eller till MongoDB Atlas.
Källanslutningen gör det enkelt att köra Azure Synapse Analytics på driftdata som lagras i lokala MongoDB eller i Atlas. Du kan hämta data från Atlas med hjälp av källanslutningsappen och läsa in data till Data Lake Storage i Parquet-, Avro-, JSON- och textformat eller som CSV-bloblagring. Dessa filer kan sedan transformeras eller kopplas till andra filer från andra datakällor i scenarier med flera databaser, flera moln eller hybridmoln. Det här användningsfallet är vanligt i företagsdatalager (EDW) och analysscenarier i stor skala. Du kan också använda mottagaranslutningsappen för att lagra resultatet av analysen i Atlas igen. Mer information om batchintegrering finns i Analysera driftdata på MongoDB Atlas med Azure Synapse Analytics.
Realtidssynkronisering
Arkitekturen som beskrivs i den här artikeln kan hjälpa dig att implementera realtidssynkronisering för att hålla Azure Synapse Analytics-lagringen aktuell med MongoDB-driftdata.
Den här lösningen består av två primära funktioner:
- Samla in ändringarna i Atlas
- Utlösa Azure-funktionen för att sprida ändringarna till Azure Synapse Analytics
Samla in ändringarna i Atlas
Du kan samla in ändringarna med hjälp av en Atlas-utlösare, som du kan konfigurera i användargränssnittet för Lägg till utlösare eller med hjälp av Atlas App Services Admin-API:et. Utlösare lyssnar efter databasändringar som orsakas av databashändelser som infogningar, uppdateringar och borttagningar. Atlasutlösare utlöser också en Atlas-funktion när en ändringshändelse identifieras. Du kan använda användargränssnittet Lägg till utlösare för att lägga till funktionen. Du kan också skapa en Atlas-funktion och associera den som slutpunkt för utlösaranrop med hjälp av Atlas Admin-API:et.
Följande skärmbild visar det formulär som du kan använda för att skapa och redigera en Atlas-utlösare. I avsnittet Information om utlösarens källa anger du den samling som utlösaren bevakar för ändringshändelser och de databashändelser som den söker efter (infoga, uppdatera, ta bort och/eller ersätt).
Utlösaren kan anropa en Atlas-funktion som svar på den händelse som den är aktiverad för. Följande skärmbild visar den enkla JavaScript-koden, som lagts till som en Atlas-funktion, för att anropa som svar på databasutlösaren. Atlas-funktionen anropar en Azure-funktion och skickar metadata för ändringshändelsen tillsammans med dokumentet som infogades, uppdaterades, togs bort eller ersattes, beroende på vad utlösaren är aktiverad för.
Atlas-funktionskod
Atlas-funktionskoden utlöser Den Azure-funktion som är associerad med Azure-funktionsslutpunkten genom att skicka hela changeEvent
i brödtexten i begäran till Azure-funktionen.
Du måste ersätta <Azure function URL endpoint>
platshållaren med den faktiska AZURE-funktions-URL-slutpunkten.
exports = function(changeEvent) {
// Invoke Azure function that inserts the change stream into Data Lake Storage.
console.log(typeof fullDocument);
const response = context.http.post({
url: "<Azure function URL endpoint>",
body: changeEvent,
encodeBodyAsJSON: true
});
return response;
};
Utlös Azure-funktionen för att sprida ändringarna till Azure Synapse Analytics
Atlas-funktionen kodas för att anropa en Azure-funktion som skriver ändringsdokumentet till Data Lake Storage i Azure Synapse Analytics. Azure-funktionen använder Azure Data Lake Storage-klientbiblioteket för Python SDK för att skapa en instans av DataLakeServiceClient
klassen som representerar ditt lagringskonto.
Azure-funktionen använder en lagringsnyckel för autentisering. Du kan också använda Microsoft Entra ID OAuth-implementeringar. Och storage_account_key
andra attribut som är relaterade till Dake Lake Storage hämtas från de konfigurerade os-miljövariablerna. När begärandetexten har avkodats fullDocument
parsas (hela det infogade eller uppdaterade dokumentet) från begärandetexten och skrivs sedan till Data Lake Storage av Data Lake-klientfunktionerna append_data
och flush_data
.
För en borttagningsåtgärd fullDocumentBeforeChange
används i stället för fullDocument
. fullDocument
har inget värde i en borttagningsåtgärd, så koden hämtar dokumentet som har tagits bort, som hämtas i fullDocumentBeforeChange
. Observera att fullDocumentBeforeChange
endast fylls i när inställningen Förinställ dokument är inställd på på, som du ser i föregående skärmbild.
import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python HTTP trigger function processed a new request.')
logging.info(req)
storage_account_name = os.environ["storage_account_name"]
storage_account_key = os.environ["storage_account_key"]
storage_container = os.environ["storage_container"]
storage_directory = os.environ["storage_directory"]
storage_file_name = os.environ["storage_file_name"]
service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
"https", storage_account_name), credential=storage_account_key)
json_data = req.get_body()
logging.info(json_data)
object_id = "test"
try:
json_string = json_data.decode("utf-8")
json_object = json.loads(json_string)
if json_object["operationType"] == "delete":
object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
else:
object_id = json_object["fullDocument"]["_id"]["$oid"]
data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}
logging.info(object_id)
encoded_data = json.dumps(data)
except Exception as e:
logging.info("Exception occurred : "+ str(e))
file_system_client = service_client.get_file_system_client(file_system=storage_container)
directory_client = file_system_client.get_directory_client(storage_directory)
file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
file_client.flush_data(len(encoded_data))
return func.HttpResponse(f"This HTTP triggered function executed successfully.")
Hittills har du sett hur Atlas-utlösaren samlar in alla ändringar som inträffar och skickar den till en Azure-funktion via en Atlas-funktion och att Azure-funktionen skriver ändringsdokumentet som en ny fil i Data Lake Storage på Azure Synapse Analytics-arbetsytan.
När filen har lagts till i Data Lake Storage kan du konfigurera en utlösare för lagringshändelser för att utlösa en pipeline som sedan kan skriva ändringsdokumentet till en dedikerad SQL-pool eller till en Spark-pooltabell. Pipelinen kan använda aktiviteten Kopiera och transformera data med hjälp av ett dataflöde. Om det slutliga målet är en dedikerad SQL-pool kan du också ändra Azure-funktionen så att den skriver direkt till den dedikerade SQL-poolen i Azure Synapse Analytics. För en SQL-pool hämtar du ODBC-anslutningssträng för SQL-poolanslutningen. Se Använda Python för att fråga en databas om ett exempel på Python-kod som du kan använda för att köra frågor mot SQL-pooltabellen med hjälp av anslutningssträng. Du kan ändra den här koden så att den använder en Insert-fråga för att skriva till en dedikerad SQL-pool. Det finns konfigurationsinställningar och roller som måste tilldelas för att funktionen ska kunna skriva till en dedikerad SQL-pool. Information om dessa inställningar och roller ligger utanför omfånget för den här artikeln.
Om du vill ha en lösning i nära realtid och du inte behöver att data synkroniseras i realtid kan det vara ett bra alternativ att använda schemalagda pipelinekörningar. Du kan konfigurera schemalagda utlösare för att utlösa en pipeline med aktiviteten Kopiera eller ett dataflöde, med en frekvens som är nära realtidsfrekvens som företaget har råd med, för att använda MongoDB-anslutningsappen för att hämta data från MongoDB som infogades, uppdaterades eller togs bort mellan den senaste schemalagda körningen och den aktuella körningen. Pipelinen använder MongoDB-anslutningsappen som källanslutning för att hämta deltadata från MongoDB Atlas och push-överföra dem till dedikerade SQL-pooler i Data Lake Storage eller Azure Synapse Analytics med hjälp av dessa som mottagaranslutningar. Den här lösningen använder en pull-mekanism (till skillnad från huvudlösningen som beskrivs i den här artikeln, som är en push-mekanism) från MongoDB Atlas när ändringar sker i MongoDB Atlas-samlingen som Atlas-utlösaren lyssnar på.
Potentiella användningsfall
MongoDB och Azure Synapse Analytics EDW och analystjänster kan hantera flera användningsfall:
Retail
- Skapa information om produktpaketering och produkthöjning
- Implementera kund 360 och hyperanpassning
- Förutsäga lagerutarmning och optimera orderingången i leveranskedjan
- Implementera priser för dynamisk rabatt och smart sökning i e-handel
Bank och ekonomi
- Anpassa kundens finansiella tjänster
- Identifiera och blockera bedrägliga transaktioner
Telekommunikation
- Optimera nästa generations nätverk
- Maximera värdet för gränsnätverk
Fordon
- Optimera parameteriseringen av anslutna fordon
- Identifiera avvikelser i IoT-kommunikation i anslutna fordon
Manufacturing
- Tillhandahålla förutsägande underhåll för maskiner
- Optimera lagrings- och lagerhantering
Att tänka på
Dessa överväganden implementerar grundpelarna i Azure Well-Architected Framework, som är en uppsättning vägledande grundsatser som du kan använda för att förbättra kvaliteten på en arbetsbelastning. Mer information finns i Microsoft Azure Well-Architected Framework.
Säkerhet
Säkerhet ger garantier mot avsiktliga attacker och missbruk av dina värdefulla data och system. Mer information finns i Översikt över säkerhetspelare.
Azure Functions är en serverlös hanterad tjänst, så appresurserna och plattformskomponenterna skyddas av förbättrad säkerhet. Vi rekommenderar dock att du använder HTTPS-protokollet och de senaste TLS-versionerna. Det är också en bra idé att verifiera indata för att säkerställa att det är ett MongoDB-ändringsdokument. Se Skydda Azure Functions för säkerhetsöverväganden för Azure Functions.
MongoDB Atlas är en hanterad databas som en tjänst, så MongoDB ger förbättrad plattformssäkerhet. MongoDB tillhandahåller flera mekanismer för att säkerställa 360-graders säkerhet för lagrade data, inklusive databasåtkomst, nätverkssäkerhet, kryptering i vila och under överföring samt datasuveränitet. Se MongoDB Atlas Security för MongoDB Atlas säkerhetsdokument och andra artiklar som kan hjälpa dig att säkerställa att data i MongoDB är säkra under hela datalivscykeln.
Kostnadsoptimering
Kostnadsoptimering handlar om att minska onödiga utgifter och förbättra drifteffektiviteten. Mer information finns i Översikt över kostnadsoptimeringspelare.
Om du vill beräkna kostnaden för Azure-produkter och -konfigurationer använder du priskalkylatorn för Azure. Azure hjälper dig att undvika onödiga kostnader genom att fastställa rätt antal resurser som ska användas, analysera utgifter över tid och skala för att uppfylla affärsbehov utan överförbrukning. Azure Functions medför endast kostnader när de anropas. Beroende på mängden ändringar i MongoDB Atlas kan du dock utvärdera med hjälp av en batchbearbetningsmekanism i Atlas-funktionen för att lagra ändringar i en annan tillfällig samling och utlösa Azure-funktionen endast om batchen överskrider en viss gräns.
Information om Atlas-kluster finns i 5 sätt att minska kostnaderna med MongoDB Atlas och klusterkonfigurationskostnader. MongoDB-prissidan kan hjälpa dig att förstå prisalternativ för MongoDB Atlas-kluster och andra erbjudanden för MongoDB Atlas-utvecklardataplattformen. Atlas Data Federation kan distribueras i Azure och har stöd för Azure Blob Storage (i förhandsversion). Om du överväger att använda batchbearbetning för att optimera kostnaderna bör du överväga att skriva till Blob Storage i stället för en tillfällig MongoDB-samling.
Prestandaeffektivitet
Prestandaeffektivitet handlar om att effektivt skala arbetsbelastningen baserat på användarnas behov. Mer information finns i Översikt över grundpelare för prestandaeffektivitet.
Atlas-utlösare och Azure Functions är tidstestade för prestanda och skalbarhet. Se Prestanda och skalning i Durable Functions (Azure Functions) för att förstå prestanda- och skalbarhetsöverväganden för Azure Functions. Se Skala på begäran för några överväganden för att förbättra prestanda för dina MongoDB Atlas-instanser. Se Metodtipsguide för MongoDB-prestanda för bästa praxis för MongoDB Atlas-konfiguration.
Slutsats
MongoDB Atlas integreras sömlöst med Azure Synapse Analytics, vilket gör det möjligt för Atlas-kunder att enkelt använda Atlas som källa eller mottagare för Azure Synapse Analytics. Med den här lösningen kan du använda MongoDB-driftdata i realtid från Azure Synapse Analytics för komplex analys och AI-slutsatsdragning.
Distribuera det här scenariot
Realtidssynkronisering från MongoDB Atlas till Azure Synapse Analytics
Deltagare
Den här artikeln underhålls av Microsoft. Det har ursprungligen skrivits av följande medarbetare.
Huvudsakliga författare:
- Diana Annie Jenosh | Senior Solutions Architect – MongoDB-partnerteam
- Venkatesh Shanbag| Senior Solutions Architect – MongoDB-partnerteam
Övriga medarbetare:
- Sunil Sabat | Principal Program Manager – ADF-team
- Wee Hyong Tok | Huvudansvarig för PM – ADF-teamet
Om du vill se icke-offentliga LinkedIn-profiler loggar du in på LinkedIn.