Adatfájlok használata a Spark használatával

Befejeződött

A Spark használatának egyik előnye, hogy kódokat írhat és futtathat különböző programozási nyelveken, így használhatja a már meglévő programozási készségeket, és egy adott feladathoz a legmegfelelőbb nyelvet használhatja. Az új Azure Databricks Spark-jegyzetfüzetek alapértelmezett nyelve a PySpark – a Python Spark-optimalizált verziója, amelyet az adattudósok és az elemzők gyakran használnak az adatmanipuláció és -vizualizáció erős támogatása miatt. Emellett használhat olyan nyelveket is, mint a Scala (egy Java-alapú nyelv, amely interaktívan használható) és AZ SQL (a Spark SQL-kódtárban gyakran használt SQL-nyelv egy változata a relációs adatstruktúrák használatához). A szoftvermérnökök olyan lefordított megoldásokat is létrehozhatnak, amelyek a Sparkon futnak olyan keretrendszerek használatával, mint a Java.

Adatok feltárása adatkeretekkel

A Spark natív módon egy rugalmas elosztott adatkészletnek (RDD) nevezett adatstruktúrát használ, de bár közvetlenül RDD-kkel működő kódot írhat, a Sparkban a strukturált adatok kezelésére leggyakrabban használt adatstruktúra az adatkeret, amely a Spark SQL-kódtár részeként érhető el. A Spark-adatkeretek hasonlóak a mindenütt használt Pandas Python-kódtárakhoz, de a Spark elosztott feldolgozási környezetében való működésre vannak optimalizálva.

Feljegyzés

A Dataframe API mellett a Spark SQL egy erősen gépelt Adathalmaz API-t is biztosít, amelyet a Java és a Scala támogat. Ebben a modulban a Dataframe API-ra összpontosítunk.

Adatok betöltése adatkeretbe

Tekintsünk át egy hipotetikus példát, amelyből megtudhatja, hogyan használhat adatkereteket az adatok kezeléséhez. Tegyük fel, hogy a következő adatok egy products.csv nevű vesszővel tagolt szövegfájlban találhatók a Databricks Fájlrendszer (DBFS) tároló adatmappájában:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Egy Spark-jegyzetfüzetben az alábbi PySpark-kóddal töltheti be az adatokat egy adatkeretbe, és megjelenítheti az első 10 sort:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Az %pyspark elején lévő sort varázslatnak nevezik, és közli a Sparkkal, hogy az ebben a cellában használt nyelv a PySpark. Íme a termékek adatainak megfelelő Scala-kód:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

A varázslat %spark a Scala megadására szolgál.

Tipp.

Kiválaszthatja azt a nyelvet is, amelyet a Jegyzetfüzet felület minden egyes cellájában használni szeretne.

A korábban bemutatott mindkét példa az alábbihoz hasonló kimenetet eredményezne:

Termékazonosító ProductName Kategória ListPrice
771 Mountain-100 ezüst, 38 Hegyi kerékpárok 3399.9900
772 Mountain-100 ezüst, 42 Hegyi kerékpárok 3399.9900
773 Mountain-100 ezüst, 44 Hegyi kerékpárok 3399.9900
... ... ... ...

Adatkeretséma megadása

Az előző példában a CSV-fájl első sora tartalmazta az oszlopneveket, a Spark pedig az egyes oszlopok adattípusát tudta kikövetkeztetni a benne lévő adatokból. Explicit sémát is megadhat az adatokhoz, ami akkor hasznos, ha az oszlopnevek nem szerepelnek az adatfájlban, például az alábbi CSV-példában:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Az alábbi PySpark-példa bemutatja, hogyan adhatja meg a product-data.csv nevű fájlból betöltendő adatkeret sémáját ebben a formátumban:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Az eredmények ismét hasonlóak lesznek a következőhöz:

Termékazonosító ProductName Kategória ListPrice
771 Mountain-100 ezüst, 38 Hegyi kerékpárok 3399.9900
772 Mountain-100 ezüst, 42 Hegyi kerékpárok 3399.9900
773 Mountain-100 ezüst, 44 Hegyi kerékpárok 3399.9900
... ... ... ...

Adatkeretek szűrése és csoportosítása

A Dataframe osztály metódusaival szűrheti, rendezheti, csoportosíthatja és egyéb módon módosíthatja a benne lévő adatokat. Az alábbi kódpélda például a select metódussal kéri le a ProductName és a ListPrice oszlopokat az előző példában termékadatokat tartalmazó elosztott fájlrendszer adatkeretéből:

