Adatfájlok használata a Spark használatával
A Spark használatának egyik előnye, hogy kódokat írhat és futtathat különböző programozási nyelveken, így használhatja a már meglévő programozási készségeket, és egy adott feladathoz a legmegfelelőbb nyelvet használhatja. Az új Azure Databricks Spark-jegyzetfüzetek alapértelmezett nyelve a PySpark – a Python Spark-optimalizált verziója, amelyet az adattudósok és az elemzők gyakran használnak az adatmanipuláció és -vizualizáció erős támogatása miatt. Emellett használhat olyan nyelveket is, mint a Scala (egy Java-alapú nyelv, amely interaktívan használható) és AZ SQL (a Spark SQL-kódtárban gyakran használt SQL-nyelv egy változata a relációs adatstruktúrák használatához). A szoftvermérnökök olyan lefordított megoldásokat is létrehozhatnak, amelyek a Sparkon futnak olyan keretrendszerek használatával, mint a Java.
Adatok feltárása adatkeretekkel
A Spark natív módon egy rugalmas elosztott adatkészletnek (RDD) nevezett adatstruktúrát használ, de bár közvetlenül RDD-kkel működő kódot írhat, a Sparkban a strukturált adatok kezelésére leggyakrabban használt adatstruktúra az adatkeret, amely a Spark SQL-kódtár részeként érhető el. A Spark-adatkeretek hasonlóak a mindenütt használt Pandas Python-kódtárakhoz, de a Spark elosztott feldolgozási környezetében való működésre vannak optimalizálva.
Feljegyzés
A Dataframe API mellett a Spark SQL egy erősen gépelt Adathalmaz API-t is biztosít, amelyet a Java és a Scala támogat. Ebben a modulban a Dataframe API-ra összpontosítunk.
Adatok betöltése adatkeretbe
Tekintsünk át egy hipotetikus példát, amelyből megtudhatja, hogyan használhat adatkereteket az adatok kezeléséhez. Tegyük fel, hogy a következő adatok egy products.csv nevű vesszővel tagolt szövegfájlban találhatók a Databricks Fájlrendszer (DBFS) tároló adatmappájában:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Egy Spark-jegyzetfüzetben az alábbi PySpark-kóddal töltheti be az adatokat egy adatkeretbe, és megjelenítheti az első 10 sort:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Az %pyspark
elején lévő sort varázslatnak nevezik, és közli a Sparkkal, hogy az ebben a cellában használt nyelv a PySpark. Íme a termékek adatainak megfelelő Scala-kód:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
A varázslat %spark
a Scala megadására szolgál.
Tipp.
Kiválaszthatja azt a nyelvet is, amelyet a Jegyzetfüzet felület minden egyes cellájában használni szeretne.
A korábban bemutatott mindkét példa az alábbihoz hasonló kimenetet eredményezne:
Termékazonosító | ProductName | Kategória | ListPrice |
---|---|---|---|
771 | Mountain-100 ezüst, 38 | Hegyi kerékpárok | 3399.9900 |
772 | Mountain-100 ezüst, 42 | Hegyi kerékpárok | 3399.9900 |
773 | Mountain-100 ezüst, 44 | Hegyi kerékpárok | 3399.9900 |
... | ... | ... | ... |
Adatkeretséma megadása
Az előző példában a CSV-fájl első sora tartalmazta az oszlopneveket, a Spark pedig az egyes oszlopok adattípusát tudta kikövetkeztetni a benne lévő adatokból. Explicit sémát is megadhat az adatokhoz, ami akkor hasznos, ha az oszlopnevek nem szerepelnek az adatfájlban, például az alábbi CSV-példában:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Az alábbi PySpark-példa bemutatja, hogyan adhatja meg a product-data.csv nevű fájlból betöltendő adatkeret sémáját ebben a formátumban:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Az eredmények ismét hasonlóak lesznek a következőhöz:
Termékazonosító | ProductName | Kategória | ListPrice |
---|---|---|---|
771 | Mountain-100 ezüst, 38 | Hegyi kerékpárok | 3399.9900 |
772 | Mountain-100 ezüst, 42 | Hegyi kerékpárok | 3399.9900 |
773 | Mountain-100 ezüst, 44 | Hegyi kerékpárok | 3399.9900 |
... | ... | ... | ... |
Adatkeretek szűrése és csoportosítása
A Dataframe osztály metódusaival szűrheti, rendezheti, csoportosíthatja és egyéb módon módosíthatja a benne lévő adatokat. Az alábbi kódpélda például a select metódussal kéri le a ProductName és a ListPrice oszlopokat az előző példában termékadatokat tartalmazó elosztott fájlrendszer adatkeretéből:
pricelist_df = df.select("ProductID", "ListPrice")
A példakód eredményei a következőképpen néznek ki:
Termékazonosító | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
A legtöbb adatmanipulációs módszer esetén a kiválasztás egy új adatkeret-objektumot ad vissza.
Tipp.
Az oszlopok egy részhalmazának kiválasztása egy adatkeretből gyakori művelet, amely az alábbi rövidebb szintaxissal is elérhető:
pricelist_df = df["ProductID", "ListPrice"]
A metódusok "összeláncolt" használatával olyan manipulációk sorozatát hajthatja végre, amelyek átalakított adatkeretet eredményeznek. Ez a példakód például a select éswhere metódusokat láncolva hoz létre egy új adatkeretet, amely tartalmazza a ProductName és ListPrice oszlopokat a Mountain Bikes vagy Road Bikes kategóriájú termékekhez:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
A példakód eredményei a következőképpen néznek ki:
ProductName | ListPrice |
---|---|
Mountain-100 ezüst, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Az adatok csoportosításához és összesítéséhez használhatja a GroupBy metódust és az összesítő függvényeket. Az alábbi PySpark-kód például megszámolja az egyes kategóriákhoz tartozó termékek számát:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
A példakód eredményei a következőképpen néznek ki:
Kategória | darabszám |
---|---|
Fejhallgatók | 3 |
Kerekek | 14 |
Hegyi kerékpárok | 32 |
... | ... |
SQL-kifejezések használata a Sparkban
A Dataframe API egy Spark SQL nevű Spark-kódtár része, amely lehetővé teszi, hogy az adatelemzők SQL-kifejezéseket használjanak az adatok lekérdezéséhez és kezeléséhez.
Adatbázis-objektumok létrehozása a Spark-katalógusban
A Spark-katalógus egy metaadattár relációs adatobjektumokhoz, például nézetekhez és táblákhoz. A Spark-futtatókörnyezet a katalógus használatával zökkenőmentesen integrálhatja a Spark által támogatott nyelven írt kódot olyan SQL-kifejezésekkel, amelyek bizonyos adatelemzők vagy fejlesztők számára természetesebbek lehetnek.
Az adatkeretek adatainak a Spark-katalógusban való lekérdezéséhez az egyik legegyszerűbb módja egy ideiglenes nézet létrehozása, ahogyan az a következő példakódban látható:
df.createOrReplaceTempView("products")
A nézet ideiglenes, ami azt jelenti, hogy az aktuális munkamenet végén automatikusan törlődik. Létrehozhat olyan táblákat is, amelyek megmaradnak a katalógusban, hogy meghatározzon egy, a Spark SQL használatával lekérdezhető adatbázist.
Feljegyzés
Ebben a modulban nem vizsgáljuk meg részletesen a Spark-katalógus táblázatát, de érdemes időt szakosíteni néhány fontos pont kiemeléséhez:
- A metódussal
spark.catalog.createTable
üres táblát hozhat létre. A táblák metaadat-struktúrák, amelyek a mögöttes adatokat a katalógushoz társított tárolási helyen tárolják. A táblák törlése a mögöttes adatokat is törli. - Az adatkereteket táblázatként mentheti annak metódusával
saveAsTable
. - A metódussal
spark.catalog.createExternalTable
külső táblát is létrehozhat. A külső táblák metaadatokat határoznak meg a katalógusban, de külső tárolóhelyről szerzik be a mögöttes adatokat; általában egy data lake-beli mappa. A külső tábla törlése nem törli a mögöttes adatokat.
Adatok lekérdezése a Spark SQL API használatával
A Spark SQL API-t bármilyen nyelven írt kódban használhatja a katalógus adatainak lekérdezéséhez. Az alábbi PySpark-kód például egy SQL-lekérdezéssel adja vissza a termékek nézetéből származó adatokat adatkeretként.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
A példakód eredményei az alábbi táblázathoz hasonlóan néznek ki:
ProductName | ListPrice |
---|---|
Mountain-100 ezüst, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
SQL-kód használata
Az előző példa bemutatja, hogyan ágyazhat be SQL-kifejezéseket a Spark-kódba a Spark SQL API használatával. A jegyzetfüzetekben a %sql
varázslóval olyan SQL-kódot is futtathat, amely a katalógusban lévő objektumokat lekérdezi, az alábbi módon:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Az SQL-kód példája egy olyan eredményhalmazt ad vissza, amely a jegyzetfüzetben táblázatként automatikusan megjelenik, az alábbihoz hasonlóan:
Kategória | ProductCount |
---|---|
Bib-Shorts | 3 |
Kerékpártartók | 0 |
Kerékpárállványok | 0 |
... | ... |