Operadores definidos pelo usuário no Lakeflow Designer

Importante

Esse recurso está em Visualização Pública.

O Lakeflow Designer permite criar operadores definidos pelo usuário que aparecem diretamente na tela ao lado de operadores internos. Use-os para estender o Lakeflow Designer com sua própria lógica de negócios, cálculos ou integrações.

Há três tipos de operadores definidos pelo usuário:

  • python-run-function: um arquivo YAML independente com Python embutido armazenado no espaço de trabalho. Melhor para transformações no nível de DataFrame e integrações externas. As permissões são gerenciadas no nível do arquivo do workspace.
  • uc-udf: Encapsula uma função escalar do Unity Catalog. Ideal para transformações no nível da coluna. O acesso é regido pelas permissões do Catálogo do Unity.
  • uc-udtf: Encapsula uma função do Unity Catalog que retorna uma tabela. Melhor para transformações no nível da tabela, como agrupamento e agregação de aprendizado de máquina. O acesso é regido pelas permissões do Catálogo do Unity.
Característica python-run-function uc-udf uc-udtf
Exemplo de caso de uso Transformações de DataFrame, integrações de API, notificações por email Cálculos em nível de coluna (IMC, taxas de juros) Agrupamento de aprendizado de máquina, agregação ao longo das linhas
Entrada Estruturas de Dados (DataFrames) Valores únicos Tabela inteira, linha por linha
Saída Estruturas de Dados (DataFrames) Valor único Tabela (várias linhas)
Requer função do Unity Catalog No Yes Yes
Governança de acesso Permissões de arquivos do espaço de trabalho Permissões do Unity Catalog (EXECUTE, USE SCHEMA) Permissões do Unity Catalog (EXECUTE, USE SCHEMA)
Idiomas com suporte Somente Python SQL ou Python em um wrapper do SQL SQL ou Python em um wrapper do SQL

Como funcionam os operadores definidos pelo usuário?

Um operador definido pelo usuário consiste em:

  • Lógica do operador: o código executado quando o operador é executado. Isso pode ser uma função Python run() embutida (para python-run-function) ou uma função do Catálogo do Unity (para uc-udf e uc-udtf).
  • Configuração do YAML: informa ao Lakeflow Designer como apresentar o operador na interface do usuário, incluindo o nome do operador, a descrição, os parâmetros de entrada, os widgets de interface do usuário e as portas. Todos os tipos de operador usam o user-defined-operator-v0.1.0 esquema.
  • Arquivo de registro: Uma entrada em .user_defined_operators.yaml que permite que o Lakeflow Designer descubra o operador.

Lógica do operador

Python executar a lógica do operador definido pelo usuário da função

Cada python-run-function operador deve definir uma run() função:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: valores configurados pelo usuário na interface do usuário, indexados pelo nome da propriedade.
  • inputs: DataFrames de entrada, chaveado pela porta namede entrada.
  • spark: O SparkSession ativo.
  • Retorno: um dicionário que mapeia os valores da porta de saída name para DataFrames.

O exemplo a seguir filtra linhas de um DataFrame de entrada:

def run(config, inputs, spark):
    df = inputs["in"]
    filtered = df.filter(config["filter_expression"])
    return {"out": filtered}

Se o operador exigir pacotes pip externos, adicione o campo environment ao YAML:

environment:
  environment_version: '1'
  dependencies:
    - requests==2.31.0
    - beautifulsoup4==4.12.0

Lógica do operador UDF e UDTF

Você pode escrever funções UC no SQL ou Python. Funções Python são envolvidas em uma instrução SQL CREATE FUNCTION:

Função SQL:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
  SELECT weight_kg / (height_m * height_m);

função Python (encapsulada em SQL):

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  return weight_kg / (height_m ** 2)
$$;

UDFs processam um único valor por vez e retornam um valor calculado. UDTFs processam tabelas linha por linha e podem manter o estado em todas as linhas. Use uc-udf para transformações de nível de coluna e uc-udtf para operações como clustering de ML ou agregação.

Além disso, os UDTFs exigem que você defina três métodos principais: __init__()eeval()terminate():

class MyOperator:
    def __init__(self):
        # Called before processing - initialize any values needed.

    def eval(self, row, id_column, columns, k):
        # Called once per input row - accumulate data here.

    def terminate(self):
        # Called after all rows - perform final calculations and yield results.

Note

As tabelas de retorno UDTF devem ter tipos fixos e explícitos. Você não pode referenciar tipos de coluna de entrada na configuração de retorno.

Configuração do YAML

A configuração do YAML informa ao Lakeflow Designer como apresentar o operador na interface do usuário. Ele define o nome do operador, a descrição, os parâmetros de entrada, os widgets de interface do usuário e as portas. Cada campo de configuração é uma propriedade com um tipo, título e dicas de widget opcionais x-ui :

