Condividi tramite


Confronta Spark Connect e Spark Classic

Spark Connect è un protocollo basato su gRPC in Apache Spark che specifica come un'applicazione client può comunicare con un server Spark remoto. Consente l'esecuzione remota dei carichi di lavoro Spark usando l'API DataFrame.

Spark Connect viene usato nei modi seguenti:

  • Notebook Scala con Databricks Runtime versione 13.3 e successive, su calcolo standard
  • Notebook Python con Databricks Runtime versione 14.3 e successive, su risorse di calcolo standard
  • Calcolo serverless (senza server)
  • Databricks Connect - uno strumento per integrare Databricks con i tuoi ambienti di sviluppo preferiti.

Anche se Spark Connect e Spark Classic usano l'esecuzione differita per le trasformazioni, esistono differenze importanti per evitare problemi imprevisti di comportamento e prestazioni durante la migrazione di codice esistente da Spark Classic a Spark Connect o durante la scrittura di codice che deve funzionare con entrambi.

Lazy vs eager

La differenza principale tra Spark Connect e Spark Classic è che Spark Connect rinvia l'analisi e la risoluzione dei nomi al tempo di esecuzione, come riepilogato nella tabella seguente.

Aspetto Spark Classico Spark Connect
Esecuzione della query Pigro Pigro
Analisi dello schema Desideroso Pigro
Accesso allo schema del database Local Trigger RPC
Visualizzazioni temporanee Pianificare l'incorporamento Ricerca del nome
Serializzazione UDF Al momento della creazione In fase di esecuzione

Esecuzione della query

Sia Spark Classic che Spark Connect seguono lo stesso modello di esecuzione lazy per le query.

Nella versione classica di Spark, le trasformazioni del dataframe (ad esempio filter e limit) sono lazy. Ciò significa che non vengono eseguiti immediatamente, ma vengono registrati in un piano logico. Il calcolo effettivo viene attivato solo quando viene richiamata un'azione , ad esempio show(), collect().

Spark Connect adotta un modello di valutazione differita simile. Le trasformazioni vengono costruite sul lato client e inviate come piani proto non risolti al server. Il server esegue quindi l'analisi e l'esecuzione necessarie quando viene chiamata un'azione.

Aspetto Spark Classico Spark Connect
Trasformazioni: df.filter(...), df.select(...), df.limit(...) Esecuzione differita Esecuzione differita
Query SQL: spark.sql("select …") Esecuzione differita Esecuzione differita
Azioni: df.collect(), df.show() Esecuzione con valutazione anticipata Esecuzione con valutazione anticipata
Comandi SQL: spark.sql("insert …"), spark.sql("create …") Esecuzione con valutazione anticipata Esecuzione con valutazione anticipata

Analisi dello schema

Spark Classic esegue l'analisi dello schema in modo ansioso durante la fase di costruzione del piano logico. Quando si definiscono le trasformazioni, Spark analizza immediatamente lo schema del dataframe per assicurarsi che tutte le colonne e i tipi di dati a cui si fa riferimento siano validi. Ad esempio, l'esecuzione di spark.sql("select 1 as a, 2 as b").filter("c > 1") genererà immediatamente un errore, indicando che la colonna c non può essere trovata.

Spark Connect costruisce invece piani proto non risolti durante la trasformazione. Quando si accede a uno schema o si esegue un'azione, il client invia i piani non risolti al server tramite RPC (chiamata di procedura remota). Il server esegue quindi l'analisi e l'esecuzione. Questa progettazione rinvia l'analisi dello schema. Ad esempio, spark.sql("select 1 as a, 2 as b").filter("c > 1") non genererà alcun errore perché il piano non risolto è solo sul lato client, ma in df.columns o df.show() verrà generato un errore perché il piano non risolto viene inviato al server per l'analisi.

A differenza dell'esecuzione di query, Spark Classic e Spark Connect differiscono in caso di analisi dello schema.

