Cvičení – streamování dat Kafka do poznámkového bloku Jupyter a okno dat

Dokončeno

Cluster Kafka teď zapisuje data do svého protokolu, který je možné zpracovat prostřednictvím strukturovaného streamování Sparku.

Poznámkový blok Sparku je součástí ukázky, kterou jste naklonovali, takže ho musíte nahrát do clusteru Spark, abyste ho mohli použít.

Nahrání poznámkového bloku Pythonu do clusteru Spark

  1. Na webu Azure Portal klikněte na domovské > clustery HDInsight a pak vyberte cluster Spark, který jste právě vytvořili (ne cluster Kafka).

  2. V podokně Řídicí panely clusteru klikněte na poznámkový blok Jupyter.

    Opening a Jupyter notebook

  3. Po zobrazení výzvy k zadání přihlašovacích údajů zadejte uživatelské jméno správce a zadejte heslo, které jste vytvořili při vytváření clusterů. Zobrazí se web Jupyter.

  4. Klikněte na PySpark a potom na stránce PySpark klikněte na Nahrát.

  5. Přejděte do umístění, kam jste stáhli ukázku z GitHubu, vyberte soubor RealTimeStocks.ipynb, klikněte na Otevřít, potom klikněte na Nahrát a potom klikněte na Aktualizovat v internetovém prohlížeči.

  6. Jakmile se poznámkový blok nahraje do složky PySpark, kliknutím na RealTimeStocks.ipynb otevřete poznámkový blok v prohlížeči.

  7. První buňku v poznámkovém bloku spusťte umístěním kurzoru do buňky a následným kliknutím na Shift+Enter buňku spusťte.

    Buňka Konfigurace knihoven a balíčků se úspěšně dokončí, když zobrazí zprávu o spuštění aplikace Spark a další informace, jak je znázorněno na následující obrazovce popis.

    Configuring libraries in a Jupyter notebook

  8. V buňce Set-up Připojení ion na platformě Kafka na řádku .option("kafka.bootstrap.servers"; "") zadejte zprostředkovatele Kafka mezi druhou sadou uvozovek. Příklad: .option("kafka.bootstrap.servers", "wn0-kafka.mdlamldganads.gx.internal.cloudapp.net:9092") a potom kliknutím na Shift+Enter buňku spusťte.

    Nastavení Připojení ion na buňku Kafka se úspěšně dokončí, když zobrazí zprávu inputDf: org.apache.spark.sql.DataFrame = [klíč: binární, hodnota: binární ... 5 dalších polí]. Spark ke čtení dat používá rozhraní API readStream.

    Set-up a connection to Kafka

  9. Vyberte buňku Číst ze systému Kafka do buňky streamovaného datového rámce a potom kliknutím na Shift+Enter buňku spusťte.

    Buňka se úspěšně dokončí, když zobrazí následující zprávu: stockDf: org.apache.spark.sql.DataFrame = [symbol: řetězec, čas: řetězec ... 2 další pole]

    Read from Kafka into Streaming Dataframe

  10. Vyberte výstupní datový rámec streamování do buňky konzoly a kliknutím na Shift+Enter buňku spusťte.

    Buňka se úspěšně dokončí, když zobrazí podobné informace. Výstup zobrazuje hodnotu pro každou buňku, jak byla předána v mikrodávce, a existuje jedna dávka za sekundu.

    Output a Streaming Dataframe to a Console

  11. Vyberte buňku Min/ Max s oknem a potom kliknutím na Shift +Enter buňku spusťte.

    Buňka se úspěšně dokončí, když poskytne maximální a minimální cenu pro každou akcii v 4sekundovém okně, které je definováno v buňce. Jak je popsáno v předchozí lekci, poskytování informací o konkrétních časových obdobích je jednou z výhod, které získáte pomocí strukturovaného streamování Sparku.

    An example of a using a minimum and maximum aggregate function

  12. Vyberte možnost Shromáždit všechny hodnoty akcií v buňce okna a potom kliknutím na Shift +Enter buňku spusťte.

    Buňka se úspěšně dokončí, když poskytuje tabulku hodnot pro akcie v tabulce. Výstupnímode je dokončen, takže se zobrazí všechna data.

    An example of a using a total aggregate function

V této lekci jste nahráli poznámkový blok Jupyter do clusteru Spark, připojili jste ho ke clusteru Kafka, vypíšete streamovaná data vytvořená souborem producenta Pythonu do poznámkového bloku Sparku, definovali jste okno pro streamovaná data a zobrazili jste v tomto okně vysoké a nízké ceny akcií a zobrazili jste všechny hodnoty akcií v tabulce. Blahopřejeme, úspěšně jste provedli strukturované streamování pomocí Sparku a Kafka!