Nota
L'accesso a questa pagina richiede l'autorizzazione. Puoi provare ad accedere o a cambiare directory.
L'accesso a questa pagina richiede l'autorizzazione. Puoi provare a cambiare directory.
Spark Connect è un protocollo basato su gRPC in Apache Spark che specifica come un'applicazione client può comunicare con un server Spark remoto. Consente l'esecuzione remota dei carichi di lavoro Spark usando l'API DataFrame.
Spark Connect viene usato nei modi seguenti:
- Notebook Scala con Databricks Runtime versione 13.3 e successive, su calcolo standard
- Notebook Python con Databricks Runtime versione 14.3 e successive, su risorse di calcolo standard
- Calcolo serverless (senza server)
- Databricks Connect - uno strumento per integrare Databricks con i tuoi ambienti di sviluppo preferiti.
Anche se Spark Connect e Spark Classic usano l'esecuzione differita per le trasformazioni, esistono differenze importanti per evitare problemi imprevisti di comportamento e prestazioni durante la migrazione di codice esistente da Spark Classic a Spark Connect o durante la scrittura di codice che deve funzionare con entrambi.
Valutazione ritardata vs valutazione immediata
La differenza principale tra Spark Connect e Spark Classic è che Spark Connect rinvia l'analisi e la risoluzione dei nomi al tempo di esecuzione, come riepilogato nella tabella seguente.
| Aspetto | Spark Classico | Spark Connect |
|---|---|---|
| Esecuzione della query | Pigro | Pigro |
| Analisi dello schema | Desideroso | Pigro |
| Accesso allo schema del database | Local | Attiva RPC e memorizza nella cache lo schema al primo accesso |
| Visualizzazioni temporanee | Pianificare l'incorporamento | Ricerca del nome |
| Serializzazione UDF | Al momento della creazione | In fase di esecuzione |
Esecuzione della query
Sia Spark Classic che Spark Connect seguono lo stesso modello di esecuzione lazy per le query.
Nella versione classica di Spark, le trasformazioni del dataframe (ad esempio filter e limit) sono lazy. Ciò significa che non vengono eseguiti immediatamente, ma codificati in un piano logico. Il calcolo effettivo viene attivato solo con un'azione ( ad esempio show(), collect()).
Spark Connect adotta un modello di valutazione differita simile. Le trasformazioni vengono costruite sul lato client e inviate come piani non risolti al server. Il server esegue quindi l'analisi e l'esecuzione necessarie quando viene chiamata un'azione.
| Aspetto | Spark Classico | Spark Connect |
|---|---|---|
Trasformazioni: df.filter(...), df.select(...), df.limit(...) |
Esecuzione differita | Esecuzione differita |
Query SQL: spark.sql("select …") |
Esecuzione differita | Esecuzione differita |
Azioni: df.collect(), df.show() |
Esecuzione con valutazione anticipata | Esecuzione con valutazione anticipata |
Comandi SQL: spark.sql("insert …"), spark.sql("create …") |
Esecuzione con valutazione anticipata | Esecuzione con valutazione anticipata |
Analisi dello schema
Spark Classic esegue l'analisi in modo immediato durante la costruzione di piani logici. Questa fase di analisi converte il piano non risolto in un piano logico completamente risolto e verifica che l'operazione possa essere eseguita da Spark. Uno dei vantaggi principali dell'esecuzione di questo lavoro con entusiasmo è che gli utenti ricevono feedback immediato quando viene commesso un errore. Ad esempio, l'esecuzione di spark.sql("select 1 as a, 2 as b").filter("c > 1") genererà immediatamente un errore, indicando che la colonna c non può essere trovata.
Spark Connect è diverso dal modello classico perché il client costruisce piani non risolti durante la trasformazione e ne impedisce l'analisi. Qualsiasi operazione che richiede un piano risolto, ad esempio l'accesso a uno schema, la spiegazione del piano, la persistenza di un dataframe o l'esecuzione di un'azione, fa sì che il client invii i piani non risolti al server tramite RPC. Il server esegue quindi l'analisi completa per ottenere il piano logico risolto ed eseguire l'operazione. Ad esempio, spark.sql("select 1 as a, 2 as b").filter("c > 1") non genererà alcun errore perché il piano non risolto è solo sul lato client, ma in df.columns o df.show() verrà generato un errore perché il piano non risolto viene inviato al server per l'analisi.
A differenza dell'esecuzione di query, Spark Classic e Spark Connect differiscono in caso di analisi dello schema.
| Aspetto | Spark Classico | Spark Connect |
|---|---|---|
Trasformazioni: df.filter(...), df.select(...), df.limit(...) |
Desideroso | Pigro |
Accesso allo schema: df.columns, df.schema, df.isStreaming |
Desideroso | Desideroso Attiva una richiesta RPC di analisi, a differenza della versione classica di Spark |
Azioni: df.collect(), df.show() |
Desideroso | Desideroso |
| Stato della sessione dipendente dei DataFrames: UDF (funzioni definite dall'utente), viste temporanee, configurazioni | Desideroso | Pigro Valutato durante l'esecuzione del piano del DataFrame |
| Stato della sessione dipendente delle viste temporanee: UDF, altre viste temporanee, configurazioni | Desideroso | Desideroso L'analisi viene attivata in modo ansioso durante la creazione della visualizzazione temporanea |
Procedure consigliate
La differenza tra l'analisi differita e l'analisi anticipata implica che esistono alcune pratiche migliori da seguire per evitare problemi imprevisti di comportamento e prestazioni, in particolare quelli causati dalla sovrascrittura dei nomi associati alle visualizzazioni temporanee, la cattura di variabili esterne nelle funzioni definite dall'utente, il rilevamento ritardato degli errori e l'accesso eccessivo allo schema sui nuovi DataFrames.
Creare nomi di visualizzazione temporanea univoci
In Spark Connect il dataframe archivia solo un riferimento alla visualizzazione temporanea in base al nome. Di conseguenza, se la visualizzazione temporanea viene sostituita in un secondo momento, i dati nel DataFrame cambieranno anche perché la visualizzazione viene cercata per nome al momento dell'esecuzione.
Questo comportamento è diverso da Spark Classico, in cui il piano logico della visualizzazione temporanea è incorporato nel piano del frame di dati al momento della creazione. Qualsiasi sostituzione successiva della visualizzazione temporanea non influisce sul frame di dati originale.
Per attenuare la differenza, creare sempre nomi di visualizzazione temporanea univoci. Ad esempio, includere un UUID nel nome della visualizzazione. In questo modo si evita di influire sui dataframe esistenti che fanno riferimento a una vista temporanea registrata in precedenza.
Pitone
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
Scala
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
spark.range(x).createOrReplaceTempView(tempViewName)
spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
Avvolgere le definizioni UDF
In genere è considerata una pratica scorretta per le funzioni definite dall'utente dipendere da variabili esterne modificabili, in quanto introduce dipendenze implicite, può portare a un comportamento non deterministico e riduce la composizione. Tuttavia, se si dispone di un modello di questo tipo, tenere presente la seguente trappola:
In Spark Connect le UDF (funzioni definite dall'utente) Python sono valutate in modo pigro. La serializzazione e la registrazione vengono posticipate fino al tempo di esecuzione. Nell'esempio seguente, la UDF viene serializzata e caricata nel cluster Spark per l'esecuzione solo quando show() viene chiamata.
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
Questo comportamento è diverso da Spark Classico, in cui le funzioni definite dall'utente vengono create con entusiasmo. In Spark Classic, il valore di x al momento della creazione della UDF viene acquisito, pertanto eventuali modifiche successive a x non influiscono sulla UDF già creata.
Se hai bisogno di modificare il valore delle variabili esterne da cui dipende una funzione definita dall'utente, usa una fabbrica di funzioni (chiusura con collegamento anticipato) per acquisire correttamente i valori delle variabili. In particolare, incapsulare la creazione di una funzione definita dall'utente in una funzione ausiliaria per acquisire il valore di una variabile dipendente.
Pitone
from pyspark.sql.functions import udf
def make_udf(value):
def foo():
return value
return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
Scala
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
Facendo il wrapping della definizione UDF all'interno di un'altra funzione (make_udf), si crea un nuovo ambito nel quale il valore corrente di x viene passato come argomento. In questo modo ogni funzione definita dall'utente generata ha una propria copia del campo, associata al momento della creazione della funzione definita dall'utente.
Attivare l'analisi anticipata per il rilevamento degli errori
La gestione degli errori seguente è utile in Spark Classic perché esegue un'analisi anticipata, che consente di sollevare eccezioni tempestivamente. Tuttavia, in Spark Connect questo codice non causa alcun problema, poiché crea solo un piano locale non risolto senza attivare alcuna analisi.
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")
Se il codice si basa sull'eccezione di analisi e si vuole intercettarlo, è possibile attivare l'analisi eager, ad esempio con df.columns, df.schemao df.collect().
Pitone
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")
Scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
val df2 = df.select("name", "age")
.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
.filter(col("age_with_typo") > 6)
df2.columns // Trigger eager analysis to catch the error
} catch {
case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
Evita troppe richieste di analisi anticipata
Le prestazioni possono essere migliorate se si evitano richieste di analisi su un numero elevato di dataframe.
Creazione di nuovi dataframe passo dopo passo e accesso al relativo schema in ogni iterazione
Quando si creano molti nuovi DataFrames, evitare l'eccesso di chiamate che attivano analisi 'eager' (ad esempio df.columns, df.schema). È possibile accedere allo schema dello stesso dataframe più volte, ma l'attivazione dell'analisi su molti dataframe appena creati influirà sulle prestazioni.
Ad esempio, quando si aggiungono in modo iterativo colonne a un dataframe all'interno di un ciclo e si verifica se ogni colonna esiste già prima di aggiungerla, la chiamata df.columns a ogni dataframe appena creato attiva una richiesta di analisi in ogni iterazione. Per evitare questo problema, mantenere un set per tenere traccia dei nomi delle colonne invece di accedere ripetutamente allo schema del dataframe.
Pitone
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
# if new_column_name not in df.columns: # Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
if new_column_name not in columns: # Check the set without triggering analysis
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()
Scala
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
val newColumnName = i.toString
// if (!df.columns.contains(newColumnName)) { // Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
if (!columns.contains(newColumnName)) { // Check the set without triggering analysis
df = df.withColumn(newColumnName, col("id") + i)
columns.add(newColumnName)
}
}
df.show()
Evitare di accedere agli schemi per un numero elevato di dataframe intermedi
Un altro caso simile consiste nel creare un numero elevato di dataframe intermedi non necessari e analizzarli. Nel caso seguente, per estrarre i nomi dei campi da ogni colonna di un tipo struct, ottenere StructType le informazioni sul campo direttamente dallo schema del DataFrame anziché creare DataFrame intermedi.
Pitone
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
# column_schema.name: df.select(column_schema.name + ".*").columns # Bad practice. This creates an intermediate DataFrame and triggers an analysis request for each StructType column.
column_schema.name: [f.name for f in column_schema.dataType.fields] # Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
Scala
import org.apache.spark.sql.types.StructType
df = ...
val structColumnFields = df.schema.fields
.filter(_.dataType.isInstanceOf[StructType])
.map { field =>
// field.name -> df.select(field.name + ".*").columns // Bad practice. This creates an intermediate DataFrame and triggers analysis for each StructType column.
field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name) // Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
}
.toMap
println(structColumnFields)