Analýza dat pomocí Sparku

Dokončeno

Jednou z výhod používání Sparku je, že můžete psát a spouštět kód v různých programovacích jazycích, což vám umožní používat programovací dovednosti, které už máte, a používat nejvhodnější jazyk pro daný úkol. Výchozím jazykem v novém poznámkovém bloku Sparku služby Azure Synapse Analytics je PySpark – verze Pythonu optimalizovaná pro Spark, kterou běžně používají datoví vědci a analytici kvůli silné podpoře manipulace s daty a vizualizací. Kromě toho můžete používat jazyky, jako je Scala (jazyk odvozený z Javy, který se dá interaktivně používat) a SQL (varianta běžně používaného jazyka SQL, který je součástí knihovny Spark SQL pro práci se strukturami relačních dat). Softwaroví inženýři můžou také vytvářet kompilovaná řešení, která běží ve Sparku pomocí architektur, jako je Java a Microsoft .NET.

Zkoumání dat pomocí datových rámců

Spark nativně používá datovou strukturu označovanou jako odolná distribuovaná datová sada (RDD), ale i když můžete psát kód, který funguje přímo se sadami RDD, nejčastěji používanou datovou strukturou pro práci se strukturovanými daty ve Sparku je datový rámec, který je součástí knihovny Spark SQL. Datové rámce ve Sparku jsou podobné datovým rámcům v všudypřítomné knihovně Pandas Pythonu, ale optimalizované pro práci v distribuovaném výpočetním prostředí Sparku.

Poznámka:

Kromě rozhraní API datového rámce poskytuje Spark SQL rozhraní API datové sady se silným typem, které je podporováno v Javě a Scala. V tomto modulu se zaměříme na rozhraní API datového rámce.

Načtení dat do datového rámce

Pojďme se podívat na hypotetický příklad, abyste viděli, jak můžete datový rámec použít k práci s daty. Předpokládejme, že máte následující data v textovém souboru s oddělovači s názvem products.csv v primárním účtu úložiště pro pracovní prostor Azure Synapse Analytics:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

V poznámkovém bloku Sparku můžete pomocí následujícího kódu PySpark načíst data do datového rámce a zobrazit prvních 10 řádků:

%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Řádek %%pyspark na začátku se nazývá magie a říká Sparku, že jazyk použitý v této buňce je PySpark. Jazyk, který chcete použít jako výchozí, můžete vybrat na panelu nástrojů rozhraní Poznámkový blok a pak pomocí magie přepsat tuto volbu pro konkrétní buňku. Tady je například ekvivalentní kód Scala pro příklad dat produktů:

%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))

Magie %%spark se používá k určení scaly.

Oba tyto ukázky kódu by vytvořily výstup podobný tomuto:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountain Bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain Bikes 3399.9900
... ... ... ...

Určení schématu datového rámce

V předchozím příkladu obsahoval první řádek souboru CSV názvy sloupců a Spark dokázal odvodit datový typ každého sloupce z dat, která obsahuje. Můžete také zadat explicitní schéma dat, které je užitečné, když názvy sloupců nejsou zahrnuty do datového souboru, například v tomto příkladu CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Následující příklad PySpark ukazuje, jak zadat schéma pro datový rámec, který se má načíst ze souboru s názvem product-data.csv v tomto formátu:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Výsledky by se opět podobaly následujícímu:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountain Bikes 3399.9900
772 Mountain-100 Silver, 42 Mountain Bikes 3399.9900
773 Mountain-100 Silver, 44 Mountain Bikes 3399.9900
... ... ... ...

Filtrování a seskupování datových rámců

Pomocí metod třídy datového rámce můžete filtrovat, řadit, seskupovat a jinak manipulovat s daty, která obsahuje. Například následující příklad kódu používá metodu select k načtení sloupců ProductName a ListPrice z datového rámce df obsahujícího data produktu v předchozím příkladu:

