Dela via


Ansluta till Azure Cosmos DB for MongoDB vCore från Azure Databricks

Den här artikeln beskriver hur du ansluter azure Cosmos DB MongoDB vCore från Azure Databricks. Den går igenom grundläggande DML-åtgärder (Data Manipulation Language) som Read, Filter, SQLs, Aggregation Pipelines och Write Tables med python-kod.

Förutsättningar

Konfigurera beroenden för anslutning

Följande är de beroenden som krävs för att ansluta till Azure Cosmos DB för MongoDB vCore från Azure Databricks:

  • Spark-anslutningsappen för MongoDB Spark-anslutningsappen används för att ansluta till Azure Cosmos DB för MongoDB vCore. Identifiera och använd den version av anslutningsappen som finns i Maven Central som är kompatibel med Spark- och Scala-versionerna av din Spark-miljö. Vi rekommenderar en miljö som stöder Spark 3.2.1 eller senare och spark-anslutningsappen som är tillgänglig vid maven-koordinaterna org.mongodb.spark:mongo-spark-connector_2.12:3.0.1.

  • Azure Cosmos DB för MongoDB-anslutningssträngar: Din Azure Cosmos DB för MongoDB vCore-anslutningssträng, användarnamn och lösenord.

Skapa en arbetsyta i Azure Databricks

Du kan följa anvisningarna för att etablera en Azure Databricks-arbetsyta. Du kan använda den tillgängliga standardberäkningen eller skapa en ny beräkningsresurs för att köra notebook-filen. Se till att välja en Databricks-körtidsmiljö som stöder Spark 3.0 eller senare.

Lägga till beroenden

Lägg till MongoDB Connector för Spark-biblioteket i din beräkning för att ansluta till både interna MongoDB- och Azure Cosmos DB för MongoDB-slutpunkter. I din beräkning väljer du Bibliotek>Installera ny>Maven och lägger sedan till org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 Maven-koordinater.

Diagram över hur du lägger till databricks-beräkningsberoenden.

Välj Installera och starta sedan om beräkningen när installationen är klar.

Kommentar

Se till att du startar om Databricks-beräkningen när MongoDB Connector för Spark-biblioteket har installerats.

Därefter kan du skapa en Scala- eller Python-notebook-fil för migrering.

Skapa En Python-notebook-fil för att ansluta till Azure Cosmos DB for MongoDB vCore

Skapa en Python Notebook i Databricks. Ange rätt värden för variablerna innan du kör följande koder.

Uppdatera Spark-konfigurationen med Azure Cosmos DB för MongoDB anslutningssträng

  1. Observera anslutningssträngen under Inställningar –> Anslutningssträngar i Azure Cosmos DB MongoDB vCore-resurs i Azure Portal. Den har formen "mongodb+srv://<user>:<password>@<database_name.mongocluster.cosmos.azure.com>"
  2. I Databricks i beräkningskonfigurationen klistrar du in anslutningssträngen för både variablerna spark.mongodb.output.uri och spark.mongodb.input.uri under Avancerade alternativ (längst ned på sidan). Fyll i det användarnamn- och lösenordsfält som är lämpligt. På så sätt använder alla arbetsböcker som körs i beräkningsmiljön den här konfigurationen.
  3. Du kan också uttryckligen option ange när du anropar API:er som: spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load(). Om du konfigurerar variablerna i klustret behöver du inte ange alternativet.
connectionString_vcore="mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
database="<database_name>"
collection="<collection_name>"

Dataexempeluppsättning

I det här labbet använder vi CSV-datauppsättningen Citibike2019. Du kan importera den: CitiBike Resehistorik 2019. Vi läste in den i en databas med namnet "CitiBikeDB" och samlingen "CitiBike2019". Vi ställer in databasen och samlingen variabler så att de pekar på de data som läses in och vi använder variabler i exemplen.

database="CitiBikeDB"
collection="CitiBike2019"

Läsa data från Azure Cosmos DB för MongoDB vCore

Den allmänna syntaxen ser ut så här:

df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()

Du kan verifiera dataramen som läses in på följande sätt:

df_vcore.printSchema()
display(df_vcore)

Nu ska vi se ett exempel:

df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
df_vcore.printSchema()
display(df_vcore)

Utdata:

SchemaSkärmbild av utskriftsschemat.

DataFrameSkärmbild av DataFrame-vyn.

Filtrera data från Azure Cosmos DB för MongoDB vCore

Den allmänna syntaxen ser ut så här:

df_v = df_vcore.filter(df_vcore[column number/column name] == [filter condition])
display(df_v)

Nu ska vi se ett exempel:

df_v = df_vcore.filter(df_vcore[2] == 1970)
display(df_v)

Utdata: Skärmbild av visa filtrerad dataram.

Skapa en vy eller tillfällig tabell och kör SQL-frågor mot den

Den allmänna syntaxen ser ut så här:

df_[dataframename].createOrReplaceTempView("[View Name]")
spark.sql("SELECT * FROM [View Name]")

Nu ska vi se ett exempel:

df_vcore.createOrReplaceTempView("T_VCORE")
df_v = spark.sql(" SELECT * FROM T_VCORE WHERE birth_year == 1970 and gender == 2 ")
display(df_v)

Utdata: Skärmbild av visad SQL-fråga.

Skriva data till Azure Cosmos DB för MongoDB vCore

Den allmänna syntaxen ser ut så här:

df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database",database).option("collection","<collection_name>").mode("append").save()

Nu ska vi se ett exempel:

df_vcore.write.format("mongo").option("spark.mongodb.output.uri", connectionString_vcore).option("database",database).option("collection","CitiBike2019").mode("append").save()

Det här kommandot har inte några utdata när det skrivs direkt till samlingen. Du kan verifiera om posten har uppdaterats med hjälp av ett läskommando.

Läsa data från Azure Cosmos DB for MongoDB vCore-samling som använder en aggregeringspipeline

Obs! Sammansättningspipeline är en kraftfull funktion som möjliggör förbearbetning och transformation av data i Azure Cosmos DB för MongoDB. Det är en bra matchning för realtidsanalys, instrumentpaneler, rapportgenerering med sammanslagningar, summor och medelvärden med data på serversidan efter bearbetning. (Obs! det finns en hel bok skriven om den).

Azure Cosmos DB for MongoDB har även stöd för omfattande sekundära/sammansatta index för att extrahera, filtrera och bearbeta endast de data som behövs.

Du kan till exempel analysera alla kunder som finns i ett visst geografiskt område direkt i databasen utan att först behöva läsa in den fullständiga datauppsättningen, minimera dataflytten och minska svarstiden.

Här är ett exempel på hur du använder aggregerad funktion:

pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).option("pipeline", pipeline).load()
display(df_vcore)

Utdata:

Skärmbild av Visa aggregerade data.

Följande artiklar visar hur du använder aggregeringspipelines i Azure Cosmos DB för MongoDB vCore: