Cannot use pydantic objects in a UDF

Freia Vercruysse 31 Reputation points
2023-02-08T08:59:45.3666667+00:00

When trying to use a pydantic object inside a UDF, I get the following error message:

PicklingError: Can't pickle <cyfunction int_validator at 0x7f1aa626dc70>: it's not the same object as pydantic.validators.int_validator

I use the following code (PySpark):

from pydantic import BaseModel
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.types import StringType


data = [
    Row(zip_code='58542', dma='MIN'),
    Row(zip_code='58701', dma='MIN'),
    Row(zip_code='57632', dma='MIN')
]
df = spark.createDataFrame(data)

class TestClass(BaseModel):
    name: int = 0


@F.udf(StringType())
def udf_test(dossier):
    # Referencing TestClass here pulls the pydantic class into the UDF's
    # pickled closure, which is what triggers the error on .show().
    test = TestClass()
    return "test"

df.withColumn("test", udf_test(df['zip_code'])).show()

I get the same error message when I try to pickle a pydantic object with cloudpickle, but not with the standard pickle module:

from pyspark import cloudpickle
import pydantic
import pickle

class Bar(pydantic.BaseModel):
    a: int

p1 = pickle.loads(pickle.dumps(Bar(a=1))) # This works well
print(f"p1: {p1}")

p2 = cloudpickle.loads(cloudpickle.dumps(Bar(a=1))) # This fails with the same PicklingError as above
print(f"p2: {p2}")

Is there any way to change the serializer that's used for a UDF?


1 answer

Freia Vercruysse 31 Reputation points
2023-02-24T09:54:45.48+00:00

    I received the solution from Microsoft Support:

    1. Create a Repo.
    2. Add the Bar class in a separate Python file, test.py:

       from pydantic import BaseModel


       class Bar(BaseModel):
           a: int = 0

    3. Create a new notebook in the Repo and import the class from the Python file defined in step 2:

       from pydantic import BaseModel
       from pyspark.sql import functions as F
       from pyspark.sql import Row
       from pyspark.sql.types import StringType
       from test import Bar
       
       
       data = [
           Row(zip_code='58542', dma='MIN'),
           Row(zip_code='58701', dma='MIN'),
           Row(zip_code='57632', dma='MIN')
       ]
       df = spark.createDataFrame(data)
       
       
       @F.udf(StringType())
       def udf_test(dossier):
           # Bar now lives in test.py, so cloudpickle serializes it by
           # reference instead of by value, and the error goes away.
           test = Bar()
           return "test"
       
       df.withColumn("test", udf_test(df['zip_code'])).show()
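
    Why this works (my understanding): cloudpickle serializes classes defined in a notebook's __main__ by value, which drags in pydantic's compiled cython validators and triggers the PicklingError, while classes imported from a module are serialized by reference, so the executors simply re-import test.py. A quick round-trip check (a minimal sketch, assuming test.py is importable on both the driver and the workers):

       from pyspark import cloudpickle
       from test import Bar


       # Bar is importable, so cloudpickle stores a reference to test.Bar
       # instead of serializing the class and its validators by value.
       b = cloudpickle.loads(cloudpickle.dumps(Bar(a=1)))
       print(f"b: {b}")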