pricelist_df = df.select("ProductID", "ListPrice")

Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

U většiny metod manipulace s daty vrátí výběr nový objekt datového rámce.

Tip

Výběr podmnožině sloupců z datového rámce je běžná operace, kterou lze dosáhnout také pomocí následující kratší syntaxe:

pricelist_df = df["ProductID", "ListPrice"]

Metody můžete "zřetězovat" a provést řadu manipulací, které mají za následek transformovaný datový rámec. Například tento příklad kódu zřetězí výběr a kde metody vytvoření nového datového rámce obsahujícího sloupce ProductName a ListPrice pro produkty s kategorií horských kol nebo silničních kol:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

K seskupení a agregaci dat můžete použít metodu groupBy a agregační funkce. Například následující kód PySpark spočítá počet produktů pro každou kategorii:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:

Kategorie count
Sluchátka 3
Wheels 14
Mountain Bikes 32
... ...

Použití výrazů SQL ve Sparku

Rozhraní API datového rámce je součástí knihovny Spark s názvem Spark SQL, která datovým analytikům umožňuje dotazovat a manipulovat s daty pomocí výrazů SQL.

Vytváření databázových objektů v katalogu Spark

Katalog Sparku je metastor pro relační datové objekty, jako jsou zobrazení a tabulky. Modul runtime Sparku může pomocí katalogu bezproblémově integrovat kód napsaný v jakémkoli jazyce podporovaném sparkem s výrazy SQL, které můžou být pro některé datové analytiky nebo vývojáře přirozenější.

Jedním z nejjednodušších způsobů, jak zpřístupnit data v datovém rámci pro dotazování v katalogu Spark, je vytvořit dočasné zobrazení, jak je znázorněno v následujícím příkladu kódu:

df.createOrReplaceTempView("products")

Zobrazení je dočasné, což znamená, že se automaticky odstraní na konci aktuální relace. Můžete také vytvořit tabulky , které jsou trvalé v katalogu a definovat databázi, která se dá dotazovat pomocí Spark SQL.

Poznámka:

V tomto modulu nebudeme podrobně zkoumat tabulky katalogu Sparku, ale stojí za to si vzít čas na zvýraznění několika klíčových bodů:

  • Prázdnou tabulku můžete vytvořit pomocí spark.catalog.createTable metody. Tabulky jsou struktury metadat, které ukládají jejich podkladová data do umístění úložiště přidruženého k katalogu. Odstraněním tabulky se odstraní také její podkladová data.
  • Datový rámec můžete uložit jako tabulku pomocí jeho saveAsTable metody.
  • Externí tabulku můžete vytvořit pomocí spark.catalog.createExternalTable metody. Externí tabulky definují metadata v katalogu, ale získávají podkladová data z externího umístění úložiště; obvykle složku v datovém jezeře. Odstraněním externí tabulky nedojde k odstranění podkladových dat.

Použití rozhraní Spark SQL API k dotazování dat

K dotazování dat v katalogu můžete použít rozhraní SPARK SQL API v kódu napsané v libovolném jazyce. Například následující kód PySpark používá dotaz SQL k vrácení dat z zobrazení produktů jako datového rámce.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Výsledky z příkladu kódu by vypadaly podobně jako v následující tabulce:

ProductID ProductName ListPrice
38 Mountain-100 Silver, 38 3399.9900
52 Road-750 Black, 52 539.9900
... ... ...

Použití kódu SQL

Předchozí příklad ukazuje, jak pomocí rozhraní Spark SQL API vkládat výrazy SQL do kódu Sparku. V poznámkovém bloku můžete pomocí %%sql magie také spustit kód SQL, který se dotazuje na objekty v katalogu, například takto:

%%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Příklad kódu SQL vrátí sadu výsledků, která se automaticky zobrazí v poznámkovém bloku jako tabulka, například následující:

Kategorie ProductCount
Bib-Shorts 3
Stojany na kola 0
Kolové stojany 0
... ...