pricelist_df = df.select("ProductID", "ListPrice")

A példakód eredményei a következőképpen néznek ki:

Termékazonosító ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

A legtöbb adatmanipulációs módszer esetén a kiválasztás egy új adatkeret-objektumot ad vissza.

Tipp.

Az oszlopok egy részhalmazának kiválasztása egy adatkeretből gyakori művelet, amely az alábbi rövidebb szintaxissal is elérhető:

pricelist_df = df["ProductID", "ListPrice"]

A metódusok "összeláncolt" használatával olyan manipulációk sorozatát hajthatja végre, amelyek átalakított adatkeretet eredményeznek. Ez a példakód például a select éswhere metódusokat láncolva hoz létre egy új adatkeretet, amely tartalmazza a ProductName és ListPrice oszlopokat a Mountain Bikes vagy Road Bikes kategóriájú termékekhez:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

A példakód eredményei a következőképpen néznek ki:

ProductName ListPrice
Mountain-100 ezüst, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

Az adatok csoportosításához és összesítéséhez használhatja a GroupBy metódust és az összesítő függvényeket. Az alábbi PySpark-kód például megszámolja az egyes kategóriákhoz tartozó termékek számát:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

A példakód eredményei a következőképpen néznek ki:

Kategória darabszám
Fejhallgatók 3
Kerekek 14
Hegyi kerékpárok 32
... ...

SQL-kifejezések használata a Sparkban

A Dataframe API egy Spark SQL nevű Spark-kódtár része, amely lehetővé teszi, hogy az adatelemzők SQL-kifejezéseket használjanak az adatok lekérdezéséhez és kezeléséhez.

Adatbázis-objektumok létrehozása a Spark-katalógusban

A Spark-katalógus egy metaadattár relációs adatobjektumokhoz, például nézetekhez és táblákhoz. A Spark-futtatókörnyezet a katalógus használatával zökkenőmentesen integrálhatja a Spark által támogatott nyelven írt kódot olyan SQL-kifejezésekkel, amelyek bizonyos adatelemzők vagy fejlesztők számára természetesebbek lehetnek.

Az adatkeretek adatainak a Spark-katalógusban való lekérdezéséhez az egyik legegyszerűbb módja egy ideiglenes nézet létrehozása, ahogyan az a következő példakódban látható:

df.createOrReplaceTempView("products")

A nézet ideiglenes, ami azt jelenti, hogy az aktuális munkamenet végén automatikusan törlődik. Létrehozhat olyan táblákat is, amelyek megmaradnak a katalógusban, hogy meghatározzon egy, a Spark SQL használatával lekérdezhető adatbázist.

Feljegyzés

Ebben a modulban nem vizsgáljuk meg részletesen a Spark-katalógus táblázatát, de érdemes időt szakosíteni néhány fontos pont kiemeléséhez:

  • A metódussal spark.catalog.createTable üres táblát hozhat létre. A táblák metaadat-struktúrák, amelyek a mögöttes adatokat a katalógushoz társított tárolási helyen tárolják. A táblák törlése a mögöttes adatokat is törli.
  • Az adatkereteket táblázatként mentheti annak metódusával saveAsTable .
  • A metódussal spark.catalog.createExternalTable külső táblát is létrehozhat. A külső táblák metaadatokat határoznak meg a katalógusban, de külső tárolóhelyről szerzik be a mögöttes adatokat; általában egy data lake-beli mappa. A külső tábla törlése nem törli a mögöttes adatokat.

Adatok lekérdezése a Spark SQL API használatával

A Spark SQL API-t bármilyen nyelven írt kódban használhatja a katalógus adatainak lekérdezéséhez. Az alábbi PySpark-kód például egy SQL-lekérdezéssel adja vissza a termékek nézetéből származó adatokat adatkeretként.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

A példakód eredményei az alábbi táblázathoz hasonlóan néznek ki:

ProductName ListPrice
Mountain-100 ezüst, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

SQL-kód használata

Az előző példa bemutatja, hogyan ágyazhat be SQL-kifejezéseket a Spark-kódba a Spark SQL API használatával. A jegyzetfüzetekben a %sql varázslóval olyan SQL-kódot is futtathat, amely a katalógusban lévő objektumokat lekérdezi, az alábbi módon:

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Az SQL-kód példája egy olyan eredményhalmazt ad vissza, amely a jegyzetfüzetben táblázatként automatikusan megjelenik, az alábbihoz hasonlóan:

Kategória ProductCount
Bib-Shorts 3
Kerékpártartók 0
Kerékpárállványok 0
... ...