Aspetto Spark Classico Spark Connect
Trasformazioni: df.filter(...), df.select(...), df.limit(...) Desideroso Pigro
Accesso allo schema: df.columns, df.schema, df.isStreaming Desideroso Desideroso
Attiva una richiesta RPC di analisi, a differenza della versione classica di Spark
Azioni: df.collect(), df.show() Desideroso Desideroso
Stato della sessione dipendente: UDF (funzioni definite dall'utente), viste temporanee, configurazioni Desideroso Pigro
Valutato durante l'esecuzione

Procedure consigliate

La differenza tra l'analisi differita e l'analisi anticipata implica che esistono alcune pratiche migliori da seguire per evitare problemi imprevisti di comportamento e prestazioni, in particolare quelli causati dalla sovrascrittura dei nomi associati alle visualizzazioni temporanee, la cattura di variabili esterne nelle funzioni definite dall'utente, il rilevamento ritardato degli errori e l'accesso eccessivo allo schema sui nuovi DataFrames.

Creare nomi di visualizzazione temporanea univoci

In Spark Connect il dataframe archivia solo un riferimento alla visualizzazione temporanea in base al nome. Di conseguenza, se la visualizzazione temporanea viene sostituita in un secondo momento, i dati nel DataFrame cambieranno anche perché la visualizzazione viene cercata per nome al momento dell'esecuzione.

Questo comportamento è diverso da Spark Classico, in cui il piano logico della visualizzazione temporanea è incorporato nel piano del frame di dati al momento della creazione. Qualsiasi sostituzione successiva della visualizzazione temporanea non influisce sul frame di dati originale.

Per attenuare la differenza, creare sempre nomi di visualizzazione temporanea univoci. Ad esempio, includere un UUID nel nome della visualizzazione. In questo modo si evita di influire sui dataframe esistenti che fanno riferimento a una vista temporanea registrata in precedenza.

Pitone

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

Avvolgere le definizioni UDF

In Spark Connect le UDF (funzioni definite dall'utente) Python sono valutate in modo pigro. La serializzazione e la registrazione vengono posticipate fino al tempo di esecuzione. Nell'esempio seguente, la UDF viene serializzata e caricata nel cluster Spark per l'esecuzione solo quando show() viene chiamata.

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x


df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

Questo comportamento è diverso da Spark Classico, in cui le funzioni definite dall'utente vengono create con entusiasmo. In Spark Classic, il valore di x al momento della creazione della UDF viene acquisito, pertanto eventuali modifiche successive a x non influiscono sulla UDF già creata.

Se hai bisogno di modificare il valore delle variabili esterne da cui dipende una funzione definita dall'utente, usa una fabbrica di funzioni (chiusura con collegamento anticipato) per acquisire correttamente i valori delle variabili. In particolare, incapsulare la creazione di una funzione definita dall'utente in una funzione ausiliaria per acquisire il valore di una variabile dipendente.

Pitone

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)


x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

Facendo il wrapping della definizione UDF all'interno di un'altra funzione (make_udf), si crea un nuovo ambito nel quale il valore corrente di x viene passato come argomento. In questo modo ogni funzione definita dall'utente generata ha una propria copia del campo, associata al momento della creazione della funzione definita dall'utente.

Attivare l'analisi anticipata per il rilevamento degli errori

La gestione degli errori seguente è utile in Spark Classic perché esegue un'analisi anticipata, che consente di sollevare eccezioni tempestivamente. Tuttavia, in Spark Connect questo codice non causa alcun problema, poiché crea solo un piano proto non risolto locale senza attivare alcuna analisi.

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

Se il codice si basa sull'eccezione di analisi e si vuole intercettarlo, è possibile attivare l'analisi eager, ad esempio con df.columns, df.schemao df.collect().

Pitone

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

Evita troppe richieste di analisi anticipata

Le prestazioni possono essere migliorate se si evita un numero elevato di richieste di analisi evitando un utilizzo eccessivo delle chiamate che attivano l'analisi eager (ad esempio df.columns, df.schema).

Se non è possibile evitare questo problema e controllare spesso le colonne di nuovi frame di dati, mantenere un set per tenere traccia dei nomi delle colonne per evitare le richieste di analisi.

Pitone

df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in columns: # Check the set
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()

Scala

import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) {
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

Un altro caso simile consiste nel creare un numero elevato di dataframe intermedi non necessari e analizzarli. In alternativa, ottenere StructType informazioni sul campo direttamente dallo schema del dataframe anziché creare dataframe intermedi.

Pitone

from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)

Scala

import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)