Condividi tramite


Connettersi ad Azure DocumentDB da Azure Databricks

Questo articolo illustra come connettersi ad Azure DocumentDB da Azure Databricks per eseguire operazioni comuni sui dati usando Python e Spark. È possibile configurare le dipendenze necessarie, stabilire una connessione ed eseguire operazioni di lettura, scrittura, filtro e aggregazione con il connettore Spark MongoDB.

Prerequisiti

  • Una sottoscrizione di Azure

  • Un cluster Di Azure DocumentDB esistente

Configurare l'area di lavoro di Azure Databricks

Configurare l'area di lavoro di Azure Databricks per connettersi ad Azure DocumentDB. Aggiungere il connettore MongoDB della libreria Spark all'ambiente di calcolo per abilitare la connettività ad Azure DocumentDB.

  1. Passare all'area di lavoro di Azure Databricks.

  2. Configurare il calcolo predefinito disponibile o creare una nuova risorsa di calcolo per eseguire il notebook.

  3. Selezionare un runtime di Databricks che supporta almeno Spark 3.0.

  4. Nella risorsa di calcolo selezionare Librerie>Installa nuovo>Maven.

  5. Aggiungere le coordinate Maven: org.mongodb.spark:mongo-spark-connector_2.12:3.0.1

  6. Selezionare Installa.

  7. Riavviare il calcolo al termine dell'installazione.

Configurare le impostazioni di connessione

Configurare Spark per usare la stringa di connessione di Azure DocumentDB per tutte le operazioni di lettura e scrittura.

  1. Nel portale di Azure passare alla risorsa di Azure DocumentDB.

  2. In Impostazioni>Stringhe di connessione copiare la stringa di connessione. Ha il formato : mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com

  3. In Azure Databricks passare alla configurazione di calcolo e selezionare Opzioni avanzate (nella parte inferiore della pagina).

  4. Aggiungere le variabili di configurazione spark seguenti:

    • spark.mongodb.output.uri - Incollare la stringa di connessione
    • spark.mongodb.input.uri - Incollare la stringa di connessione
  5. Salvare la configurazione.

In alternativa, è possibile impostare la stringa di connessione direttamente nel codice usando il metodo .option() durante la lettura o la scrittura dei dati.

Creare un notebook Python

Eseguire le operazioni di dati creando un nuovo notebook Python.

  1. Nell'area di lavoro di Azure Databricks creare un nuovo notebook Python.

  2. Definire le variabili di connessione all'inizio del notebook:

    connectionString = "mongodb+srv://<user>:<password>@<database_name>.mongocluster.cosmos.azure.com/?tls=true&authMechanism=SCRAM-SHA-256&retrywrites=false&maxIdleTimeMS=120000"
    database = "<database_name>"
    collection = "<collection_name>"
    
  3. Sostituire i valori segnaposto con il nome effettivo del database e il nome della raccolta.

Leggere i dati dalla raccolta

Legge i dati dalla raccolta di Azure DocumentDB in un dataframe Spark.

  1. Usare il codice seguente per caricare i dati dalla raccolta:

    df = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).load()
    
  2. Verificare che i dati siano stati caricati correttamente:

    df.printSchema()
    display(df)
    
  3. Osservare il risultato. Questo codice crea un dataframe contenente tutti i documenti della raccolta specificata e visualizza lo schema e i dati.

Filtrare i dati

Applicare filtri per recuperare specifici subset di dati dalla raccolta.

  1. Usare il metodo DataFrame filter() per applicare le condizioni:

    df_filtered = df.filter(df["birth_year"] == 1970)
    display(df_filtered)
    
  2. Usare i numeri di indice di colonna:

    df_filtered = df.filter(df[2] == 1970)
    display(df_filtered)
    
  3. Osservare il risultato. Questo approccio restituisce solo i documenti che soddisfano i criteri di filtro.

Eseguire query sui dati con SQL

Creare viste temporanee ed eseguire query SQL sui dati per un'analisi familiare basata su SQL.

  1. Creare una visualizzazione temporanea dal dataframe:

    df.createOrReplaceTempView("T")
    
  2. Eseguire query SQL sulla vista:

    df_result = spark.sql("SELECT * FROM T WHERE birth_year == 1970 AND gender == 2")
    display(df_result)
    
  3. Osservare il risultato. Questo approccio consente di usare la sintassi SQL standard per query e join complessi.

Scrivere dati nella raccolta

Salvare dati nuovi o modificati scrivendo di nuovo i dataframe nelle raccolte di Azure DocumentDB.

  1. Usare il codice seguente per scrivere dati in una raccolta:

    df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database", database).option("collection", "CitiBike2019").mode("append").save()
    
  2. L'operazione di scrittura viene completata senza output. Verificare che l'operazione di scrittura sia stata completata correttamente leggendo i dati dalla raccolta:

    df_verify = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", "CitiBike2019").load()
    display(df_verify)
    

    Suggerimento

    Usare modalità di scrittura diverse come append, overwrite o ignore a seconda dei requisiti.

Eseguire pipeline di aggregazione

Eseguire pipeline di aggregazione per eseguire l'elaborazione e l'analisi dei dati sul lato server direttamente all'interno di Azure DocumentDB. Le pipeline di aggregazione consentono trasformazioni avanzate dei dati, raggruppamento e calcoli senza spostare i dati dal database. Sono ideali per l'analisi, i dashboard e la generazione di report in tempo reale.

  1. Definisci la pipeline di aggregazione come stringa JSON:

    pipeline = "[{ $group : { _id : '$birth_year', totaldocs : { $count : 1 }, totalduration: {$sum: '$tripduration'}} }]"
    
  2. Eseguire la pipeline e caricare i risultati:

    df_aggregated = spark.read.format("mongo").option("database", database).option("spark.mongodb.input.uri", connectionString).option("collection", collection).option("pipeline", pipeline).load()
    display(df_aggregated)