Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Esse recurso está em Visualização Pública no Databricks Runtime 16.2 e superior.
Você pode criar aplicativos de streaming usando operadores com estado personalizados para implementar soluções de baixa latência e quase em tempo real que usam lógica com estado arbitrário. Operadores com estado personalizados desbloqueiam novos casos de uso operacional e padrões indisponíveis por meio do processamento de Streaming Estruturado tradicional.
Observação
A Databricks recomenda usar a funcionalidade interna de Streaming Estruturado para operações com estado suportadas, como agregações, eliminação de duplicações e uniões de streaming. Confira O que é streaming com estado?.
O Databricks recomenda usar transformWithState em vez de operadores herdados para transformações de estado arbitrárias. Para obter documentação sobre os operadores legados flatMapGroupsWithState e mapGroupsWithState, consulte Operadores arbitrários com estado legados.
Requisitos
O operador transformWithState e as APIs e classes relacionadas têm os seguintes requisitos:
- Disponível no Databricks Runtime 16.2 e superior.
- A computação deve usar o modo de acesso dedicado ou sem isolamento, exceto que o modo de acesso padrão tem suporte para Python (
transformWithStateInPandas) no Databricks Runtime 16.3 e superior, e para Scala (transformWithState) no Databricks Runtime 17.3 e superior. - Você deve usar o provedor de repositório de estado do RocksDB. O Databricks recomenda habilitar o RocksDB como parte da configuração de computação.
Observação
Para habilitar o provedor de repositório de estado do RocksDB para a sessão atual, execute o seguinte:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
O que é transformWithState?
O transformWithState operador aplica um processador com estado personalizado a uma consulta de Streaming Estruturado. Você deve implementar um processador com estado personalizado para usar transformWithState. O Streaming Estruturado inclui APIs para criar seu processador com estado usando Python, Scala ou Java.
Você usa transformWithState para aplicar lógica personalizada a uma chave de agrupamento para registros processados incrementalmente com Streaming Estruturado. O seguinte descreve o design de alto nível:
- Defina uma ou mais variáveis de estado.
- As informações de estado são mantidas para cada chave de agrupamento e podem ser acessadas para cada variável de estado de acordo com a lógica definida pelo usuário.
- Para cada micro-lote processado, todos os registros vinculados à chave estão disponíveis como um iterador.
- Use alças internas para controlar quando e como os registros são emitidos com base em temporizadores e condições definidas pelo usuário.
- Os valores de estado dão suporte a definições de TTL (vida útil individual), permitindo flexibilidade no gerenciamento da expiração de estado e do tamanho do estado.
Como transformWithState dá suporte à evolução do esquema no repositório de estado, você pode iterar e atualizar seus aplicativos de produção sem perder informações de estado histórico ou precisar reprocessar registros, proporcionando flexibilidade para desenvolvimento e facilidade de manutenção. Confira a evolução do esquema no armazenamento de estado.
Importante
O PySpark usa o operador transformWithStateInPandas em vez de transformWithState. A documentação do Azure Databricks usa transformWithState para descrever a funcionalidade para implementações do Python e do Scala.
As implementações de Scala e Python de transformWithState e APIs relacionadas diferem devido às especificações de linguagem, mas fornecem a mesma funcionalidade. Consulte exemplos específicos de linguagem e documentação da API para sua linguagem de programação preferida.
Alças de processamento embutidas
Implemente a lógica principal para seu aplicativo com estado personalizado implementando manipuladores usando gerenciadores internos.
- Os manipuladores fornecem os métodos para interagir com valores de estado e temporizadores, processar registros de entrada e emitir registros.
- Os manipuladores definem sua lógica personalizada controlada por eventos.
Os controladores de cada tipo de estado são implementados com base na estrutura de dados subjacente, mas cada um possui funcionalidades para obter, inserir, atualizar e excluir registros.
Os manipuladores são implementados com base em eventos observados em registros de entrada ou temporizadores, usando a seguinte semântica:
- Defina um manipulador usando o
handleInputRowsmétodo para controlar como os dados são processados, o estado é atualizado e os registros são emitidos para cada micro lote de registros processados para a chave de agrupamento. Veja Gerenciar linhas de entrada. - Defina um manipulador usando o
handleExpiredTimermétodo para usar limites baseados em tempo para executar a lógica se registros adicionais são processados ou não para a chave de agrupamento. Consulte eventos programados.
A tabela a seguir tem uma comparação de comportamentos funcionais com suporte por esses manipuladores:
| Comportamento | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Obter, colocar, atualizar ou limpar valores de estado | Sim | Sim |
| Criar ou excluir um temporizador | Sim | Sim |
| Emitir registros | Sim | Sim |
| Iterar sobre registros no micro lote atual | Sim | Não |
| Lógica de gatilho definida com base no tempo decorrido | Não | Sim |
Você pode combinar handleInputRows e handleExpiredTimer implementar uma lógica complexa conforme necessário.
Por exemplo, você pode implementar um aplicativo que usa handleInputRows para atualizar valores de estado para cada microlote e definir um temporizador de 10 segundos no futuro. Se nenhum registro adicional for processado, você poderá usar handleExpiredTimer para emitir os valores atuais no repositório de estado. Se novos registros forem processados para a chave de agrupamento, você poderá limpar o temporizador existente e definir um novo temporizador.
Tipos de estado personalizados
Você pode implementar vários objetos de estado em um único operador com estado. Os nomes que você dá a cada objeto de estado persistem no repositório de estado, que você pode acessar com o leitor do repositório de estado. Se o objeto de estado usar um StructType, você fornecerá nomes para cada campo no struct ao passar o esquema. Esses nomes também ficam visíveis quando se lê o repositório de estado. Consulte a seção Ler informações de estado do Streaming Estruturado.
A funcionalidade fornecida por classes e operadores internos destina-se a fornecer flexibilidade e extensibilidade, e as opções de implementação devem ser informadas pela lógica completa que seu aplicativo precisa executar. Por exemplo, você pode implementar uma lógica quase idêntica usando um ValueState agrupado por campos user_id e session_id ou um MapState agrupado por user_id onde session_id está a chave para .MapState Nesse caso, uma MapState pode ser a implementação preferencial se a lógica precisar avaliar condições em vários session_ids.
As seções a seguir descrevem os tipos de estado com suporte por transformWithState.
ValueState
Para cada chave de agrupamento, há um valor associado.
Um estado de valor pode incluir tipos complexos, como uma estrutura ou tupla. Ao atualizar um ValueState, você implementa a lógica para substituir todo o valor. O TTL para um estado de valor é redefinido quando o valor é atualizado, mas não é redefinido se uma chave de origem correspondente a um ValueState for processada sem atualizar o valor armazenado ValueState.
ListState
Para cada chave de agrupamento, há uma lista associada.
Um estado de lista é uma coleção de valores, cada um dos quais pode incluir tipos complexos. Cada valor em uma lista tem sua própria TTL. Você pode adicionar itens a uma lista acrescentando itens individuais, acrescentando uma lista de itens ou substituindo toda a lista com um put. Somente a operação put é considerada uma atualização para redefinir o TTL.
MapState
Para cada chave de agrupamento, há um mapa associado. Os mapas são o equivalente funcional do Apache Spark a um ditado do Python.
Importante
As chaves de agrupamento descrevem os campos especificados na GROUP BY cláusula da consulta de Streaming Estruturado. Os estados do mapa contêm um número arbitrário de pares chave-valor para uma chave de agrupamento.
Por exemplo, se você agrupar user_id e quiser definir um mapa para cada session_idum, sua chave de agrupamento será user_id e a chave do seu mapa será session_id.
Um estado de mapa é uma coleção de chaves distintas que cada um mapeia para um valor que pode incluir tipos complexos. Cada par chave-valor em um mapa tem seu próprio TTL. Você pode atualizar o valor de uma chave específica ou remover uma chave e seu valor. Você pode retornar um valor individual usando sua chave, listar todas as chaves, listar todos os valores ou retornar um iterador para trabalhar com o conjunto completo de pares chave-valor no mapa.
Inicializar uma variável de estado personalizada
Ao inicializar sua StatefulProcessor, você cria uma variável local para cada objeto de estado que permite interagir com objetos de estado em sua lógica personalizada. As variáveis de estado são definidas e inicializadas substituindo o método interno init na StatefulProcessor classe.
Você define uma quantidade arbitrária de objetos de estado usando os métodos getValueState, getListState e getMapState ao inicializar seu StatefulProcessor.
Cada objeto de estado deve ter o seguinte:
- Um nome exclusivo
- Um esquema especificado
- No Python, o esquema é especificado explicitamente.
- Em Scala, passe um
Encoderpara especificar esquema de estado.
Você também pode fornecer uma duração TTL (vida útil) opcional em milissegundos. Se estiver implementando um estado de mapa, você deverá fornecer uma definição de esquema separada para as chaves de mapa e os valores.
Observação
A lógica de como as informações de estado são consultadas, atualizadas e emitidas é tratada separadamente. Consulte Usar suas variáveis de estado.
Exemplo de aplicativo com estado
A seguir, demonstra a sintaxe básica para definir e usar um processador com estado personalizado, transformWithStateincluindo variáveis de estado de exemplo para cada tipo com suporte. Para obter mais exemplos, consulte Exemplo de aplicativos com estado.
Observação
O Python usa tuplas para todas as interações com valores de estado. Isso significa que o código Python deve passar valores usando tuplas ao usar operações como put e update esperar lidar com tuplas ao usar get.
Por exemplo, se o esquema do seu estado de valor for apenas um inteiro, você implementará o código da seguinte maneira:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Isso também é verdadeiro para itens em um ListState ou valores em um MapState.
Python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala (linguagem de programação)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
O PySpark inclui a StatefulProcessorHandle classe para fornecer acesso a funções que controlam como seu código Python definido pelo usuário interage com informações de estado. Você sempre deve importar e passar StatefulProcessorHandle para a variável handle ao inicializar um StatefulProcessor.
A handle variável vincula a variável local em sua classe Python à variável de estado.
Observação
Scala usa o método getHandle.
Especificar o estado inicial
Opcionalmente, você pode fornecer um estado inicial para usar com o primeiro microlote. Isso pode ser útil ao migrar um fluxo de trabalho existente para um novo aplicativo personalizado, atualizar um operador com estado para alterar seu esquema ou lógica ou reparar uma falha que não pode ser reparada automaticamente e requer intervenção manual.
Observação
Use o leitor do repositório de estado para consultar informações de estado de um ponto de verificação existente. Consulte a seção Ler informações de estado do Streaming Estruturado.
Se você estiver convertendo uma tabela Delta existente em um aplicativo com estado, leia a tabela usando spark.read.table("table_name") e passe o DataFrame resultante. Opcionalmente, você pode selecionar ou modificar campos para estar em conformidade com seu novo aplicativo com estado.
Você fornece um estado inicial usando um DataFrame com o mesmo esquema de chave de agrupamento que as linhas de entrada.
Observação
O Python usa handleInitialState para especificar o estado inicial ao definir um StatefulProcessor. Scala usa a classe StatefulProcessorWithInitialStatedistinta.
Use suas variáveis de estado
Os objetos de estado com suporte fornecem métodos para obter estado, atualizar informações de estado existentes ou limpar o estado atual. Cada tipo de estado com suporte tem uma implementação exclusiva de métodos que correspondem à estrutura de dados implementada.
Cada chave de agrupamento observada tem informações de estado dedicadas.
- Os registros são emitidos com base na lógica que você implementa e usando o esquema de saída especificado. Consulte Registros de emissão.
- Você pode acessar valores no repositório de estado usando o
statestoreleitor. Esse leitor tem funcionalidade em lotes e não se destina a cargas de trabalho de baixa latência. Consulte a seção Ler informações de estado do Streaming Estruturado. - A lógica especificada usando
handleInputRowssó é acionada se os registros da chave estiverem presentes em um microlote. Veja Gerenciar linhas de entrada. - Use
handleExpiredTimerpara implementar a lógica temporal que não depende da observação de registros para disparar. Consulte eventos programados.
Observação
Os objetos de estado são isolados pelo agrupamento de chaves com as seguintes implicações:
- Os valores de estado não podem ser afetados por registros associados a uma chave de agrupamento diferente.
- Não é possível implementar a lógica que depende da comparação de valores ou da atualização do estado entre chaves de agrupamento.
Você pode comparar valores em uma chave de agrupamento. Use um MapState para implementar a lógica com uma segunda chave que a sua lógica personalizada pode utilizar. Por exemplo, agrupar por user_id e classificar seu MapState usando o endereço IP permitiria a implementação de lógica que acompanha sessões de usuários simultâneas.
Considerações avançadas para trabalhar com o estado
Gravar em uma variável de estado dispara uma gravação no RocksDB. Para um desempenho otimizado, o Databricks recomenda processar todos os valores no iterador para uma determinada chave e confirmar atualizações em uma única gravação sempre que possível.
Observação
As atualizações de estado são tolerantes a falhas. Se uma tarefa falhar antes de um micro-lote concluir o processamento, o valor do último microlote bem-sucedido será usado na repetição.
Os valores de estado não têm valores padrão predefinidos. Se sua lógica exigir a leitura de informações de estado existentes, use o método exists ao implementar sua lógica.
Observação
MapState as variáveis têm funcionalidade adicional para verificar se há chaves individuais ou listar todas as chaves para implementar a lógica para o estado nulo.
Emitir registros
A lógica definida pelo usuário controla como transformWithState emite registros. Os registros são emitidos por chave de agrupamento.
Aplicativos com estado personalizados não fazem suposições sobre como as informações de estado são usadas ao determinar como emitir registros e o número retornado de registros para uma determinada condição pode ser nenhum, um ou muitos.
Você implementa a lógica para emitir registros usando handleInputRows ou handleExpiredTimer. Consulte Manipular linhas de entrada e Eventos temporizados do programa.
Observação
Você pode implementar vários valores de estado e definir várias condições para emitir registros, mas todos os registros emitidos devem usar o mesmo esquema.
Python
No Python, você define seu esquema de saída usando a outputStructType palavra-chave ao chamar transformWithStateInPandas.
Você gera registros usando um objeto pandas DataFrame e yield.
Opcionalmente, você pode criar um DataFrame vazio yield. Quando combinado com update o modo de saída, a emissão de um DataFrame vazio atualiza os valores para que a chave de agrupamento seja nula.
Scala (linguagem de programação)
No Scala, você emite registros usando um Iterator objeto. O esquema da saída é derivado de registros emitidos.
Você pode opcionalmente emitir um Iterator vazio. Quando combinado com o modo de saída update, emitir um Iterator vazio atualiza os valores para que a chave de agrupamento seja nula.
Manipular linhas de entrada
Use o handleInputRows método para definir a lógica de como os registros observados em sua consulta de streaming interagem e atualizam valores de estado. O manipulador que você define com o handleInputRows método é executado sempre que todos os registros são processados por meio da consulta de Streaming Estruturado.
Para a maioria dos aplicativos com estado implementados com transformWithState, a lógica principal é definida usando handleInputRows.
Para cada atualização de microlote processada, todos os registros no microlote de uma determinada chave de agrupamento estão disponíveis utilizando um iterador. A lógica definida pelo usuário pode interagir com todos os registros do microbatch atual e valores no statestore.
Eventos programados
Você pode usar temporizadores para implementar a lógica personalizada com base no tempo decorrido de uma condição especificada.
Você trabalha com temporizadores implementando um handleExpiredTimer método.
Dentro de uma chave de agrupamento, os temporizadores são identificados exclusivamente pelo carimbo de data/hora.
Quando um temporizador expira, o resultado é determinado pela lógica implementada em seu aplicativo. Os padrões comuns incluem:
- Emitindo informações armazenadas em uma variável de estado.
- Removendo informações de estado armazenadas.
- Criando um novo temporizador.
Os temporizadores expirados são acionados, mesmo que nenhum registro da chave associada seja processado em um microlote.
Especificar o modelo de hora
Ao passar o seu StatefulProcessor para transformWithState, você deve especificar o modelo de hora. Há suporte para as seguintes opções:
ProcessingTimeEventTime-
NoTimeouTimeMode.None()
Especificar NoTime significa que não há suporte para temporizadores para o processador.
Valores de temporizador interno
O Databricks recomenda não invocar o relógio do sistema em seu aplicativo com estado personalizado, pois isso pode levar a novas tentativas não confiáveis de falha de tarefa. Use os métodos na TimerValues classe quando você precisar acessar o tempo de processamento ou a marca d'água:
TimerValues |
Descrição |
|---|---|
getCurrentProcessingTimeInMs |
Retorna o timestamp do tempo de processamento do lote atual em milissegundos desde a Epoch. |
getCurrentWatermarkInMs |
Retorna o carimbo de data/hora da marca d'água para o lote atual em milissegundos desde época. |
Observação
O tempo de processamento descreve a hora em que o microlote é processado pelo Apache Spark. Muitas fontes de streaming, como Kafka, também incluem o tempo de processamento do sistema.
As marcas d'água em consultas de streaming geralmente são definidas em relação à hora do evento ou ao tempo de processamento da fonte de streaming. Confira Aplicar marcas d’água para controlar os limites do processamento de dados.
Marcas d'água e janelas podem ser usadas em combinação com transformWithState. Você pode implementar funcionalidades semelhantes em seu aplicativo personalizado com estado, utilizando TTL, temporizadores e funcionalidade de MapState ou ListState.
O que é o tempo de vida do estado (TTL)?
Os valores de estado usados por transformWithState cada um dão suporte a uma especificação TTL (tempo de vida útil) opcional. Quando o TTL expira, o valor é removido do armazenamento de estado. O TTL interage apenas com valores no repositório de estado, o que significa que você pode implementar a lógica para remover informações de estado, mas não pode disparar diretamente a lógica, pois o TTL remove valores de estado.
Importante
Se você não implementar o TTL, precisará gerenciar a remoção do estado usando outra lógica para evitar o crescimento interminável do estado.
O TTL é imposto para cada valor de estado, com regras diferentes para cada tipo de estado.
- As variáveis de estado estão limitadas pelas chaves de agrupamento.
- Para
ValueStateobjetos, apenas um único valor é armazenado por chave de agrupamento. TTL aplica-se a esse valor. - Para
ListStateobjetos, a lista pode conter muitos valores. A TTL aplica-se a cada valor em uma lista de forma independente. - Para
MapStateobjetos, cada chave de mapa tem um valor de estado associado. O TTL aplica-se independentemente a cada par chave-valor em um mapa.
Para todos os tipos de estado, o TTL será redefinido se as informações de estado forem atualizadas.
Observação
Embora o TTL esteja no escopo de valores individuais em um ListState, a única maneira de atualizar um valor em uma lista é usar o método put para sobrescrever todo o conteúdo da variável ListState.
Qual é a diferença entre temporizadores e TTL?
Há alguma sobreposição entre temporizadores e TTL (vida útil) para variáveis de estado, mas os temporizadores fornecem um conjunto mais amplo de recursos do que o TTL.
O TTL remove informações de estado que não foram atualizadas para o período especificado pelo usuário. Isso permite que os usuários impeçam o crescimento descontrolado do estado e removam entradas de estado obsoletas. Como mapas e listas implementam TTL para cada valor, você pode implementar funções que consideram apenas valores de estado que foram atualizados recentemente definindo TTL.
Os temporizadores permitem que você defina a lógica personalizada além da remoção de estado, incluindo a emissão de registros. Opcionalmente, você pode usar temporizadores para limpar informações de estado para um determinado valor de estado, com a flexibilidade adicional para emitir valores ou disparar outra lógica condicional com base no temporizador.