Práce s datovými soubory pomocí Sparku
Po nastavení poznámkového bloku a jeho připojení ke clusteru můžete pomocí Sparku číst a zpracovávat datové soubory. Spark podporuje širokou škálu formátů, jako jsou CSV, JSON, Parquet, ORC, Avro a Delta, a Databricks, poskytuje integrované konektory pro přístup k souborům uloženým v pracovním prostoru, v Azure Data Lake nebo Blob Storage nebo v jiných externích systémech.
Pracovní postup se obvykle řídí třemi kroky:
Čtení souboru do datového rámce Spark pomocí spark.read se správným formátem a cestou Při čtení nezpracovaných textových formátů, jako je CSV nebo JSON, může Spark odvodit schéma (názvy sloupců a datové typy), ale někdy je to pomalé nebo nespolehlivé. Lepším postupem v produkčním prostředí je explicitně definovat schéma tak, aby se data načítala konzistentně a efektivně.
Prozkoumejte a transformujte datový rámec pomocí operací SQL nebo DataFrame (například filtrování řádků, výběr sloupců, agregace hodnot).
Zapište výsledky zpět do úložiště ve zvoleném formátu.
Práce se soubory ve Sparku je navržená tak, aby byla konzistentní v malých a velkých datových sadách. Stejný kód použitý k otestování malého souboru CSV bude fungovat také na mnohem větších datových sadách, protože Spark distribuuje práci napříč clusterem. Díky tomu je snazší vertikálně navýšit kapacitu z rychlého zkoumání na složitější zpracování dat.
Načtení dat do datového rámce
Pojďme se podívat na hypotetický příklad, abyste viděli, jak můžete datový rámec použít k práci s daty. Předpokládejme, že máte následující data v textovém souboru s oddělovači s názvem products.csv ve složce dat ve vašem úložišti systému souborů Databricks (DBFS):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
V poznámkovém bloku Sparku můžete pomocí následujícího kódu PySpark načíst data do datového rámce a zobrazit prvních 10 řádků:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Řádek %pyspark na začátku se nazývá magie a říká Sparku, že jazyk použitý v této buňce je PySpark. Tady je ekvivalentní kód Scala pro příklad dat produktů:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
Magie %spark se používá k určení scaly.
Návod
Můžete také vybrat jazyk, který chcete použít pro každou buňku v rozhraní poznámkového bloku.
Oba výše uvedené příklady by vytvořily výstup takto:
| ProduktID | ProductName | Kategorie | Katalogová cena |
|---|---|---|---|
| 771 | Hora-100 Stříbrná, 38 | Horská kola | 3399.9900 |
| 772 | Hora-100 Stříbrná, 42 | Horská kola | 3399.9900 |
| 773 | Hora-100 Stříbrná, 44 | Horská kola | 3399.9900 |
| ... | ... | ... | ... |
Určení schématu datového rámce
V předchozím příkladu obsahoval první řádek souboru CSV názvy sloupců a Spark dokázal odvodit datový typ každého sloupce z dat, která obsahuje. Můžete také zadat explicitní schéma dat, které je užitečné, když názvy sloupců nejsou zahrnuty do datového souboru, například v tomto příkladu CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Následující příklad PySpark ukazuje, jak zadat schéma pro datový rámec, který se má načíst ze souboru s názvem product-data.csv v tomto formátu:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Výsledky by se opět podobaly následujícímu:
| ProduktID | ProductName | Kategorie | Katalogová cena |
|---|---|---|---|
| 771 | Hora-100 Stříbrná, 38 | Horská kola | 3399.9900 |
| 772 | Hora-100 Stříbrná, 42 | Horská kola | 3399.9900 |
| 773 | Hora-100 Stříbrná, 44 | Horská kola | 3399.9900 |
| ... | ... | ... | ... |
Filtrování a seskupování datových rámců
Pomocí metod třídy datového rámce můžete filtrovat, řadit, seskupovat a jinak manipulovat s daty, která obsahuje. Následující příklad kódu například používá metodu select k načtení sloupců ProductName a ListPrice z datového rámce df obsahujícího data produktu v předchozím příkladu:
pricelist_df = df.select("ProductID", "ListPrice")
Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:
| ProduktID | Katalogová cena |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
U většiny metod manipulace s daty select vrací nový objekt datového rámce.
Návod
Výběr podmnožině sloupců z datového rámce je běžná operace, kterou lze dosáhnout také pomocí následující kratší syntaxe:
pricelist_df = df["ProductID", "ListPrice"]
Metody můžete "zřetězovat" a provést řadu manipulací, které mají za následek transformovaný datový rámec. Tento příklad například zřetězí select kód a where metody pro vytvoření nového datového rámce obsahujícího sloupce ProductName a ListPrice pro produkty s kategorií Horská kola nebo silniční kola:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:
| ProductName | Katalogová cena |
|---|---|
| Hora-100 Stříbrná, 38 | 3399.9900 |
| Road-750 Černá, velikost 52 | 539.9900 |
| ... | ... |
K seskupení a agregaci dat můžete použít metodu groupby a agregační funkce. Například následující kód PySpark spočítá počet produktů pro každou kategorii:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Výsledky z tohoto příkladu kódu by vypadaly přibližně takto:
| Kategorie | počítat |
|---|---|
| Sluchátka | 3 |
| Kola | 14 |
| Horská kola | 32 |
| ... | ... |
Poznámka:
Datové rámce Sparku jsou deklarativní a neměnné. Každá transformace (například select, filternebo groupBy) vytvoří nový datový rámec, který představuje to, co chcete, ne způsob spuštění. Díky tomu je kód opakovaně použitelný, optimalizovaný a bez vedlejších účinků. Ale žádná z těchto transformací se ve skutečnosti nespustí, dokud nespustíte akci (například display, collect, write), v jakém okamžiku Spark spustí úplný optimalizovaný plán.
Použití výrazů SQL ve Sparku
Rozhraní API datového rámce je součástí knihovny Spark s názvem Spark SQL, která datovým analytikům umožňuje dotazovat a manipulovat s daty pomocí výrazů SQL.
Vytváření databázových objektů v katalogu Spark
Katalog Sparku je metastor pro relační datové objekty, jako jsou zobrazení a tabulky. Modul runtime Sparku může pomocí katalogu bezproblémově integrovat kód napsaný v jakémkoli jazyce podporovaném sparkem s výrazy SQL, které můžou být pro některé datové analytiky nebo vývojáře přirozenější.
Jedním z nejjednodušších způsobů, jak zpřístupnit data v datovém rámci pro dotazování v katalogu Spark, je vytvořit dočasné zobrazení, jak je znázorněno v následujícím příkladu kódu:
df.createOrReplaceTempView("products")
Zobrazení je dočasné, což znamená, že se automaticky odstraní na konci aktuální relace. Můžete také vytvořit tabulky , které jsou trvalé v katalogu a definovat databázi, která se dá dotazovat pomocí Spark SQL.
Poznámka:
V tomto modulu nebudeme podrobně zkoumat tabulky katalogu Sparku, ale stojí za to si vzít čas na zvýraznění několika klíčových bodů:
- Prázdnou tabulku můžete vytvořit pomocí
spark.catalog.createTablemetody. Tabulky jsou struktury metadat, které ukládají jejich podkladová data do umístění úložiště přidruženého k katalogu. Odstraněním tabulky se odstraní také její podkladová data. - Datový rámec můžete uložit jako tabulku pomocí jeho
saveAsTablemetody. - Externí tabulku můžete vytvořit pomocí
spark.catalog.createExternalTablemetody. Externí tabulky definují metadata v katalogu, ale získávají podkladová data z externího umístění úložiště; obvykle složku v datovém jezeře. Odstranění externí tabulky neodstraní podkladová data.
Použití rozhraní Spark SQL API k dotazování dat
K dotazování dat v katalogu můžete použít rozhraní SPARK SQL API v kódu napsané v libovolném jazyce. Například následující kód PySpark používá dotaz SQL k vrácení dat z zobrazení produktů jako datového rámce.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Výsledky z příkladu kódu by vypadaly podobně jako v následující tabulce:
| ProductName | Katalogová cena |
|---|---|
| Hora-100 Stříbrná, 38 | 3399.9900 |
| Road-750 Černá, velikost 52 | 539.9900 |
| ... | ... |
Použití kódu SQL
Předchozí příklad ukazuje, jak pomocí rozhraní Spark SQL API vkládat výrazy SQL do kódu Sparku. V poznámkovém bloku můžete pomocí %sql magie také spustit kód SQL, který se dotazuje na objekty v katalogu, například takto:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Příklad kódu SQL vrátí sadu výsledků, která se automaticky zobrazí v poznámkovém bloku jako tabulka, například následující:
| Kategorie | PočetProduktů |
|---|---|
| Bib-Shorts | 3 |
| Stojany na kola | 0 |
| Kolové stojany | 0 |
| ... | ... |