Упражнение. Потоковая передача данных Kafka в записную книжку Jupyter и определение окна для данных

Завершено

Теперь кластер Kafka записывает данные в свой журнал, который можно обработать с помощью структурированной потоковой передачи Spark.

Записная книжка Spark включена в клонированный пример, поэтому для его использования необходимо передать эту записную книжку в кластер Spark.

Отправка записной книжки Python в кластер Spark

  1. На портале Azure выберите "Главная" > "Кластеры HDInsight", а затем выберите только что созданный кластер Spark (не кластер Kafka).

  2. В области "Информационные панели кластера" щелкните "Записная книжка Jupyter".

    Opening a Jupyter notebook

  3. При появлении запроса на ввод учетных данных введите admin в качестве имени пользователя, а также пароль, указанный при создании кластеров. Отобразится веб-сайт Jupyter.

  4. Щелкните PySpark, а затем на странице PySpark нажмите кнопку Upload (Отправить).

  5. Перейдите в папку, в которую был скачан пример из GitHub, выберите файл RealTimeStocks.ipynb, а затем нажмите кнопку Open (Открыть), нажмите Upload (Отправить), а затем и нажмите кнопку "Обновить" в браузере.

  6. После отправки записной книжки в папку PySpark щелкните RealTimeStocks.ipynb, чтобы открыть записную книжку в браузере.

  7. Выполните первую ячейку в записной книжке, поместив курсор в ячейку, а затем нажав клавиши SHIFT + ВВОД, чтобы выполнить ячейку.

    На успешное выполнение ячейки Настройка библиотек и пакетов указывают сообщения Starting Spark application (Запуск приложения Spark) и дополнительные сведения, как показано в следующем снимке экрана.

    Configuring libraries in a Jupyter notebook

  8. В ячейке Настройка подключения к Kafka в строке.option("kafka.bootstrap.servers", "") введите брокер Kafka между вторым набором кавычек. Например, .option("kafka.bootstrap.servers", "wn0-kafka.mdlamldganads.gx.internal.cloudapp.net:9092"), а затем нажмите клавиши SHIFT + ВВОД, чтобы выполнить ячейку.

    Настройка Подключение в ячейку Kafka завершается успешно, когда отображается входное сообщение: org.apache.spark.sql.DataFrame = [key: binary, value: binary... 5 дополнительных полей]. Для чтения данных Spark использует API readStream.

    Set-up a connection to Kafka

  9. Выберите ячейку Чтение из Kafka в потоковый кадр данных, а затем нажмите клавиши SHIFT + ВВОД, чтобы выполнить ячейку.

    На успешное выполнение ячейки указывает следующее сообщение: stockDf: org.apache.spark.sql.DataFrame = [symbol: string, time: string ... 2 more fields]

    Read from Kafka into Streaming Dataframe

  10. Выберите ячейку Вывод потокового кадра данных в консоль, а затем нажмите клавиши SHIFT + ВВОД, чтобы выполнить ячейку.

    На успешное выполнение этой ячейки указывают сведения, аналогичные приведенным ниже. В выходных данных отображается значение для каждой ячейки в том виде, в каком оно было передано в микропакете, — один пакет в секунду.

    Output a Streaming Dataframe to a Console

  11. Выберите ячейку "Минимум/максимум акции в окне", а затем нажмите клавиши SHIFT + ВВОД, чтобы выполнить ячейку.

    На успешное выполнение ячейки указывает отображение максимальной и минимальной цены за каждую акцию в четырехсекундном окне, которое определено в ячейке. Как обсуждалось ранее, предоставление сведений о конкретных окнах времени является одним из преимуществ структурированной потоковой передачи Spark.

    An example of a using a minimum and maximum aggregate function

  12. Выберите ячейку "Сбор всех значений акций в окне", а затем нажмите клавиши SHIFT + ВВОД, чтобы выполнить ячейку.

    На успешное выполнение ячейки указывает отображение таблицы значений для акций. Для параметра outputMode указано complete, поэтому отображаются все данные.

    An example of a using a total aggregate function

В этом уроке вы отправили записную книжку Jupyter в кластер Spark, подключили ее к кластеру Kafka, вывели потоковые данные, созданные с помощью файла производителя Python, в записную книжку Spark, определили окно для потоковых данных и отобразили высокие и низкие цены на акции в этом окне, а также вывели все значения цены на акции в виде таблицы. Поздравляем! Вы успешно выполнили структурированную потоковую передачу с помощью Spark и Kafka!