Praca z danymi w ramce danych platformy Spark

Ukończone

W poprzedniej lekcji przedstawiono sposób nawiązywania połączenia ze źródłem danych, ładowania danych do ramki danych i opcjonalnie zapisywania ramki danych w usłudze Lakehouse jako pliku lub tabeli. Teraz przyjrzyjmy się ramce danych bardziej szczegółowo.

Natywnie platforma Spark używa struktury danych nazywanej odpornym rozproszonym zestawem danych (RDD), ale chociaż można napisać kod, który działa bezpośrednio z RDD, najczęściej używaną strukturą danych do pracy z danymi ustrukturyzowanymi na platformie Spark jest ramka danych, która jest udostępniana jako część biblioteki Spark SQL. Ramki danych na platformie Spark są podobne do tych w wszechobecnej bibliotece języka Python biblioteki Pandas , ale zoptymalizowane pod kątem pracy w środowisku przetwarzania rozproszonego platformy Spark.

Uwaga

Oprócz interfejsu API ramki danych platforma Spark SQL udostępnia silnie typizowane interfejs API zestawu danych , który jest obsługiwany w językach Java i Scala. Skupimy się na interfejsie API ramki danych w tym module.

Ładowanie danych do ramki danych

Przyjrzyjmy się hipotetycznym przykładom, aby zobaczyć, jak można użyć ramki danych do pracy z danymi. Załóżmy, że masz następujące dane w pliku tekstowym rozdzielonym przecinkami o nazwie products.csv w folderze Pliki/dane w usłudze Lakehouse:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Wnioskowanie schematu

W notesie platformy Spark możesz użyć następującego kodu PySpark, aby załadować dane pliku do ramki danych i wyświetlić pierwsze 10 wierszy:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Jak wiesz wcześniej, %%pyspark linia na początku jest nazywana magią i informuje platformę Spark, że język używany w tej komórce to PySpark. W większości przypadków PySpark jest językiem domyślnym; i ogólnie będziemy trzymać się go w przykładach w tym module. Jednak w przypadku kompletności oto odpowiedni kod Scala dla przykładu danych produktów:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

Magia %%spark służy do określania Scala. Zwróć uwagę, że implementacja scala ramki danych działa podobnie do wersji PySpark.

Oba te przykłady kodu spowodują wygenerowanie danych wyjściowych w następujący sposób:

ProductID ProductName Kategoria ListPrice
771 Górskie — 100 srebrnych, 38 Rowery górskie 3399.9900
772 Górskie — 100 srebrnych, 42 Rowery górskie 3399.9900
773 Górskie — 100 srebrnych, 44 Rowery górskie 3399.9900
... ... ... ...

Określanie jawnego schematu

W poprzednim przykładzie pierwszy wiersz pliku CSV zawierał nazwy kolumn, a platforma Spark mogła wywnioskować typ danych każdej kolumny z danych, które zawiera. Można również określić jawny schemat dla danych, co jest przydatne, gdy nazwy kolumn nie są uwzględnione w pliku danych, podobnie jak w tym przykładzie CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

W poniższym przykładzie PySpark pokazano, jak określić schemat ramki danych do załadowania z pliku o nazwie product-data.csv w tym formacie:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Wyniki będą ponownie podobne do następujących:

ProductID ProductName Kategoria ListPrice
771 Górskie — 100 srebrnych, 38 Rowery górskie 3399.9900
772 Górskie — 100 srebrnych, 42 Rowery górskie 3399.9900
773 Górskie — 100 srebrnych, 44 Rowery górskie 3399.9900
... ... ... ...

Napiwek

Określanie jawnego schematu zwiększa również wydajność!

Filtrowanie i grupowanie ramek danych

Metody klasy Ramka danych umożliwiają filtrowanie, sortowanie, grupowanie i manipulowanie danymi, które zawiera. Na przykład poniższy przykład kodu używa metody select w celu pobrania kolumn ProductID i ListPrice z ramki danych df zawierającej dane produktu w poprzednim przykładzie:

pricelist_df = df.select("ProductID", "ListPrice")

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

W przypadku większości metod manipulowania danymi wybierz polecenie zwraca nowy obiekt ramki danych.

Napiwek

Wybranie podzbioru kolumn z ramki danych jest wspólną operacją, którą można również osiągnąć przy użyciu następującej krótszej składni:

pricelist_df = df["ProductID", "ListPrice"]

Metody "łańcuchowe" można łączyć w celu wykonania serii manipulacji, które skutkują przekształconą ramką danych. Na przykład ten przykładowy kod tworzy łańcuch wyboru i miejsca tworzenia nowej ramki danych zawierającej kolumny ProductName i ListPrice dla produktów z kategorią Mountain Bikes lub Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

ProductName Kategoria ListPrice
Górskie — 100 srebrnych, 38 Rowery górskie 3399.9900
Road-750 Black, 52 Rowery szosowe 539.9900
... ... ...

Aby grupować i agregować dane, możesz użyć metody groupBy i funkcji agregacji. Na przykład następujący kod PySpark zlicza liczbę produktów dla każdej kategorii:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

Kategoria count
Zestawy słuchawkowe 3
Wheels 14
Rowery górskie 32
... ...