config:
  type: object
  properties:
    my_param:
      type: string
      title: My Parameter
      x-ui:
        widget: input
    my_expression:
      type: string
      title: Column
      format: expression
      x-ui:
        widget: expression
        port: in
    my_number:
      type: number
      title: Count
      default: 10
      minimum: 0
      maximum: 100
  required:
    - my_param
    - my_expression

Para obter detalhes completos sobre o esquema YAML, incluindo todos os tipos de widget e opções de configuração, consulte a referência YAML do operador definido pelo usuário.

Portas

As portas definem as entradas e saídas para o operador:

ports:
  input:
    - name: in
      title: Input Data
      mime: application/vnd.databricks.dataframe
      required: true
      allowMultiple: false
  output:
    - name: out
      title: Output Data

YAML para operadores de função de execução para Python

Para operadores python-run-function, o arquivo YAML é autônomo e inclui um campo run_function com código Python embutido:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
  type: object
  properties:
    filter_expression:
      type: string
      title: Filter Expression
      x-ui:
        widget: input
  required:
    - filter_expression
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["in"]
        filtered = df.filter(config["filter_expression"])
        return {"out": filtered}

YAML para funções do Unity Catalog

Para operadores baseados em UC, insira a configuração YAML como um comentário ou docstring em sua função.

Em SQL (use o comentário /* ... */):

RETURN(/*
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
    */
  SELECT weight_kg / (height_m * height_m)
);

In Python (use """ ... """ docstring):

AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
  """

  return weight_kg / (height_m ** 2)
$$;

Registrar e implantar seu operador no Lakeflow Designer

Para que o operador apareça no Lakeflow Designer, registre-o em um .user_defined_operators.yaml arquivo:

  • Nível do workspace: Coloque o arquivo na raiz do workspace para tornar o operador visível para todos os usuários.
  • Nível de usuário: Coloque o arquivo na pasta inicial do usuário (/Workspace/Users/<user-name>/.user_defined_operators.yaml) para tornar os operadores visíveis somente para você.

A seção operators: oferece suporte a caminhos de arquivo, referências de função do Unity Catalog e padrões glob. Você pode misturar tipos de entrada:

operators:
  # File path (python-run-function operators)
  - /Workspace/Users/me/udos/my_operator.yaml
  # Glob pattern (registers all matching files)
  - /Workspace/Users/me/udos/transforms/*.yaml
  # UC function reference (uc-udf and uc-udtf operators)
  - catalog: my_catalog
    schema: my_schema
    functionName: my_function

Configurações avançadas

Modo de visualização

O Lakeflow Designer dá suporte a visualizações enquanto estiver no modo de design. Para operadores que chamam APIs externas ou gravam em sistemas externos, adicione uma propriedade de configuração is_preview para que você possa ignorar efeitos colaterais durante a visualização. Quando o modo de visualização está habilitado, os usuários precisam clicar explicitamente em Executar para executar o operador com efeitos colaterais.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false

Lakeflow Designer define automaticamente esse valor como true durante a visualização. Verifique isso na sua lógica para evitar efeitos colaterais:

# In a python-run-function
if config.get("is_preview"):
    return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Conexões do Unity Catalog

Para operadores SQL baseados em UC que chamam APIs externas, use conexões HTTP do Catálogo do Unity para armazenar credenciais com segurança:

CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
  host 'https://api.example.com',
  port '443',
  base_path '/v1/',
  bearer_token 'your-token-here'
);

Em seguida, use a conexão na sua UDF SQL com a função http_request(). Para obter detalhes, consulte Conectar-se a serviços HTTP externos.

WorkspaceClient

Para operadores python-run-function, você pode usar o Azure Databricks WorkspaceClient para acessar recursos do espaço de trabalho e APIs externas:

def run(config, inputs, spark):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Use w to access workspace resources

Criar um operador completo definido pelo usuário de python-run-function

As etapas a seguir mostram como criar um operador python-run-function do zero.

Etapa 1: Definir a lógica

Escreva sua run() função em um notebook:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    from pyspark.sql import functions as F
    df = inputs["in"]
    result = df.withColumn(config["column_name"], F.current_timestamp())
    return {"out": result}

Etapa 2: Testar a função

Teste a função interativamente com dados de exemplo:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

result = run(
    config={"column_name": "processed_at"},
    inputs={"in": test_df},
    spark=spark
)

result["out"].show()

Etapa 3: Criar a configuração do YAML

Defina os metadados do operador, os campos de configuração e as portas em um arquivo YAML:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name

Etapa 4: Combinar a lógica e o YAML

Adicione os campos run_function e ports para criar o arquivo YAML completo. Salve isso no seu espaço de trabalho, por exemplo /Workspace/Users/<user-name>/udos/add_timestamp.yaml:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        from pyspark.sql import functions as F
        df = inputs["in"]
        result = df.withColumn(config["column_name"], F.current_timestamp())
        return {"out": result}

Etapa 5: Registrar o operador

Adicione o caminho do arquivo ao arquivo .user_defined_operators.yaml :

operators:
  - /Workspace/Users/<user-name>/udos/add_timestamp.yaml

Etapa 6: Usar o operador no Lakeflow Designer

Abra o Lakeflow Designer e verifique se o operador aparece na paleta de operadores. Arraste-o para a tela, conecte uma entrada, configure o nome da coluna e execute uma visualização.

Criar um operador completo definido pelo usuário da UC

As etapas a seguir mostram como criar um operador baseado em UC uc-udf.

Etapa 1: Definir a lógica

Escreva e teste sua lógica de função em um notebook:

def double_value(input_value: float) -> float:
    if input_value is None:
        return None
    return input_value * 2

Etapa 2: Criar a configuração do YAML

Defina os metadados do operador, os campos de configuração e as portas:

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
  type: object
  properties:
    input_value:
      type: string
      title: Input Value
      format: expression
      x-ui:
        widget: expression
        port: input_data
  required:
    - input_value
ports:
  input:
    - name: input_data
      title: Input
  output:
    - name: out
      title: Output

Etapa 3: Combinar a lógica e o YAML

Crie a função catálogo do Unity com o YAML inserido como uma docstring:

CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Double Value
  id: math.double_value
  version: "1.0.0"
  description: Doubles the input value
  config:
    type: object
    properties:
      input_value:
        type: string
        title: Input Value
        format: expression
        x-ui:
          widget: expression
          port: input_data
    required:
      - input_value
  ports:
    input:
      - name: input_data
        title: Input
    output:
      - name: out
        title: Output
  """

  def double_value(input_value: float) -> float:
      if input_value is None:
          return None
      return input_value * 2

  return double_value(input_value)
