Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här artikeln beskriver hur du ansluter azure Cosmos DB MongoDB vCore från Azure Databricks. Den går igenom grundläggande DML-åtgärder (Data Manipulation Language) som Read, Filter, SQLs, Aggregation Pipelines och Write Tables med python-kod.
Förutsättningar
Etablera ditt val av Spark-miljö i Azure Databricks.
Konfigurera beroenden för anslutning
Följande är de beroenden som krävs för att ansluta till Azure Cosmos DB för MongoDB vCore från Azure Databricks:
Spark-anslutningsappen för MongoDB Spark-anslutningsappen används för att ansluta till Azure Cosmos DB för MongoDB vCore. Identifiera och använd den version av anslutningsappen som finns i Maven Central som är kompatibel med Spark- och Scala-versionerna av din Spark-miljö. Vi rekommenderar en miljö som stöder Spark 3.2.1 eller senare och spark-anslutningsappen som är tillgänglig vid maven-koordinaterna
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
.Azure Cosmos DB för MongoDB-anslutningssträngar: Din Azure Cosmos DB för MongoDB vCore-anslutningssträng, användarnamn och lösenord.
Skapa en arbetsyta i Azure Databricks
Du kan följa anvisningarna för att etablera en Azure Databricks-arbetsyta. Du kan använda den tillgängliga standardberäkningen eller skapa en ny beräkningsresurs för att köra notebook-filen. Se till att välja en Databricks-körtidsmiljö som stöder Spark 3.0 eller senare.
Lägga till beroenden
Lägg till MongoDB Connector för Spark-biblioteket i din beräkning för att ansluta till både interna MongoDB- och Azure Cosmos DB för MongoDB-slutpunkter. I din beräkning väljer du Bibliotek>Installera ny>Maven och lägger sedan till org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
Maven-koordinater.
Välj Installera och starta sedan om beräkningen när installationen är klar.
Kommentar
Se till att du startar om Databricks-beräkningen när MongoDB Connector för Spark-biblioteket har installerats.
Därefter kan du skapa en Scala- eller Python-notebook-fil för migrering.
Skapa En Python-notebook-fil för att ansluta till Azure Cosmos DB for MongoDB vCore
Skapa en Python Notebook i Databricks. Ange rätt värden för variablerna innan du kör följande koder.
Uppdatera Spark-konfigurationen med Azure Cosmos DB för MongoDB anslutningssträng
- Observera anslutningssträngen under Inställningar –> Anslutningssträngar i Azure Cosmos DB MongoDB vCore-resurs i Azure Portal. Den har formen "mongodb+srv://<user>:<password>@<database_name.mongocluster.cosmos.azure.com>"
- I Databricks i beräkningskonfigurationen klistrar du in anslutningssträngen för både variablerna
spark.mongodb.output.uri
ochspark.mongodb.input.uri
under Avancerade alternativ (längst ned på sidan). Fyll i det användarnamn- och lösenordsfält som är lämpligt. På så sätt använder alla arbetsböcker som körs i beräkningsmiljön den här konfigurationen. - Du kan också uttryckligen
option
ange när du anropar API:er som:spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load()
. Om du konfigurerar variablerna i klustret behöver du inte ange alternativet.
connectionString_vcore="mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
database="<database_name>"
collection="<collection_name>"
Dataexempeluppsättning
I det här labbet använder vi CSV-datauppsättningen Citibike2019. Du kan importera den: CitiBike Resehistorik 2019. Vi läste in den i en databas med namnet "CitiBikeDB" och samlingen "CitiBike2019". Vi ställer in databasen och samlingen variabler så att de pekar på de data som läses in och vi använder variabler i exemplen.
database="CitiBikeDB"
collection="CitiBike2019"
Läsa data från Azure Cosmos DB för MongoDB vCore
Den allmänna syntaxen ser ut så här:
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
Du kan verifiera dataramen som läses in på följande sätt:
df_vcore.printSchema()
display(df_vcore)
Nu ska vi se ett exempel:
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
df_vcore.printSchema()
display(df_vcore)
Utdata:
Schema
DataFrame
Filtrera data från Azure Cosmos DB för MongoDB vCore
Den allmänna syntaxen ser ut så här:
df_v = df_vcore.filter(df_vcore[column number/column name] == [filter condition])
display(df_v)
Nu ska vi se ett exempel:
df_v = df_vcore.filter(df_vcore[2] == 1970)
display(df_v)
Utdata:
Skapa en vy eller tillfällig tabell och kör SQL-frågor mot den
Den allmänna syntaxen ser ut så här:
df_[dataframename].createOrReplaceTempView("[View Name]")
spark.sql("SELECT * FROM [View Name]")
Nu ska vi se ett exempel:
df_vcore.createOrReplaceTempView("T_VCORE")
df_v = spark.sql(" SELECT * FROM T_VCORE WHERE birth_year == 1970 and gender == 2 ")
display(df_v)
Utdata:
Skriva data till Azure Cosmos DB för MongoDB vCore
Den allmänna syntaxen ser ut så här:
df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database",database).option("collection","<collection_name>").mode("append").save()
Nu ska vi se ett exempel:
df_vcore.write.format("mongo").option("spark.mongodb.output.uri", connectionString_vcore).option("database",database).option("collection","CitiBike2019").mode("append").save()
Det här kommandot har inte några utdata när det skrivs direkt till samlingen. Du kan verifiera om posten har uppdaterats med hjälp av ett läskommando.
Läsa data från Azure Cosmos DB for MongoDB vCore-samling som använder en aggregeringspipeline
Obs! Sammansättningspipeline är en kraftfull funktion som möjliggör förbearbetning och transformation av data i Azure Cosmos DB för MongoDB. Det är en bra matchning för realtidsanalys, instrumentpaneler, rapportgenerering med sammanslagningar, summor och medelvärden med data på serversidan efter bearbetning. (Obs! det finns en hel bok skriven om den).
Azure Cosmos DB for MongoDB har även stöd för omfattande sekundära/sammansatta index för att extrahera, filtrera och bearbeta endast de data som behövs.
Du kan till exempel analysera alla kunder som finns i ett visst geografiskt område direkt i databasen utan att först behöva läsa in den fullständiga datauppsättningen, minimera dataflytten och minska svarstiden.
Här är ett exempel på hur du använder aggregerad funktion:
pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).option("pipeline", pipeline).load()
display(df_vcore)
Utdata:
Relaterat innehåll
Följande artiklar visar hur du använder aggregeringspipelines i Azure Cosmos DB för MongoDB vCore:
- Maven central är platsen där du hittar Spark Connector.
- Sammansättningspipeline