Visão geral de Streaming do Apache Spark
O Apache Spark Streaming fornece processamento de fluxo de dados em clusters do HDInsight Spark. Ele traz a garantia de que qualquer evento de entrada será processado exatamente uma vez, mesmo se ocorrer uma falha de nó. O Spark Stream é um trabalho de longa execução que recebe dados de entrada de diversas origens, incluindo Hubs de Eventos do Azure. E também: Hub IoT do Azure, Apache Kafka, Apache Flume, X, ZeroMQ
, soquetes TCP brutos ou, ainda, a partir do monitoramento de sistemas de arquivos YARN do Apache Hadoop. Ao contrário de um processo exclusivamente controlado por eventos, o Spark Stream agrupa dados de entrada em janelas de tempo. Como exemplo, uma fatia de 2 segundos; em seguida, ele transforma cada lote de dados usando operações para mapear, reduzir, unir e extrair. Em seguida, o Spark Stream grava os dados transformados em sistemas de arquivos, bancos de dados, painéis e no console.
Os aplicativos Spark Streaming precisam aguardar uma fração de segundo para coletar cada micro-batch
de eventos, antes de encaminhar esse lote para processamento. Por outro lado, um aplicativo controlado por evento processa cada evento imediatamente. A latência do Spark Streaming normalmente fica abaixo de alguns segundos. Os benefícios da abordagem de microlote são um processamento de dados mais eficiente e cálculos de agregação mais simples.
Introdução ao DStream
O Spark Streaming é um fluxo contínuo de entrada de dados que usa um fluxo discretizado chamado DStream. É possível criar um DStream a partir de fontes de entrada como os Hubs de Eventos ou o Kafka. Ou, ainda, aplicando transformações em outro DStream.
Um DStream oferece uma camada de abstração sobre os dados brutos de evento.
Inicie com um evento único, como uma leitura de temperatura de um termostato conectado. Quando esse evento é recebido pelo aplicativo Spark Streaming, ele é armazenado de forma confiável, sendo replicado em vários nós. Essa tolerância a falhas garante que a falha em um dos nós não resultará na perda do evento. O núcleo do Spark usa uma estrutura de dados que distribui dados ao longo de vários nós no cluster. Nela, cada nó mantém, geralmente, seus próprios dados na memória para obter o melhor desempenho. Essa estrutura de dados é chamada de RDD (conjunto de dados distribuído resiliente).
Cada RDD representa os eventos coletados por um período de tempo definido pelo usuário, denominado intervalo de lote. Com a expiração de cada intervalo de lote, um novo RDD é produzido, contendo todos os dados desse intervalo. O conjunto contínuo de RDDs é coletado em um DStream. Por exemplo, se o intervalo de lote for de um segundo, o DStream emitirá um lote a cada segundo com um RDD que contém todos os dados ingeridos durante esse segundo. Ao processar o DStream, o evento de temperatura aparece em um desses lotes. Um aplicativo Spark Streaming processa os lotes que contêm os eventos e, por fim, atua nos dados armazenados em cada RDD.
Estrutura de um aplicativo Spark Streaming
Um aplicativo Spark Streaming é um aplicativo de longa execução que recebe dados de fontes de ingestão. Ele aplica as transformações para processar os dados e, em seguida, efetua push dos dados para um ou mais destinos. A estrutura de um aplicativo Spark Streaming tem uma parte estática e uma parte dinâmica. A parte estática define a origem dos dados e como eles serão processados. E para onde os resultados devem ser enviados. A parte dinâmica executa o aplicativo por tempo indefinido, aguardando um sinal de parada.
Por exemplo, o aplicativo simples a seguir recebe uma linha de texto em um soquete TCP e conta o número de vezes que cada palavra é exibida.
Definir o aplicativo
A definição da lógica do aplicativo tem quatro etapas:
- Criar um StreamingContext.
- Criar um DStream com base no StreamingContext.
- Aplicar transformações ao DStream.
- Enviar os resultados.
Essa definição é estática e nenhum dado é processado até que o aplicativo seja executado.
Criar um StreamingContext
Crie um StreamingContext com base no SparkContext que aponta para o cluster. Ao criar um StreamingContext, especifique o tamanho do lote em segundos, por exemplo:
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))
Criar um DStream
Com a instância de StreamingContext, crie um DStream de entrada para a fonte de entrada. Nesse caso, o aplicativo está observando se novos arquivos aparecerão no armazenamento padrão anexado.
val lines = ssc.textFileStream("/uploads/Test/")
Aplicar transformações
O processamento é implementado ao aplicar transformações ao DStream. Este aplicativo recebe uma linha de texto do arquivo por vez e divide cada linha em palavras. Em seguida, ele usa um padrão de redução de mapas para contar quantas vezes cada palavra é exibida.
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
Enviar os resultados
Envie os resultados da transformação por push para sistemas de destino aplicando operações de saída. Nesse caso, o resultado de cada execução por meio da computação é impresso na saída do console.
wordCounts.print()
Executar o aplicativo
Inicie o aplicativo de streaming e execute-o até receber um sinal de encerramento.
ssc.start()
ssc.awaitTermination()
Para obter detalhes sobre a API do Spark Stream, consulte o Guia de Programação do Apache Spark Streaming.
O aplicativo de exemplo a seguir é independente, assim, é possível executá-lo dentro de um Bloco de Anotações do Jupyter. Este exemplo cria uma fonte de dados fictícia na classe DummySource, que gera o valor de um contador e da hora atual em milissegundos a cada cinco segundos. Um novo objeto StreamingContext tem um intervalo de lote de 30 segundos. Sempre que um lote é criado, o aplicativo de streaming examina o RDD produzido. Em seguida, ele converte o RDD para um DataFrame do Spark e cria uma tabela temporária no DataFrame.
class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {
/** Start the thread that simulates receiving data */
def onStart() {
new Thread("Dummy Source") { override def run() { receive() } }.start()
}
def onStop() { }
/** Periodically generate a random number from 0 to 9, and the timestamp */
private def receive() {
var counter = 0
while(!isStopped()) {
store(Iterator((counter, System.currentTimeMillis)))
counter += 1
Thread.sleep(5000)
}
}
}
// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))
// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)
// Create the stream
val stream = ssc.receiverStream(new DummySource())
// Process RDDs in the batch
stream.foreachRDD { rdd =>
// Access the SQLContext and create a table called demo_numbers we can query
val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
_sqlContext.createDataFrame(rdd).toDF("value", "time")
.registerTempTable("demo_numbers")
}
// Start the stream processing
ssc.start()
Aguarde cerca de 30 segundos depois de iniciar o aplicativo acima. Em seguida, você pode consultar o DataFrame de tempos em tempos para conferir o conjunto atual de valores presente no lote, usando, por exemplo, esta consulta SQL:
%%sql
SELECT * FROM demo_numbers
A saída resultante se parece com a seguinte saída:
value | time |
---|---|
10 | 1497314465256 |
11 | 1497314470272 |
12 | 1497314475289 |
13 | 1497314480310 |
14 | 1497314485327 |
15 | 1497314490346 |
Há seis valores, pois a DummySource cria um valor a cada 5 segundos e o aplicativo emite um lote a cada 30 segundos.
Janelas deslizantes
Para fazer cálculos de agregação no DStream por um determinado período, por exemplo, para obter uma temperatura média dos últimos dois segundos, use as operações de sliding window
(janela deslizante) incluídas no Spark Streaming. Uma janela deslizante tem uma duração (a duração da janela) e o intervalo durante o qual o conteúdo da janela é avaliado (o intervalo de deslizamento).
As janelas deslizantes podem se sobrepor; por exemplo, é possível definir uma janela com uma duração de dois segundos, que desliza a cada segundo. Isso significa que, sempre que você fizer um cálculo de agregação, a janela incluirá os dados do último segundo da janela anterior. E todos os novos dados do próximo segundo.
O exemplo a seguir atualiza o código que usa a DummySource, para coletar os lotes em uma janela com uma duração e um deslizamento, ambos de um minuto.
class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {
/** Start the thread that simulates receiving data */
def onStart() {
new Thread("Dummy Source") { override def run() { receive() } }.start()
}
def onStop() { }
/** Periodically generate a random number from 0 to 9, and the timestamp */
private def receive() {
var counter = 0
while(!isStopped()) {
store(Iterator((counter, System.currentTimeMillis)))
counter += 1
Thread.sleep(5000)
}
}
}
// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))
// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)
// Create the stream
val stream = ssc.receiverStream(new DummySource())
// Process batches in 1 minute windows
stream.window(org.apache.spark.streaming.Minutes(1)).foreachRDD { rdd =>
// Access the SQLContext and create a table called demo_numbers we can query
val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
_sqlContext.createDataFrame(rdd).toDF("value", "time")
.registerTempTable("demo_numbers")
}
// Start the stream processing
ssc.start()
Após o primeiro minuto, há 12 entradas – seis entradas de cada um dos dois lotes coletados na janela.
value | time |
---|---|
1 | 1497316294139 |
2 | 1497316299158 |
3 | 1497316304178 |
4 | 1497316309204 |
5 | 1497316314224 |
6 | 1497316319243 |
7 | 1497316324260 |
8 | 1497316329278 |
9 | 1497316334293 |
10 | 1497316339314 |
11 | 1497316344339 |
12 | 1497316349361 |
As funções de janela deslizante disponíveis na API Spark Streaming incluem window, countByWindow, reduceByWindow e countByValueAndWindow. Para obter detalhes sobre essas funções, consulte Transformações em DStreams.
Definindo o ponto de verificação
Para fornecer resiliência e tolerância a falhas, o Spark Streaming conta com um ponto de verificação para garantir que o processamento do fluxo continue sem interrupções, mesmo que ocorram falhas nos nós. O Spark cria pontos de verificação para um armazenamento durável (Armazenamento do Azure ou Data Lake Storage). Esses pontos de verificação armazenam os metadados sobre o aplicativo de streaming, como a configuração do aplicativo e as operações definidas por ele. Além disso, todos os lotes em fila que ainda não foram processados. Algumas vezes, os pontos de verificação também podem incluir o salvamento dos dados nos RDDs, a fim de reconstruir o estado dos dados mais rapidamente a partir daquilo que está presente nos RDDs gerenciados pelo Spark.
Implantar aplicativos Spark Streaming
Normalmente, você cria um aplicativo Spark Streaming localmente em um arquivo JAR. Em seguida, para implantá-lo no Spark pelo HDInsight, você copia o arquivo JAR para o armazenamento anexado padrão. Você pode iniciar o aplicativo usando as APIs REST do LIVY disponíveis no cluster com uma operação POST. O corpo da solicitação POST inclui um documento JSON que fornece o caminho para o JAR. E inclui também o nome da classe cujo método principal define e executa o aplicativo de streaming e, opcionalmente, os requisitos de recursos do trabalho (como a quantidade de executores, memória e núcleos). Além disso, todas as definições de configuração necessárias ao código do aplicativo.
O status de todos os aplicativos também pode ser verificado com uma solicitação GET em um ponto de extremidade LIVY. Por fim, é possível encerrar um aplicativo em execução emitindo uma solicitação DELETE no ponto de extremidade do LIVY. Para detalhes sobre a API LIVY, veja Trabalhos remotos com o Apache LIVY