Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Importante
Este recurso está no Public Preview no Databricks Runtime 16.2 ou versões superiores.
Você pode criar aplicações de streaming usando operadores com estado personalizados para implementar soluções de baixa latência e de quase tempo real que usam em lógica de estado arbitrária. Operadores personalizados de gestão de estado permitem novos casos de uso e padrões que não são possíveis através do processamento tradicional de streaming estruturado.
Observação
O Databricks recomenda o uso da funcionalidade interna de Streaming Estruturado para operações com estado suportadas, como agregações, desduplicação e junções de streaming. Veja O que é streaming stateful?.
O Databricks recomenda o uso de transformWithState sobre operadores herdados para transformações de estado arbitrárias. Para obter documentação sobre os operadores de estado flatMapGroupsWithState e mapGroupsWithState herdados, consulte Operadores de estado arbitrários herdados.
Requerimentos
O operador transformWithState e as APIs e classes relacionadas têm os seguintes requisitos:
- Disponível no Databricks Runtime 16.2 e superior.
- A computação deve usar modo de acesso dedicado ou sem isolamento, exceto que o modo de acesso padrão é suportado para Python (
transformWithStateInPandas) no Databricks Runtime 16.3 e superior, e para Scala (transformWithState) no Databricks Runtime 17.3 e superior. - Você deve usar o provedor de armazenamento de estado RocksDB. O Databricks recomenda habilitar o RocksDB como parte da configuração de computação.
Observação
Para habilitar o provedor de armazenamento de estado RocksDB para a sessão atual, execute o seguinte:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
O que é transformWithState?
O operador transformWithState aplica um processador de estado personalizado a uma consulta de streaming estruturado. Você deve implementar um processador com estado personalizado para usar o transformWithState. O Streaming Estruturado inclui APIs para criar seu processador stateful usando Python, Scala ou Java.
Use transformWithState para aplicar lógica personalizada a uma chave de agrupamento para registros processados incrementalmente com o Structured Streaming. O seguinte descreve o design de alto nível:
- Defina uma ou mais variáveis de estado.
- As informações de estado são persistentes para cada chave de agrupamento e podem ser acessadas para cada variável de estado de acordo com a lógica definida pelo usuário.
- Para cada microlote processado, todos os registos associados à chave estão disponíveis como um iterador.
- Utilize manipuladores integrados para controlar quando e como os registos são emitidos com base em temporizadores e condições definidas pelo utilizador.
- Os valores de estado suportam definições individuais de tempo de vida (TTL), permitindo flexibilidade no gerenciamento da expiração do estado e do tamanho do estado.
Como o transformWithState oferece suporte à evolução do esquema no armazenamento de estado, você pode iterar e atualizar seus aplicativos de produção sem perder informações de estado histórico ou precisar reprocessar registros, oferecendo flexibilidade para desenvolvimento e facilidade de manutenção. Consulte evolução do esquema no repositório estadual.
Importante
PySpark usa o operador transformWithStateInPandas em vez de transformWithState. A documentação do Azure Databricks usa transformWithState para descrever a funcionalidade para implementações Python e Scala.
As implementações Scala e Python do transformWithState e APIs relacionadas diferem devido às especificidades da linguagem, mas fornecem a mesma funcionalidade. Consulte exemplos específicos de linguagem e documentação de API para sua linguagem de programação preferida.
Alças de processamento incorporadas
Você implementa a lógica principal para o seu aplicativo stateful personalizado implementando handlers de usando handles internos .
- Os manipuladores fornecem os métodos para interagir com valores de estado e temporizadores, processar registos de entrada e emitir registos.
- Os manipuladores definem sua lógica personalizada orientada a eventos.
Os manipuladores para cada tipo de estado são implementados com base na estrutura de dados subjacente, mas cada um contém funcionalidade para obter, inserir, atualizar e excluir registos.
Os manipuladores são implementados com base em eventos observados em registros de entrada ou temporizadores, usando a seguinte semântica:
- Defina um manipulador usando o método
handleInputRowspara controlar como os dados são processados, o estado é atualizado e os registros são emitidos para cada microlote de registros processados para a chave de agrupamento. Consulte Manipular linhas de entrada. - Defina um manipulador usando o método
handleExpiredTimerpara usar limites baseados em tempo para executar a lógica, independentemente de registros adicionais serem ou não processados para a chave de agrupamento. Consulte Eventos cronometrados do programa.
A tabela a seguir apresenta uma comparação de comportamentos funcionais suportados por esses manipuladores:
| Comportamento | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Obter, colocar, atualizar ou limpar valores de estado | Sim | Sim |
| Criar ou eliminar um temporizador | Sim | Sim |
| Emitir registos | Sim | Sim |
| Iterar sobre registros no microlote atual | Sim | Não |
| Lógica de acionamento com base no tempo decorrido | Não | Sim |
Você pode combinar handleInputRows e handleExpiredTimer para implementar lógica complexa, conforme necessário.
Por exemplo, você pode implementar um aplicativo que usa handleInputRows para atualizar valores de estado para cada microlote e definir um temporizador de 10 segundos no futuro. Se nenhum registo adicional for processado, poderá usar handleExpiredTimer para emitir os valores atuais no armazenamento de estado. Se novos registros forem processados para a chave de agrupamento, você poderá limpar o temporizador existente e definir um novo temporizador.
Tipos de estado personalizados
Você pode implementar vários objetos de estado em um único operador com estado. Os nomes que você dá a cada objeto de estado persistem no armazenamento de estado, que você pode acessar com o leitor de armazenamento de estado. Se o objeto de estado usar um StructType, você fornecerá nomes para cada campo na estrutura ao passar o esquema. Esses nomes também são visíveis ao ler o armazenamento de estado. Consulte Leia as informações de estado do Structured Streaming.
A funcionalidade fornecida por classes e operadores internos destina-se a fornecer flexibilidade e extensibilidade, e as opções de implementação devem ser informadas pela lógica completa que seu aplicativo precisa executar. Por exemplo, você pode implementar uma lógica quase idêntica usando um ValueState agrupado por campos user_id e session_id ou um MapState agrupado por user_id em que session_id é a chave para o MapState. Neste caso, um MapState pode ser a implementação preferida se a lógica precisar avaliar as condições em vários session_ids.
As seções a seguir descrevem os tipos de estado suportados pelo transformWithState.
ValueState
Para cada chave de agrupamento, há um valor associado.
Um estado de valor pode incluir tipos complexos, como uma estrutura ou tupla. Ao atualizar um ValueState, você implementa a lógica para substituir todo o valor. O TTL de um estado de valor é redefinido quando o valor é atualizado, mas não é redefinido se uma chave de origem correspondente a um ValueState for processada sem atualizar o ValueStatearmazenado.
ListState
Para cada chave de agrupamento, há uma lista associada.
Um estado de lista é uma coleção de valores, cada um dos quais pode incluir tipos complexos. Cada valor em uma lista tem seu próprio TTL. Você pode adicionar itens a uma lista anexando itens individuais, anexando uma lista de itens ou substituindo a lista inteira por um put. Somente a operação put é considerada uma atualização para redefinir o TTL.
MapState
Para cada chave de agrupamento, há um mapa associado. Os mapas são o equivalente funcional do Apache Spark a um ditado do Python.
Importante
As chaves de agrupamento descrevem os campos especificados na cláusula GROUP BY da consulta Streaming Estruturado. Os estados do mapa contêm um número arbitrário de pares chave-valor para uma chave de agrupamento.
Por exemplo, se você agrupar por user_id e quiser definir um mapa para cada session_id, sua chave de agrupamento será user_id e a chave em seu mapa será session_id.
Um estado de mapa é uma coleção de chaves distintas que cada uma mapeia para um valor que pode incluir tipos complexos. Cada par chave-valor em um mapa tem seu próprio TTL. Você pode atualizar o valor de uma chave específica ou remover uma chave e seu valor. Você pode retornar um valor individual usando sua chave, listar todas as chaves, listar todos os valores ou retornar um iterador para trabalhar com o conjunto completo de pares chave-valor no mapa.
Inicializar uma variável de estado personalizada
Ao inicializar o StatefulProcessor, você cria uma variável local para cada objeto de estado que permite interagir com objetos de estado em sua lógica personalizada. As variáveis de estado são definidas e inicializadas substituindo o método init interno na classe StatefulProcessor.
Você define uma quantidade arbitrária de objetos de estado usando os métodos getValueState, getListStatee getMapState ao inicializar o StatefulProcessor.
Cada objeto de estado deve ter o seguinte:
- Um nome único
- Um esquema especificado
- Em Python, o esquema é especificado explicitamente.
- No Scala, passe um
Encoderpara especificar o esquema de estado.
Você também pode fornecer uma duração opcional de tempo de vida (TTL) em milissegundos. Se estiver implementando um estado de mapa, você deve fornecer uma definição de esquema separada para as chaves de mapa e os valores.
Observação
A lógica de como as informações de estado são consultadas, atualizadas e emitidas é tratada separadamente. Consulte Usar as suas variáveis de estado.
Exemplo de aplicação com monitoração de estado
A seguir é demonstrada a sintaxe básica para definir e usar um processador stateful personalizado com transformWithState, incluindo variáveis de estado de exemplo para cada tipo suportado. Para obter mais exemplos, consulte Exemplos de aplicativos com estado.
Observação
Python usa tuplas para todas as interações com valores de estado. Isso significa que o código Python deve passar valores usando tuplas ao usar operações como put e update e esperar manipular tuplas ao usar get.
Por exemplo, se o esquema para seu estado de valor for apenas um único inteiro, você implementaria um código como o seguinte:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Isso também é verdade para itens em um ListState ou valores em um MapState.
Python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
linguagem de programação Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
O PySpark inclui a classe StatefulProcessorHandle para fornecer acesso a funções que controlam como o código Python definido pelo usuário interage com as informações de estado. Você sempre deve importar e passar o StatefulProcessorHandle para a variável handle ao inicializar um StatefulProcessor.
A variável handle vincula a variável local em sua classe Python à variável state.
Observação
Scala usa o método getHandle.
Especificar estado inicial
Opcionalmente, você pode fornecer um estado inicial para usar com o primeiro microlote. Isso pode ser útil ao migrar um fluxo de trabalho existente para um novo aplicativo personalizado, atualizar um operador com estado para alterar seu esquema ou lógica ou reparar uma falha que não pode ser reparada automaticamente e requer intervenção manual.
Observação
Use o leitor de armazenamento de estado para consultar informações de estado de um ponto de verificação existente. Consulte Leia as informações de estado do Structured Streaming.
Se estiveres a converter uma tabela Delta existente numa aplicação com estado, lê a tabela usando spark.read.table("table_name") e passa o DataFrame resultante. Opcionalmente, podes selecionar ou modificar campos para se adequarem à tua nova aplicação com estado gerido.
Você fornece um estado inicial usando um DataFrame com o mesmo esquema de chave de agrupamento que as linhas de entrada.
Observação
Python usa handleInitialState para especificar o estado inicial ao definir um StatefulProcessor. Scala usa a classe distinta StatefulProcessorWithInitialState.
Use suas variáveis de estado
Os objetos de estado suportados fornecem métodos para obter o estado, atualizar informações de estado existentes ou limpar o estado atual. Cada tipo de estado suportado tem uma implementação exclusiva de métodos que correspondem à estrutura de dados implementada.
Cada chave de agrupamento observada tem informações de estado dedicadas.
- Os registros são emitidos com base na lógica implementada e usando o esquema de saída especificado. Consulte os registros de emissão em .
- Você pode acessar valores no armazenamento de estado usando o leitor
statestore. Este leitor tem funcionalidade de lote e não se destina a cargas de trabalho de baixa latência. Consulte Leia as informações de estado do Structured Streaming. - A lógica especificada usando
handleInputRowssó é acionada se os registros da chave estiverem presentes em um microlote. Consulte Manipular linhas de entrada. - Use
handleExpiredTimerpara implementar lógica baseada no tempo que não depende da observação de registros para disparar. Consulte Eventos cronometrados do programa.
Observação
Os objetos de estado são isolados agrupando chaves com as seguintes implicações:
- Os valores de estado não podem ser afetados por registros associados a uma chave de agrupamento diferente.
- Você não pode implementar lógica que depende da comparação de valores ou da atualização do estado entre chaves de agrupamento.
Você pode comparar valores dentro de uma chave de agrupamento. Use um MapState para implementar a lógica com uma segunda chave que sua lógica personalizada pode usar. Por exemplo, agrupar por user_id e chavear o seu MapState usando o endereço IP permitiria implementar uma lógica que rastreia sessões de utilizador simultâneas.
Considerações avançadas para trabalhar com o estado
Gravar numa variável de estado provoca uma gravação no RocksDB. Para um desempenho otimizado, o Databricks recomenda processar todos os valores no iterador para uma determinada chave e confirmar atualizações em uma única gravação sempre que possível.
Observação
As atualizações de estado são tolerantes a falhas. Se uma tarefa falhar antes de um microlote terminar o processamento, o valor do último microlote bem-sucedido será usado na nova tentativa.
Os valores de estado não têm valores padrão definidos. Se sua lógica requer a leitura de informações de estado existentes, use o método exists ao implementar sua lógica.
Observação
MapState variáveis têm funcionalidade adicional para verificar chaves individuais ou listar todas as chaves para implementar lógica para estado nulo.
Emitir registos
A lógica definida pelo usuário controla como transformWithState emite registros. Os registros são emitidos por chave de agrupamento.
As aplicações de estado personalizado não fazem suposições sobre como as informações de estado são usadas ao determinar como emitir registos, e o número de registos devolvidos para uma dada condição pode ser nenhum, um ou muitos.
Você implementa a lógica para emitir registros usando handleInputRows ou handleExpiredTimer. Consulte Manipular linhas de entrada e Eventos cronometrados do programa.
Observação
Você pode implementar vários valores de estado e definir várias condições para emitir registros, mas todos os registros emitidos devem usar o mesmo esquema.
Python
Em Python, você define seu esquema de saída usando a palavra-chave outputStructType ao chamar transformWithStateInPandas.
Você emite registros usando um objeto Pandas DataFrame e yield.
Opcionalmente, você pode yield um DataFrame vazio. Quando combinado com o modo de saída update, a emissão de um DataFrame vazio atualiza os valores da chave de agrupamento para que sejam nulos.
linguagem de programação Scala
No Scala, você emite registros usando um objeto Iterator. O esquema da saída é derivado de registros emitidos.
Opcionalmente, você pode emitir uma Iteratorvazia. Quando combinado com o modo de saída update, a emissão de um Iterator vazio atualiza os valores associados à chave de agrupamento para serem nulos.
Manipular linhas de entrada
Use o método handleInputRows para definir a lógica de como os registros observados em sua consulta de streaming interagem e atualizam os valores de estado. O manipulador que você define com o método handleInputRows é executado sempre que qualquer registro é processado por meio de sua consulta de Streaming Estruturado.
Para a maioria dos aplicativos com estado implementados com transformWithState, a lógica central é definida usando handleInputRows.
Para cada atualização de microlote processada, todos os registros no microlote para uma determinada chave de agrupamento estão disponíveis usando um iterador. A lógica definida pelo usuário pode interagir com todos os registros do microlote atual e os valores no statestore.
Eventos cronometrados do programa
Você pode usar temporizadores para implementar lógica personalizada com base no tempo decorrido de uma condição especificada.
Você trabalha com temporizadores implementando um método handleExpiredTimer.
Dentro de uma chave de agrupamento, os temporizadores são identificados exclusivamente por seu carimbo de data/hora.
Quando um temporizador expira, o resultado é determinado pela lógica implementada em seu aplicativo. Os padrões comuns incluem:
- Emissão de informações armazenadas em uma variável de estado.
- Expulsão de informações de estado armazenadas.
- Criação de um novo temporizador.
Os temporizadores expirados são acionados mesmo que nenhum registro para sua chave associada seja processado em um microlote.
Especificar o modelo de tempo
Ao passar o StatefulProcessor para transformWithState, você deve especificar o modelo de tempo. As seguintes opções são suportadas:
ProcessingTimeEventTime-
NoTimeouTimeMode.None()
Especificar NoTime significa que os temporizadores não são suportados para o processador.
Valores de temporizador incorporados
O Databricks recomenda não invocar o relógio do sistema em seu aplicativo com monitoração de estado personalizado, pois isso pode levar a novas tentativas não confiáveis em caso de falha de tarefa. Use os métodos na classe TimerValues quando você deve acessar o tempo de processamento ou marca d'água:
TimerValues |
Descrição |
|---|---|
getCurrentProcessingTimeInMs |
Retorna a marca temporal do tempo de processamento para o lote atual em milissegundos desde a era Unix. |
getCurrentWatermarkInMs |
Retorna o timestamp da marca d'água do lote atual em milissegundos desde o Epoch. |
Observação
O tempo de processamento descreve o tempo que o microlote é processado pelo Apache Spark. Muitas fontes de streaming, como Kafka, também incluem o tempo de processamento do sistema.
As marcas temporais em consultas de transmissão geralmente são definidas em relação ao tempo do evento ou ao tempo de processamento da fonte de streaming. Consulte Aplicação de marcas d'água para controlar os limites de processamento de dados.
Tanto as marcas d'água, assim como as janelas, podem ser usadas em combinação com transformWithState. Você pode implementar uma funcionalidade semelhante na sua aplicação stateful personalizada aproveitando TTL, temporizadores e a funcionalidade MapState ou ListState.
O que é o tempo de vida do estado (TTL)?
Os valores de estado usados por transformWithState cada um suporta uma especificação opcional de tempo de vida (TTL). Quando o TTL expira, o valor é eliminado do armazenamento de estado. O TTL interage apenas com valores no armazenamento de estado, o que significa que pode implementar lógica para remover informações de estado, mas não pode acionar diretamente a lógica à medida que o TTL remove os valores de estado.
Importante
Se você não implementar o TTL, deverá lidar com a remoção de estado usando outra lógica para evitar o crescimento interminável do estado.
O TTL é aplicado para cada valor de estado, com regras diferentes para cada tipo de estado.
- As variáveis de estado têm como escopo agrupar chaves.
- Para
ValueStateobjetos, apenas um único valor é armazenado por chave de agrupamento. TTL aplica-se a este valor. - Para objetos
ListState, a lista pode conter muitos valores. O TTL aplica-se a cada valor numa lista de forma independente. - Para objetos
MapState, cada chave de mapa tem um valor de estado associado. O TTL aplica-se independentemente a cada par chave-valor num mapa.
Para todos os tipos de estado, o TTL é reiniciado se as informações de estado forem atualizadas.
Observação
Embora o TTL tenha como escopo valores individuais em um ListState, a única maneira de atualizar um valor em uma lista é usar o método put para substituir todo o conteúdo da variável ListState.
Qual é a diferença entre temporizadores e TTL?
Há alguma sobreposição entre temporizadores e tempo de vida (TTL) para variáveis de estado, mas os temporizadores fornecem um conjunto mais amplo de recursos do que o TTL.
TTL remove informações de estado que não foram atualizadas para o período especificado pelo usuário. Isso permite que os usuários impeçam o crescimento descontrolado do estado e removam entradas de estado obsoletas. Como mapas e listas implementam TTL para cada valor, você pode implementar funções que consideram apenas valores de estado que foram atualizados recentemente definindo TTL.
Os temporizadores permitem que defina uma lógica personalizada além da expulsão de estado, incluindo a emissão de registos. Opcionalmente, você pode usar temporizadores para limpar informações de estado para um determinado valor de estado, com a flexibilidade adicional para emitir valores ou acionar outra lógica condicional com base no temporizador.