Python UDF'er, Scala UDF'er og komplekse datatyper i den oprindelige eksekveringsmotor

Den indbyggede eksekveringsmotor i Microsoft Fabric understøtter nu Python brugerdefinerede funktioner (UDF'er), Scala UDF'er og komplekse datatyper (arrays, maps og structs). Disse funktioner gør det muligt at skrive udtryksfulde Spark-applikationer uden at gå på kompromis med ydeevnen.

Python UDF-understøttelse

Python er et af de mest populære sprog inden for data engineering og data science. Historisk set introducerede Python UDF'er betydelige overhead i Spark på grund af serialiseringsomkostninger mellem JVM- og Python-arbejdsprocesserne. Den native eksekveringsmotor minimerer disse dyre overgange og muliggør hurtigere eksekvering uden kodeændringer.

Hvordan Python UDF'er fungerer i den oprindelige eksekveringsmotor

I en konventionel Spark-eksekveringsmodel involverer Python UDF-eksekvering:

  1. Datakonvertering fra Sparks interne format.
  2. Serialisering og overførsel til Python-arbejdsprocesser.
  3. Python UDF-eksekvering.
  4. Serialisering af resultater tilbage i JVM.
  5. Spark genoptager udførelsen.

Denne kryds-runtime-bevægelse skaber serialiserings-/deserialiseringsomkostninger, CPU-ineffektivitet og ødelagte kolonne-udførelsespipelines. Den native eksekveringsmotor reducerer denne overhead ved at optimere dataoverførselsstien og opretholde vektoriseret behandling, hvor det er muligt.

Understøttede Python UDF-typer

Den oprindelige eksekveringsmotor understøtter:

  • Scalar UDFs: Række-for-række Python funktioner registreret med udf().
  • Vektoriserede (Pandas) UDF'er: Funktioner dekoreret med @pandas_udf databatches, der bruger Apache Arrow for effektiv overførsel.

Vektoriserede UDF'er opnår de største ydelsesgevinster, fordi de naturligt tilpasser sig den søjleformede behandlingsmodel i den native eksekveringsmotor.

Eksempel: Vectorized Python UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def calculate_discount(price: pd.Series, rate: pd.Series) -> pd.Series:
    return price * (1 - rate)

df = spark.table("sales.transactions")
result = df.withColumn("discounted_price", calculate_discount(df.price, df.discount_rate))
result.show()

Der kræves ingen yderligere konfiguration ud over at aktivere den oprindelige eksekveringsmotor. Eksisterende Python UDF'er drager automatisk fordel af det.

Scala UDF-understøttelse

Den native eksekveringsmotor accelererer også Scala UDF'er. Fordi Scala UDF'er kører native i JVM, kan motoren overføre understøttede operationer til den vektoriserede C++-eksekveringssti, samtidig med at Scala UDF-evalueringen holdes effektiv inden for samme kørsel.

Eksempel: Scala UDF

import org.apache.spark.sql.functions.udf

val toUpperCase = udf((s: String) => s.toUpperCase)
val df = spark.table("catalog.customers")
val result = df.withColumn("name_upper", toUpperCase(df("name")))
result.show()

Scala UDF'er, der kører på understøttede datatyper, accelereres uden kodeændringer, når den native eksekveringsmotor er aktiveret.

Understøttelse af komplekse datatyper

Moderne lakehouse-arkitekturer er afhængige af semistrukturerede og indlejrede data. Den native eksekveringsmotor leverer nu optimeret understøttelse for:

Datatyp Beskrivelse Eksempel på use case
Array Ordnet samling af elementer Begivenhedstags, produktkategorier
Kort Nøgleværdipar Konfigurationsegenskaber, metadata
Struct Navngivne felter med forskellige typer Indlejrede kundeposter, adresseobjekter

Operationer understøttet for komplekse typer

Den native eksekveringsmotor accelererer almindelige operationer på komplekse datatyper:

  • Arrayfunktioner: explode, array_contains, size, flatten, transform
  • Afbildningsfunktioner: map_keys, map_values, element_at
  • Struct-adgang: Punktnotationsfeltadgang, getField
  • Indlejrede kombinationer: Arrays af strukturer, kort med arrayværdier

Eksempel: Arbejde med arrays og strukturer

from pyspark.sql.functions import explode, col, size

# Read data with nested schema
df = spark.table("events.telemetry")

# Operations on arrays - accelerated by native engine
result = (df
    .filter(size(col("tags")) > 0)
    .select(
        col("event_id"),
        col("metadata.source"),  # Struct field access
        explode(col("tags")).alias("tag")
    )
)
result.show()

Eksempel: Arbejde med kort

from pyspark.sql.functions import map_keys, map_values, col

df = spark.table("config.settings")

# Map operations - accelerated by native engine
result = (df
    .select(
        col("setting_id"),
        map_keys(col("properties")).alias("keys"),
        map_values(col("properties")).alias("values")
    )
)
result.show()

Præstationsresultater

Intern benchmarking demonstrerer betydelige forbedringer på tværs af arbejdsbelastninger, der bruger Python UDF'er og komplekse datatyper:

Arbejdsbelastningstype Forbedring af ydeevnen
Vektoriserede Python UDF'er Op til 5,76 gange hurtigere
Skalar Python UDF'er Op til 1,08 gange hurtigere
TPC-DS ende-til-ende (med komplekse typer) Op til 2,35 gange hurtigere

Disse gevinster skyldes reduceret serialiseringsomkostninger, forbedret vektorisering og end-to-end kolonneudførelse.

Fordele ved avancerede lakehouse-mønstre

Acceleration af komplekse datatyper er særligt vigtig for:

  • Z-ORDER-optimering: Indlejrede kolonner deltager i det optimerede datalayout.
  • Væskeklyngedannelse: Komplekse type kolonner drager fordel af klyngedannelse uden udfladning.
  • Semistruktureret analyse: JSON-payloads og hændelsesstrømme forbliver indlejrede for naturlig forespørgsler.
  • Hændelsesdrevne arkitekturer: Telemetri og IoT-data bevarer deres hierarkiske struktur.

I stedet for at flade data ud eller omstrukturere pipelines for ydeevne, skal man arbejde naturligt med komplekse skemaer, samtidig med at man opretholder høj eksekveringseffektivitet.

Aktivere funktionen

Python UDF, Scala UDF og kompleks datatype understøttelse er tilgængelig, når den native eksekveringsmotor er aktiveret. Der er ikke behov for yderligere konfiguration.

For at aktivere den native eksekveringsmotor, se Native eksekveringsmotor for Fabric Data Engineering.

Forudsætninger

Begrænsninger

  • Ikke alle Python-biblioteker understøttes inden for den vektoriserede sti. Biblioteker, der kræver vilkårlig Python-objektserialisering, kan stadig udløse fallback.
  • Dybt indlejrede komplekse typer (for eksempel arrays af kort over strukturer) kan falde tilbage på JVM-motoren for visse operationer.
  • ANSI-tilstand understøttes ikke med den native eksekveringsmotor.