Ottenere i dati di streaming in lakehouse e accedere con l'endpoint di analisi SQL

Questa guida introduttiva illustra come creare una definizione di processo Spark che contiene codice Python con Spark Structured Streaming per trasferire i dati in un lakehouse e quindi usarla tramite un endpoint di analisi SQL. Dopo aver completato questa guida introduttiva, si avrà una definizione di processo Spark che viene eseguita continuamente e l'endpoint di analisi SQL può visualizzare i dati in ingresso.

Creare uno script Python

  1. Usare il codice Python seguente che usa lo streaming strutturato Spark per ottenere dati in una tabella lakehouse.

    import sys
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.appName("MyApp").getOrCreate()
    
        tableName = "streamingtable"
        deltaTablePath = "Tables/" + tableName
    
        df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    
        query = df.writeStream.outputMode("append").format("delta").option("path", deltaTablePath).option("checkpointLocation", deltaTablePath + "/checkpoint").start()
        query.awaitTermination()
    
  2. Salvare lo script come file Python (con estensione py) nel computer locale.

Creare una lakehouse

Per creare un lakehouse, seguire questa procedura:

  1. In Microsoft Fabric selezionare l'esperienza di Ingegneria dei dati Synapse.

  2. Passare all'area di lavoro desiderata o crearne una nuova, se necessario.

  3. Per creare una lakehouse, selezionare l'icona Lakehouse sotto la sezione Nuovo nel riquadro principale.

    Screenshot showing new lakehouse dialog

  4. Immettere il nome del lakehouse e selezionare Crea.

Creare una definizione di processo Spark

Usare la procedura seguente per creare una definizione di processo Spark:

  1. Nella stessa area di lavoro in cui è stata creata una lakehouse selezionare l'icona Crea dal menu a sinistra.

  2. In "Ingegneria dei dati", selezionare Spark Job Definition (Definizione processo Spark).

    Screenshot showing new Spark Job Definition dialog

  3. Immettere il nome della definizione del processo Spark e selezionare Crea.

  4. Selezionare Carica e selezionare il file Python creato nel passaggio precedente.

  5. In Lakehouse Reference (Riferimento lakehouse) scegliere la lakehouse creata.

Impostare i criteri di ripetizione dei tentativi per la definizione del processo Spark

Usare la procedura seguente per impostare i criteri di ripetizione dei tentativi per la definizione del processo Spark:

  1. Dal menu in alto selezionare l'icona Impostazione .

    Screenshot showing Spark Job Definition settings icon

  2. Aprire la scheda Ottimizzazione e impostare Trigger criteri di ripetizione dei tentativi attivato.

    Screenshot showing Spark Job Definition optimization tab

  3. Definire il numero massimo di tentativi o selezionare Consenti tentativi illimitati.

  4. Specificare il tempo tra ogni tentativo di ripetizione e selezionare Applica.

Nota

Esiste un limite di durata di 90 giorni per la configurazione dei criteri di ripetizione. Dopo aver abilitato i criteri di ripetizione dei tentativi, il processo verrà riavviato in base ai criteri entro 90 giorni. Dopo questo periodo, il criterio di ripetizione dei tentativi smetterà di funzionare automaticamente e il processo verrà terminato. Gli utenti dovranno quindi riavviare manualmente il processo, che a sua volta riabiliterà il criterio di ripetizione dei tentativi.

Eseguire e monitorare la definizione del processo Spark

  1. Nel menu in alto selezionare l'icona Esegui .

    Screenshot showing Spark Job Definition run icon

  2. Verificare se la definizione del processo Spark è stata inviata correttamente ed è in esecuzione.

Visualizzare i dati usando un endpoint di analisi SQL

  1. Nella visualizzazione area di lavoro selezionare Lakehouse.

  2. Nell'angolo destro selezionare Lakehouse e selezionare Endpoint di analisi SQL.

  3. Nella visualizzazione endpoint di analisi SQL in Tabelle selezionare la tabella usata dallo script per trasferire i dati. È quindi possibile visualizzare in anteprima i dati dall'endpoint di analisi SQL.