Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo illustra come connettersi ad Azure DocumentDB da Azure Databricks per eseguire operazioni comuni sui dati usando Python e Spark. È possibile configurare le dipendenze necessarie, stabilire una connessione ed eseguire operazioni di lettura, scrittura, filtro e aggregazione con il connettore Spark MongoDB.
Prerequisiti
Una sottoscrizione di Azure
- Se non hai un abbonamento Azure, crea un account gratuito
Un cluster Di Azure DocumentDB esistente
- Se non si ha un cluster, creare un nuovo cluster
Firewall configurato per consentire l'accesso dai servizi di Azure
Ambiente Spark in Azure Databricks
- Connettore MongoDB Spark compatibile con Spark 3.2.1 o versioni successive (disponibile alle coordinate Maven
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1)
- Connettore MongoDB Spark compatibile con Spark 3.2.1 o versioni successive (disponibile alle coordinate Maven
Configurare l'area di lavoro di Azure Databricks
Configurare l'area di lavoro di Azure Databricks per connettersi ad Azure DocumentDB. Aggiungere il connettore MongoDB della libreria Spark all'ambiente di calcolo per abilitare la connettività ad Azure DocumentDB.
Passare all'area di lavoro di Azure Databricks.
Configurare il calcolo predefinito disponibile o creare una nuova risorsa di calcolo per eseguire il notebook.
Selezionare un runtime di Databricks che supporta almeno Spark 3.0.
Nella risorsa di calcolo selezionare Librerie>Installa nuovo>Maven.
Aggiungere le coordinate Maven:
org.mongodb.spark:mongo-spark-connector_2.12:3.0.1Selezionare Installa.
Riavviare il calcolo al termine dell'installazione.
Configurare le impostazioni di connessione
Configurare Spark per usare la stringa di connessione di Azure DocumentDB per tutte le operazioni di lettura e scrittura.
Nel portale di Azure passare alla risorsa di Azure DocumentDB.
In Impostazioni>Stringhe di connessione copiare la stringa di connessione. Ha il formato :
mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.comIn Azure Databricks passare alla configurazione di calcolo e selezionare Opzioni avanzate (nella parte inferiore della pagina).
Aggiungere le variabili di configurazione spark seguenti:
-
spark.mongodb.output.uri- Incollare la stringa di connessione -
spark.mongodb.input.uri- Incollare la stringa di connessione
-
Salvare la configurazione.
In alternativa, è possibile impostare la stringa di connessione direttamente nel codice usando il metodo .option() durante la lettura o la scrittura dei dati.
Creare un notebook Python
Eseguire le operazioni di dati creando un nuovo notebook Python.
Nell'area di lavoro di Azure Databricks creare un nuovo notebook Python.
Definire le variabili di connessione all'inizio del notebook:
connectionString = "mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000" database = "<database_name>" collection = "<collection_name>"Sostituire i valori segnaposto con il nome effettivo del database e il nome della raccolta.
Leggere i dati dalla raccolta
Legge i dati dalla raccolta di Azure DocumentDB in un dataframe Spark.
Usare il codice seguente per caricare i dati dalla raccolta:
df = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).load()Verificare che i dati siano stati caricati correttamente:
df.printSchema() display(df)Osservare il risultato. Questo codice crea un dataframe contenente tutti i documenti della raccolta specificata e visualizza lo schema e i dati.
Filtrare i dati
Applicare filtri per recuperare specifici subset di dati dalla raccolta.
Usare il metodo DataFrame
filter()per applicare le condizioni:df_filtered = df.filter(df["birth_year"] == 1970) display(df_filtered)Usare i numeri di indice di colonna:
df_filtered = df.filter(df[2] == 1970) display(df_filtered)Osservare il risultato. Questo approccio restituisce solo i documenti che soddisfano i criteri di filtro.
Eseguire query sui dati con SQL
Creare viste temporanee ed eseguire query SQL sui dati per un'analisi familiare basata su SQL.
Creare una visualizzazione temporanea dal dataframe:
df.createOrReplaceTempView("T")Eseguire query SQL sulla vista:
df_result = spark.sql("SELECT * FROM T WHERE birth_year == 1970 AND gender == 2") display(df_result)Osservare il risultato. Questo approccio consente di usare la sintassi SQL standard per query e join complessi.
Scrivere dati nella raccolta
Salvare dati nuovi o modificati scrivendo di nuovo i dataframe nelle raccolte di Azure DocumentDB.
Usare il codice seguente per scrivere dati in una raccolta:
df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database", database).option("collection", "CitiBike2019").mode("append").save()L'operazione di scrittura viene completata senza output. Verificare che l'operazione di scrittura sia stata completata correttamente leggendo i dati dalla raccolta:
df_verify = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", "CitiBike2019").load() display(df_verify)Suggerimento
Usare modalità di scrittura diverse come
append,overwriteoignorea seconda dei requisiti.
Eseguire pipeline di aggregazione
Eseguire pipeline di aggregazione per eseguire l'elaborazione e l'analisi dei dati sul lato server direttamente all'interno di Azure DocumentDB. Le pipeline di aggregazione consentono trasformazioni avanzate dei dati, raggruppamento e calcoli senza spostare i dati dal database. Sono ideali per l'analisi, i dashboard e la generazione di report in tempo reale.
Definisci la pipeline di aggregazione come stringa JSON:
pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"Eseguire la pipeline e caricare i risultati:
df_aggregated = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).option("pipeline", pipeline).load() display(df_aggregated)
Contenuti correlati
- Maven central - Versioni del connettore MongoDB Spark
- Aggregazioni MongoDB pratiche - Guida alle pipeline di aggregazione
- Configurare le impostazioni del firewall