Delen via


Verbinding maken met Azure Cosmos DB voor MongoDB vCore vanuit Azure Databricks

In dit artikel wordt uitgelegd hoe u Azure Cosmos DB MongoDB vCore verbindt vanuit Azure Databricks. Het begeleidt eenvoudige DML-bewerkingen (Data Manipulation Language), zoals Lezen, Filteren, SQLs, Aggregatiepijplijnen en Tabellen schrijven met behulp van Python-code.

Vereisten

Afhankelijkheden configureren voor connectiviteit

Hier volgen de afhankelijkheden die nodig zijn om vanuit Azure Databricks verbinding te maken met Azure Cosmos DB voor MongoDB vCore:

  • Spark-connector voor MongoDB Spark-connector wordt gebruikt om verbinding te maken met Azure Cosmos DB voor MongoDB vCore. Identificeer en gebruik de versie van de connector in Maven Central die compatibel is met de Spark- en Scala-versies van uw Spark-omgeving. We raden een omgeving aan die Ondersteuning biedt voor Spark 3.2.1 of hoger en de Spark-connector die beschikbaar is op maven-coördinaten org.mongodb.spark:mongo-spark-connector_2.12:3.0.1.

  • Azure Cosmos DB voor MongoDB-verbindingsreeks s: uw Azure Cosmos DB voor MongoDB vCore-verbindingsreeks, gebruikersnaam en wachtwoorden.

Een Azure Databricks-cluster inrichten

U kunt de instructies volgen voor het inrichten van een Azure Databricks-cluster. U wordt aangeraden Databricks Runtime versie 7.6 te selecteren, die ondersteuning biedt voor Spark 3.0.

Diagram van het maken van een nieuw databricks-cluster.

Afhankelijkheden toevoegen

Voeg de MongoDB-connector voor Spark-bibliotheek toe aan uw cluster om verbinding te maken met zowel systeemeigen MongoDB- als Azure Cosmos DB voor MongoDB-eindpunten. Selecteer In uw cluster Bibliotheken>installeren nieuwe>Maven en voeg vervolgens Maven-coördinaten toe.org.mongodb.spark:mongo-spark-connector_2.12:3.0.1

Diagram van het toevoegen van databricks-clusterafhankelijkheden.

Selecteer Installeren en start het cluster opnieuw wanneer de installatie is voltooid.

Notitie

Zorg ervoor dat u het Databricks-cluster opnieuw start nadat de MongoDB-connector voor Spark-bibliotheek is geïnstalleerd.

Daarna kunt u een Scala- of Python-notebook maken voor migratie.

Python-notebook maken om verbinding te maken met Azure Cosmos DB voor MongoDB vCore

Maak een Python-notebook in Databricks. Zorg ervoor dat u de juiste waarden voor de variabelen invoert voordat u de volgende codes uitvoert.

Spark-configuratie bijwerken met de Azure Cosmos DB voor MongoDB-verbindingsreeks

  1. Noteer de verbindingsreeks onder de instellingen-verbindingsreeksen> in Azure Cosmos DB MongoDB vCore-resource in Azure Portal. Het heeft de vorm van 'mongodb+srv://<user>:<password>@<database_name.mongocluster.cosmos.azure.com>'
  2. Plak in Databricks in uw clusterconfiguratie onder Geavanceerde opties (onder aan de pagina) de verbindingsreeks voor zowel de spark.mongodb.output.uri als spark.mongodb.input.uri de variabelen. Vul het veld gebruikersnaam en wachtwoord in dat geschikt is. Op deze manier worden alle werkmappen, die op het cluster worden uitgevoerd, deze configuratie gebruikt.
  3. U kunt ook option expliciet instellen wanneer u API's aanroept, zoals: spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load(). Als u de variabelen in het cluster configureert, hoeft u de optie niet in te stellen.
connectionString_vcore="mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
database="<database_name>"
collection="<collection_name>"

Gegevensvoorbeeldenset

