Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Spark Connect è un protocollo basato su gRPC in Apache Spark che specifica come un'applicazione client può comunicare con un server Spark remoto. Consente l'esecuzione remota dei carichi di lavoro Spark usando l'API DataFrame.
Spark Connect viene usato nei modi seguenti:
- Notebook Scala con Databricks Runtime versione 13.3 e successive, su calcolo standard
- Notebook Python con Databricks Runtime versione 14.3 e successive, su risorse di calcolo standard
- Calcolo serverless (senza server)
- Databricks Connect - uno strumento per integrare Databricks con i tuoi ambienti di sviluppo preferiti.
Anche se Spark Connect e Spark Classic usano l'esecuzione differita per le trasformazioni, esistono differenze importanti per evitare problemi imprevisti di comportamento e prestazioni durante la migrazione di codice esistente da Spark Classic a Spark Connect o durante la scrittura di codice che deve funzionare con entrambi.
Lazy vs eager
La differenza principale tra Spark Connect e Spark Classic è che Spark Connect rinvia l'analisi e la risoluzione dei nomi al tempo di esecuzione, come riepilogato nella tabella seguente.
| Aspetto | Spark Classico | Spark Connect |
|---|---|---|
| Esecuzione della query | Pigro | Pigro |
| Analisi dello schema | Desideroso | Pigro |
| Accesso allo schema del database | Local | Trigger RPC |
| Visualizzazioni temporanee | Pianificare l'incorporamento | Ricerca del nome |
| Serializzazione UDF | Al momento della creazione | In fase di esecuzione |
Esecuzione della query
Sia Spark Classic che Spark Connect seguono lo stesso modello di esecuzione lazy per le query.
Nella versione classica di Spark, le trasformazioni del dataframe (ad esempio filter e limit) sono lazy. Ciò significa che non vengono eseguiti immediatamente, ma vengono registrati in un piano logico. Il calcolo effettivo viene attivato solo quando viene richiamata un'azione , ad esempio show(), collect().
Spark Connect adotta un modello di valutazione differita simile. Le trasformazioni vengono costruite sul lato client e inviate come piani proto non risolti al server. Il server esegue quindi l'analisi e l'esecuzione necessarie quando viene chiamata un'azione.
| Aspetto | Spark Classico | Spark Connect |
|---|---|---|
Trasformazioni: df.filter(...), df.select(...), df.limit(...) |
Esecuzione differita | Esecuzione differita |
Query SQL: spark.sql("select …") |
Esecuzione differita | Esecuzione differita |
Azioni: df.collect(), df.show() |
Esecuzione con valutazione anticipata | Esecuzione con valutazione anticipata |
Comandi SQL: spark.sql("insert …"), spark.sql("create …") |
Esecuzione con valutazione anticipata | Esecuzione con valutazione anticipata |
Analisi dello schema
Spark Classic esegue l'analisi dello schema in modo ansioso durante la fase di costruzione del piano logico. Quando si definiscono le trasformazioni, Spark analizza immediatamente lo schema del dataframe per assicurarsi che tutte le colonne e i tipi di dati a cui si fa riferimento siano validi. Ad esempio, l'esecuzione di spark.sql("select 1 as a, 2 as b").filter("c > 1") genererà immediatamente un errore, indicando che la colonna c non può essere trovata.
Spark Connect costruisce invece piani proto non risolti durante la trasformazione. Quando si accede a uno schema o si esegue un'azione, il client invia i piani non risolti al server tramite RPC (chiamata di procedura remota). Il server esegue quindi l'analisi e l'esecuzione. Questa progettazione rinvia l'analisi dello schema. Ad esempio, spark.sql("select 1 as a, 2 as b").filter("c > 1") non genererà alcun errore perché il piano non risolto è solo sul lato client, ma in df.columns o df.show() verrà generato un errore perché il piano non risolto viene inviato al server per l'analisi.
A differenza dell'esecuzione di query, Spark Classic e Spark Connect differiscono in caso di analisi dello schema.
| Aspetto | Spark Classico | Spark Connect |
|---|---|---|
Trasformazioni: df.filter(...), df.select(...), df.limit(...) |
Desideroso | Pigro |
Accesso allo schema: df.columns, df.schema, df.isStreaming |
Desideroso | Desideroso Attiva una richiesta RPC di analisi, a differenza della versione classica di Spark |
Azioni: df.collect(), df.show() |
Desideroso | Desideroso |
| Stato della sessione dipendente: UDF (funzioni definite dall'utente), viste temporanee, configurazioni | Desideroso | Pigro Valutato durante l'esecuzione |
Procedure consigliate
La differenza tra l'analisi differita e l'analisi anticipata implica che esistono alcune pratiche migliori da seguire per evitare problemi imprevisti di comportamento e prestazioni, in particolare quelli causati dalla sovrascrittura dei nomi associati alle visualizzazioni temporanee, la cattura di variabili esterne nelle funzioni definite dall'utente, il rilevamento ritardato degli errori e l'accesso eccessivo allo schema sui nuovi DataFrames.
Creare nomi di visualizzazione temporanea univoci
In Spark Connect il dataframe archivia solo un riferimento alla visualizzazione temporanea in base al nome. Di conseguenza, se la visualizzazione temporanea viene sostituita in un secondo momento, i dati nel DataFrame cambieranno anche perché la visualizzazione viene cercata per nome al momento dell'esecuzione.
Questo comportamento è diverso da Spark Classico, in cui il piano logico della visualizzazione temporanea è incorporato nel piano del frame di dati al momento della creazione. Qualsiasi sostituzione successiva della visualizzazione temporanea non influisce sul frame di dati originale.
Per attenuare la differenza, creare sempre nomi di visualizzazione temporanea univoci. Ad esempio, includere un UUID nel nome della visualizzazione. In questo modo si evita di influire sui dataframe esistenti che fanno riferimento a una vista temporanea registrata in precedenza.
Pitone
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
Scala
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
spark.range(x).createOrReplaceTempView(tempViewName)
spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
Avvolgere le definizioni UDF
In Spark Connect le UDF (funzioni definite dall'utente) Python sono valutate in modo pigro. La serializzazione e la registrazione vengono posticipate fino al tempo di esecuzione. Nell'esempio seguente, la UDF viene serializzata e caricata nel cluster Spark per l'esecuzione solo quando show() viene chiamata.
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
Questo comportamento è diverso da Spark Classico, in cui le funzioni definite dall'utente vengono create con entusiasmo. In Spark Classic, il valore di x al momento della creazione della UDF viene acquisito, pertanto eventuali modifiche successive a x non influiscono sulla UDF già creata.
Se hai bisogno di modificare il valore delle variabili esterne da cui dipende una funzione definita dall'utente, usa una fabbrica di funzioni (chiusura con collegamento anticipato) per acquisire correttamente i valori delle variabili. In particolare, incapsulare la creazione di una funzione definita dall'utente in una funzione ausiliaria per acquisire il valore di una variabile dipendente.
Pitone
from pyspark.sql.functions import udf
def make_udf(value):
def foo():
return value
return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
Scala
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
Facendo il wrapping della definizione UDF all'interno di un'altra funzione (make_udf), si crea un nuovo ambito nel quale il valore corrente di x viene passato come argomento. In questo modo ogni funzione definita dall'utente generata ha una propria copia del campo, associata al momento della creazione della funzione definita dall'utente.
Attivare l'analisi anticipata per il rilevamento degli errori
La gestione degli errori seguente è utile in Spark Classic perché esegue un'analisi anticipata, che consente di sollevare eccezioni tempestivamente. Tuttavia, in Spark Connect questo codice non causa alcun problema, poiché crea solo un piano proto non risolto locale senza attivare alcuna analisi.
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")
Se il codice si basa sull'eccezione di analisi e si vuole intercettarlo, è possibile attivare l'analisi eager, ad esempio con df.columns, df.schemao df.collect().
Pitone
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")
Scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
val df2 = df.select("name", "age")
.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
.filter(col("age_with_typo") > 6)
df2.columns // Trigger eager analysis to catch the error
} catch {
case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
Evita troppe richieste di analisi anticipata
Le prestazioni possono essere migliorate se si evita un numero elevato di richieste di analisi evitando un utilizzo eccessivo delle chiamate che attivano l'analisi eager (ad esempio df.columns, df.schema).
Se non è possibile evitare questo problema e controllare spesso le colonne di nuovi frame di dati, mantenere un set per tenere traccia dei nomi delle colonne per evitare le richieste di analisi.
Pitone
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
if new_column_name not in columns: # Check the set
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()
Scala
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
val newColumnName = i.toString
if (!columns.contains(newColumnName)) {
df = df.withColumn(newColumnName, col("id") + i)
columns.add(newColumnName)
}
}
df.show()
Un altro caso simile consiste nel creare un numero elevato di dataframe intermedi non necessari e analizzarli. In alternativa, ottenere StructType informazioni sul campo direttamente dallo schema del dataframe anziché creare dataframe intermedi.
Pitone
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
column_schema.name: [f.name for f in column_schema.dataType.fields]
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
Scala
import org.apache.spark.sql.types.StructType
df = ...
val structColumnFields = df.schema.fields
.filter(_.dataType.isInstanceOf[StructType])
.map { field =>
field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
}
.toMap
println(structColumnFields)