$$

Etapa 4: Testar a função

SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

Etapa 5: Registrar o operador

Adicione a referência de função do Catálogo do Unity ao seu .user_defined_operators.yaml arquivo:

operators:
  - catalog: main
    schema: my_schema
    functionName: double_value

Etapa 6: Usar o operador no Lakeflow Designer

Abra o Lakeflow Designer e verifique se o operador aparece na paleta de operadores. Arraste-o para a tela, conecte uma entrada e execute uma visualização.

Troubleshooting

Issue Solução
O operador não aparece no Lakeflow Designer. Verifique se .user_defined_operators.yaml existe e lista o caminho da função ou do arquivo. Para python-run-function operadores, verifique o caminho do arquivo e se o arquivo YAML está acessível.
Falha na validação do esquema. Verifique seu YAML em relação ao esquema oficial em https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json.
Permissão negada. Para operadores baseados em UC, verifique se os usuários têm EXECUTE na função e USE SCHEMA no esquema. Para python-run-function operadores, verifique se os usuários têm acesso de leitura ao arquivo YAML.
python-run-function O operador falha no runtime. Verifique se a assinatura da função run() corresponde a def run(config, inputs, spark). Verifique se os nomes de porta no código correspondem ao YAML e se as chaves de dicionário de retorno correspondem aos valores da porta name de saída.
UDTF retorna tipos incorretos. Os tipos de retorno UDTF devem ser explícitos– você não pode referenciar tipos de coluna de entrada.

Permissions

Permissão Purpose
Acesso de leitura a .user_defined_operators.yaml. Descubra o operador.
Acesso de leitura ao arquivo YAML (python-run-function somente). Carregue a definição do operador.
EXECUTE na função Catálogo do Unity (somente operadores baseados em UC). Execute o operador.
USE SCHEMA no esquema (somente operadores baseados em UC). Acesse o esquema em que a função é criada.
Outras permissões Dependendo do operador, os usuários podem exigir outras permissões. Por exemplo, USE CONNECTION em uma conexão do Unity Catalog para chamadas à API HTTP.

Próximas Etapas 

Explore os seguintes tutoriais:

Example Tipo Description
Remetente de email do Gmail python-run-function Enviar dados dataFrame como um anexo de email CSV por meio do Gmail.
Calculadora de juros compostos uc-udf Calcule valores de investimento futuros usando a fórmula de juros composta.
Clustering K-means uc-udtf Segmentar dados em clusters usando scikit-learn.
Enviar mensagem do Slack uc-udf Envie notificações para os canais do Slack por meio da API.
Todos os widgets de interface do usuário uc-udf Operador de referência mostrando todos os widgets de interface do usuário disponíveis.

Para obter uma referência completa ao esquema YAML, consulte a referência YAML do operador definido pelo usuário.