Voor dit lab gebruiken we de CSV-gegevensset Citibike2019. U kunt het importeren: CitiBike Trip History 2019. We hebben het geladen in een database met de naam CitiBikeDB en de collectie CitiBike2019. We stellen de variabelendatabase en verzameling in om te verwijzen naar de gegevens die zijn geladen en we gebruiken variabelen in de voorbeelden.

database="CitiBikeDB"
collection="CitiBike2019"

Gegevens lezen uit Azure Cosmos DB voor MongoDB vCore

De algemene syntaxis ziet er als volgt uit:

df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()

U kunt het gegevensframe als volgt valideren:

df_vcore.printSchema()
display(df_vcore)

Laten we een voorbeeld bekijken:

df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).load()
df_vcore.printSchema()
display(df_vcore)

Uitvoer:

SchemaSchermopname van het afdrukschema.

DataFrameSchermopname van het Display DataFrame.

Gegevens filteren uit Azure Cosmos DB voor MongoDB vCore

De algemene syntaxis ziet er als volgt uit:

df_v = df_vcore.filter(df_vcore[column number/column name] == [filter condition])
display(df_v)

Laten we een voorbeeld bekijken:

df_v = df_vcore.filter(df_vcore[2] == 1970)
display(df_v)

Uitvoer: Schermopname van het gefilterde dataframe weergeven.

Een weergave of tijdelijke tabel maken en ER SQL-query's op uitvoeren

De algemene syntaxis ziet er als volgt uit:

df_[dataframename].createOrReplaceTempView("[View Name]")
spark.sql("SELECT * FROM [View Name]")

Laten we een voorbeeld bekijken:

df_vcore.createOrReplaceTempView("T_VCORE")
df_v = spark.sql(" SELECT * FROM T_VCORE WHERE birth_year == 1970 and gender == 2 ")
display(df_v)

Uitvoer: Schermopname van de SQL-query weergeven.

Gegevens schrijven naar Azure Cosmos DB voor MongoDB vCore

De algemene syntaxis ziet er als volgt uit:

df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database",database).option("collection","<collection_name>").mode("append").save()

Laten we een voorbeeld bekijken:

df_vcore.write.format("mongo").option("spark.mongodb.output.uri", connectionString_vcore).option("database",database).option("collection","CitiBike2019").mode("append").save()

Deze opdracht heeft geen uitvoer omdat deze rechtstreeks naar de verzameling schrijft. U kunt kruislings controleren of de record is bijgewerkt met behulp van een leesopdracht.

Gegevens lezen uit azure Cosmos DB voor MongoDB vCore-verzameling met een aggregatiepijplijn

[! Opmerking] Aggregatiepijplijn is een krachtige mogelijkheid waarmee gegevens in Azure Cosmos DB voor MongoDB vooraf kunnen worden verwerkt en getransformeerd. Het is een uitstekende match voor realtime analyses, dashboards, het genereren van rapporten met roll-ups, somt & gemiddelden met 'serverzijde' gegevens na verwerking. (Opmerking: er is een heel boek over geschreven).

Azure Cosmos DB voor MongoDB ondersteunt zelfs uitgebreide secundaire/samengestelde indexen voor het extraheren, filteren en verwerken van alleen de gegevens die nodig zijn.

Als u bijvoorbeeld alle klanten in een specifieke geografie rechtstreeks in de database analyseert zonder eerst de volledige gegevensset te hoeven laden, waardoor gegevensverplaatsing wordt geminimaliseerd en de latentie wordt verminderd.

Hier volgt een voorbeeld van het gebruik van de statistische functie:

pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
df_vcore = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString_vcore).option("collection",collection).option("pipeline", pipeline).load()
display(df_vcore)

Uitvoer:

Schermopname van de statistische gegevens weergeven.

De volgende artikelen laten zien hoe u aggregatiepijplijnen gebruikt in Azure Cosmos DB voor MongoDB vCore: