Tutorial: K-Means clustering

Important

This feature is in Public Preview.

This tutorial shows how to build a Python user-defined table function (UDTF) operator for Lakeflow Designer that runs K-Means clustering with scikit-learn. UDTFs suit machine learning tasks that need the whole dataset at once, because the handler can accumulate every input row before producing output. For background on user-defined operators, see User-defined operators in Lakeflow Designer.

Overview

This tutorial walks you through creating a user-defined operator backed by a Python UDTF. The operator performs K-Means clustering on selected columns and lets users:

  • Choose which columns to use as features.
  • Specify the number of clusters.
  • Get back a table with cluster assignments for each row.

Step 1: Understand the UDTF handler pattern

A UDTF is implemented as a Python class with two key methods:

  • eval(row, ...) — Called for each input row to accumulate data
  • terminate() — Called after all rows are processed to yield results

This pattern allows the UDTF to:

  1. Collect all data points during eval() calls
  2. Train the K-Means model in terminate()
  3. Yield clustered results row by row

The complete handler class for this operator:

class SklearnKMeans:
    def __init__(self):
        self.id_col = None
        self.feature_cols = None
        self.k = None
        self.rows = []
        self.features = []

    def eval(self, row, id_column, columns, k):
        """Called one time per input row - accumulate data here."""
        # Initialize configuration on first row
        if self.id_col is None:
            self.id_col = id_column
        if self.feature_cols is None:
            self.feature_cols = columns
        if self.k is None:
            self.k = max(1, int(k))

        # Convert row to dictionary and store
        row_dict = row.asDict(recursive=False)
        self.rows.append(row_dict)

        # Extract numeric features, substituting 0.0 for NULL values
        feats = []
        for c in self.feature_cols:
            v = row_dict.get(c)
            if v is None:
                v = 0.0
            feats.append(float(v))
        self.features.append(feats)

    def terminate(self):
        """Called after all rows - train model and yield results."""
        import numpy as np
        from sklearn.cluster import KMeans

        if not self.rows:
            return

        X = np.asarray(self.features, dtype=float)
        n_samples = X.shape[0]
        n_clusters = min(self.k, n_samples)

        model = KMeans(
            n_clusters=n_clusters,
            n_init=10,
            random_state=42
        )
        labels = model.fit_predict(X)

        # Yield results row by row
        for row_dict, label in zip(self.rows, labels):
            yield str(row_dict[self.id_col]), int(label)

Note

The row parameter in eval() is a PySpark Row object. Use .asDict() to convert it to a dictionary for easier access.
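
To exercise a handler class without a Spark session, a small local driver can mimic the call sequence described above: construct the handler, call eval() once per row, then drain terminate(). This is a minimal sketch, not how Spark itself invokes UDTFs. FakeRow, run_udtf, and RowCounter are hypothetical names introduced for illustration, and FakeRow only imitates the asDict() method of a PySpark Row.

```python
class FakeRow:
    """Hypothetical stand-in for a PySpark Row; only asDict() is imitated."""

    def __init__(self, **fields):
        self._fields = dict(fields)

    def asDict(self, recursive=False):
        return dict(self._fields)


def run_udtf(handler_cls, rows, **params):
    """Mimic the UDTF lifecycle: eval() per row, then terminate() once."""
    handler = handler_cls()
    for row in rows:
        handler.eval(row, **params)
    # terminate() is a generator, so collect everything it yields.
    return list(handler.terminate())


class RowCounter:
    """Toy handler (an assumption for this sketch) shaped like SklearnKMeans."""

    def __init__(self):
        self.ids = []

    def eval(self, row, id_column):
        # Accumulate only; nothing is emitted per row.
        self.ids.append(row.asDict()[id_column])

    def terminate(self):
        # All rows have arrived, so whole-dataset values can be emitted.
        total = len(self.ids)
        for row_id in self.ids:
            yield str(row_id), total


rows = [FakeRow(customer_id="C001", age=25), FakeRow(customer_id="C002", age=45)]
results = run_udtf(RowCounter, rows, id_column="customer_id")
print(results)
```

With scikit-learn installed, the same driver could be pointed at SklearnKMeans by passing id_column, columns, and k as keyword arguments.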

Step 2: Create the YAML for the operator

The YAML configuration defines how the operator appears in Lakeflow Designer. For this operator:

  • Number parameter (k): Number of clusters to create
  • Select widget (id_column): Dropdown populated with the columns of the input table
  • Multi-select widget (columns): Selection of one or more feature columns
  • optionsSource: Automatically populates both dropdowns from the input table schema
  • Input port (input_data): Specifies that this operator accepts tabular data

The complete configuration:

schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: '1.0.0'
description: Perform K-Means clustering on selected columns
config:
  type: object
  properties:
    k:
      type: number
      title: Number of Clusters
      default: 3
      minimum: 1
      maximum: 100
      x-ui:
        widget: number
    id_column:
      type: string
      title: ID Column
      x-ui:
        widget: select
        optionsSource:
          type: inputColumns
          port: input_data
    columns:
      type: array
      items:
        type: string
      title: Feature Columns
      x-ui:
        widget: multi-select
        optionsSource:
          type: inputColumns
          port: input_data
  required:
    - k
    - id_column
    - columns
  additionalProperties: false
ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: output
      title: Clustered Data

See User-defined operator YAML reference for a comprehensive guide to all available properties, data types, widgets, and options.
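
The config section reads like a JSON Schema fragment: required lists the mandatory parameters, and minimum and maximum bound k. The sketch below shows what those constraints mean for a set of parameter values. It is an illustration only, assuming JSON-Schema-like semantics; how Lakeflow Designer itself validates input is not described here, and CONFIG and check_params are hypothetical names.

```python
# Hand-copied subset of the YAML above, expressed as a Python dict.
CONFIG = {
    "properties": {
        "k": {"type": "number", "minimum": 1, "maximum": 100, "default": 3},
        "id_column": {"type": "string"},
        "columns": {"type": "array"},
    },
    "required": ["k", "id_column", "columns"],
}


def check_params(params, config=CONFIG):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for name in config["required"]:
        if name not in params:
            problems.append(f"missing required parameter: {name}")
    for name in params:
        if name not in config["properties"]:
            # Mirrors additionalProperties: false in the YAML.
            problems.append(f"unknown parameter: {name}")
    k = params.get("k")
    if isinstance(k, (int, float)) and not isinstance(k, bool):
        spec = config["properties"]["k"]
        if not spec["minimum"] <= k <= spec["maximum"]:
            problems.append(
                f"k={k} is outside [{spec['minimum']}, {spec['maximum']}]"
            )
    return problems


ok = check_params({"k": 3, "id_column": "customer_id", "columns": ["age"]})
bad = check_params({"k": 500, "id_column": "customer_id"})
print(ok)
print(bad)
```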

Step 3: Create the Unity Catalog function

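Combine the YAML configuration and the Python handler class into a single CREATE FUNCTION statement. The YAML goes in a triple-quoted string at the top of the function body, followed by the handler class. The HANDLER clause names the class, and the function parameters correspond to the arguments of eval(): input_data arrives as row, followed by id_column, columns, and k.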

CREATE OR REPLACE FUNCTION main.my_schema.k_means(
    input_data TABLE,
    id_column STRING,
    columns ARRAY<STRING>,
    k INT
)
RETURNS TABLE (
    id STRING,
    cluster_id INT
)
LANGUAGE PYTHON
HANDLER 'SklearnKMeans'
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: "1.0.0"
description: Perform K-Means clustering on selected columns
config:
  type: object
  properties:
    k:
      type: number
      title: Number of Clusters
      default: 3
      minimum: 1
      maximum: 100
      x-ui:
        widget: number
    id_column:
      type: string
      title: ID Column
      x-ui:
        widget: select
        optionsSource:
          type: inputColumns
          port: input_data
    columns:
      type: array
      items:
        type: string
      title: Feature Columns
      x-ui:
        widget: multi-select
        optionsSource:
          type: inputColumns
          port: input_data
  required:
    - k
    - id_column
    - columns
  additionalProperties: false
ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: output
      title: Clustered Data
"""

class SklearnKMeans:
    def __init__(self):
        self.id_col = None
        self.feature_cols = None
        self.k = None
        self.rows = []
        self.features = []

    def eval(self, row, id_column, columns, k):
        if self.id_col is None:
            self.id_col = id_column
        if self.feature_cols is None:
            self.feature_cols = columns
        if self.k is None:
            self.k = max(1, int(k))

        row_dict = row.asDict(recursive=False)
        self.rows.append(row_dict)

        feats = []
        for c in self.feature_cols:
            v = row_dict.get(c)
            if v is None:
                v = 0.0
            feats.append(float(v))
        self.features.append(feats)

    def terminate(self):
        import numpy as np
        from sklearn.cluster import KMeans

        if not self.rows:
            return

        X = np.asarray(self.features, dtype=float)
        n_samples = X.shape[0]
        n_clusters = min(self.k, n_samples)

        model = KMeans(
            n_clusters=n_clusters,
            n_init=10,
            random_state=42
        )
        labels = model.fit_predict(X)

        for row_dict, label in zip(self.rows, labels):
            yield str(row_dict[self.id_col]), int(label)
$$
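
The function body above has a fixed layout: the operator YAML in a triple-quoted string, then the handler class. If the YAML and the class are kept in separate files, a small script could assemble the statement. This is a sketch under that assumption; build_create_function and its arguments are hypothetical, and the parameter list and return columns are copied from the statement above.

```python
def build_create_function(function_name, yaml_text, handler_source, handler_name):
    """Assemble a CREATE FUNCTION statement in the layout used in this tutorial."""
    if "$$" in yaml_text or "$$" in handler_source:
        # The body is delimited by $$, so it must not contain that sequence.
        raise ValueError("body must not contain '$$'")
    if '"""' in yaml_text:
        # The YAML is embedded in a triple-quoted string.
        raise ValueError("YAML must not contain triple quotes")
    return (
        f"CREATE OR REPLACE FUNCTION {function_name}(\n"
        "    input_data TABLE,\n"
        "    id_column STRING,\n"
        "    columns ARRAY<STRING>,\n"
        "    k INT\n"
        ")\n"
        "RETURNS TABLE (\n"
        "    id STRING,\n"
        "    cluster_id INT\n"
        ")\n"
        "LANGUAGE PYTHON\n"
        f"HANDLER '{handler_name}'\n"
        "AS $$\n"
        f'"""\n{yaml_text.strip()}\n"""\n\n'
        f"{handler_source.strip()}\n"
        "$$"
    )


statement = build_create_function(
    "main.my_schema.k_means",
    "schema: user-defined-operator-v0.1.0\ntype: uc-udtf",
    "class SklearnKMeans:\n    pass",
    "SklearnKMeans",
)
print(statement)
```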

Step 4: Test with sample data

Create sample customer data for testing:

-- Create sample customer data
CREATE OR REPLACE TEMP VIEW customers AS
SELECT * FROM VALUES
    ('C001', 25, 35000, 20),
    ('C002', 45, 85000, 80),
    ('C003', 35, 55000, 50),
    ('C004', 50, 95000, 90),
    ('C005', 23, 30000, 15),
    ('C006', 40, 75000, 70),
    ('C007', 60, 100000, 95),
    ('C008', 30, 45000, 40)
AS t(customer_id, age, annual_income, spending_score);

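Test the K-Means UDTF. WITH SINGLE PARTITION sends every input row to a single instance of the handler, so the model is trained on the whole table: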

-- Run K-Means clustering with 3 clusters
SELECT * FROM main.my_schema.k_means(
    input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
    k => 3,
    id_column => 'customer_id',
    columns => array('age', 'annual_income', 'spending_score')
);
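
The same call can be issued from a Python notebook cell by building the SQL text and passing it to spark.sql. The helper below is a hypothetical sketch (build_kmeans_query is not part of any API). It only formats the query string, and the spark.sql call is left as a comment because it assumes an active Spark session.

```python
import re

# Conservative patterns: plain identifiers, optionally dot-qualified.
_COLUMN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_QUALIFIED = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


def build_kmeans_query(function_name, source, id_column, columns, k):
    """Format a call to the K-Means UDTF using named arguments."""
    for name in (function_name, source):
        if not _QUALIFIED.fullmatch(name):
            raise ValueError(f"unsupported object name: {name!r}")
    for name in (id_column, *columns):
        if not _COLUMN.fullmatch(name):
            raise ValueError(f"unsupported column name: {name!r}")
    feature_list = ", ".join(f"'{c}'" for c in columns)
    return (
        f"SELECT * FROM {function_name}(\n"
        f"    input_data => TABLE(SELECT * FROM {source}) WITH SINGLE PARTITION,\n"
        f"    k => {int(k)},\n"
        f"    id_column => '{id_column}',\n"
        f"    columns => array({feature_list})\n"
        ")"
    )


query = build_kmeans_query(
    "main.my_schema.k_means",
    "customers",
    "customer_id",
    ["age", "annual_income", "spending_score"],
    3,
)
print(query)
# In a notebook with an active session: spark.sql(query).show()
```

Rejecting unusual names instead of escaping them keeps the sketch short; names that need quoting would require additional handling.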

To see each customer's attributes next to its cluster assignment, join the clustering results back to the original data:

-- Join cluster results with original data
SELECT
  c.*,
  k.cluster_id
FROM customers c
INNER JOIN main.my_schema.k_means(
    input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
    k => 3,
    id_column => 'customer_id',
    columns => array('age', 'annual_income', 'spending_score')
) k
ON c.customer_id = k.id
ORDER BY k.cluster_id, c.customer_id;
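
One thing to watch in the output: K-Means uses Euclidean distance, so a column with a much larger numeric range (here annual_income) tends to dominate the assignments. The handler in this tutorial does not scale features. As an optional variation, not part of the operator above, the feature matrix could be standardized before fitting. The sketch below does this in plain Python; standardize is a hypothetical helper, and inside terminate() scikit-learn's StandardScaler would be the more usual choice.

```python
from statistics import fmean, pstdev


def standardize(features):
    """Rescale each column to zero mean and unit (population) standard deviation."""
    columns = list(zip(*features))
    means = [fmean(col) for col in columns]
    # Constant columns have zero spread; divide by 1.0 so they become all zeros.
    stds = [pstdev(col) or 1.0 for col in columns]
    return [
        [(value - m) / s for value, m, s in zip(row, means, stds)]
        for row in features
    ]


# Age and annual income for three of the sample customers.
raw = [[25, 35000], [45, 85000], [35, 55000]]
scaled = standardize(raw)
for row in scaled:
    print([round(v, 3) for v in row])
```

After scaling, both columns contribute on a comparable scale to the distances that K-Means minimizes.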

Step 5: Register the operator

To use the operator in Lakeflow Designer, register it by adding an entry to your .user_defined_operators.yaml file:

operators:
  - catalog: main
    schema: my_schema
    functionName: k_means

Note

If you define this file in your user folder, the operator appears only for you. For more information, see Make your operator discoverable.

Step 6: Set up permissions

Grant access to users who need to use this operator. The statements below assume that the users already have USE CATALOG on the main catalog:

GRANT USE SCHEMA ON SCHEMA main.my_schema TO `<user>`;
GRANT EXECUTE ON FUNCTION main.my_schema.k_means TO `<user>`;
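
When several principals need access, the two statements can be generated per user. A minimal sketch, assuming principals are quoted with backticks as in the statements above; grant_statements is a hypothetical helper, and the user names are placeholders.

```python
def grant_statements(function_name, principals):
    """Return the two GRANT statements from this step for each principal."""
    # Expects a three-part name: catalog.schema.function
    catalog, schema, _ = function_name.split(".")
    statements = []
    for principal in principals:
        if "`" in principal:
            raise ValueError(f"principal must not contain a backtick: {principal!r}")
        statements.append(
            f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{principal}`;"
        )
        statements.append(
            f"GRANT EXECUTE ON FUNCTION {function_name} TO `{principal}`;"
        )
    return statements


stmts = grant_statements(
    "main.my_schema.k_means",
    ["alice@example.com", "bob@example.com"],
)
for stmt in stmts:
    print(stmt)
```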

Using the operator in Lakeflow Designer

After you register the operator, it appears in Lakeflow Designer with:

  • An input port to connect your data source
  • A dropdown to select which column uniquely identifies rows
  • A multi-select to choose which columns to use as clustering features
  • A number input for the desired number of clusters

Users can segment customers, products, or any other data into meaningful groups without writing code.

Tips for building UDTFs

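  1. Initialize state in __init__: Set up empty lists and variables to accumulate data.
  2. Accumulate in eval: Collect rows without processing them yet.
  3. Process in terminate: Train the model or run other whole-dataset logic here, after all rows have arrived.
  4. Use yield to return rows: Emit results one at a time from terminate.
  5. Handle edge cases: For example, the handler above caps the number of clusters at the number of rows and yields nothing for empty input.
  6. Keep types explicit: Declare every output column and its type in RETURNS TABLE. UDTF return types cannot reference input types, which is why the handler casts the ID to a string.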
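
Tip 5 can be made concrete with a small helper that resolves the effective cluster count the same way the handler does: at least 1, and never more than the number of rows. resolve_cluster_count is a hypothetical name used for illustration.

```python
def resolve_cluster_count(k, n_samples):
    """Clamp the requested k to the range [1, n_samples]."""
    if n_samples < 1:
        # The handler returns early for empty input; this helper raises instead.
        raise ValueError("at least one row is required")
    requested = max(1, int(k))        # mirrors eval(): max(1, int(k))
    return min(requested, n_samples)  # mirrors terminate(): min(self.k, n_samples)


print(resolve_cluster_count(3, 8))  # 3: enough rows for the requested clusters
print(resolve_cluster_count(3, 2))  # 2: fewer rows than requested clusters
print(resolve_cluster_count(0, 5))  # 1: non-positive k is raised to 1
```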