Analizowanie danych za pomocą platformy Spark

Ukończone

Jedną z zalet korzystania z platformy Spark jest możliwość pisania i uruchamiania kodu w różnych językach programowania, dzięki czemu możesz używać już posiadanych umiejętności programistycznych i używać najbardziej odpowiedniego języka dla danego zadania. Domyślnym językiem w nowym notesie platformy Spark usługi Azure Synapse Analytics jest PySpark — zoptymalizowana pod kątem platformy Spark wersja języka Python, która jest często używana przez analityków i analityków danych ze względu na silną obsługę manipulowania danymi i wizualizacji. Ponadto można używać języków, takich jak Scala (język pochodny języka Java, który może być używany interaktywnie) i SQL (wariant powszechnie używanego języka SQL zawartego w bibliotece Spark SQL do pracy ze strukturami danych relacyjnych). Inżynierowie oprogramowania mogą również tworzyć skompilowane rozwiązania uruchamiane na platformie Spark przy użyciu platform takich jak Java i Microsoft .NET.

Eksplorowanie danych za pomocą ramek danych

Natywnie platforma Spark używa struktury danych nazywanej odpornym rozproszonym zestawem danych (RDD), ale chociaż możnanapisać kod, który działa bezpośrednio z RDD, najczęściej używaną strukturą danych do pracy z danymi ustrukturyzowanymi na platformie Spark jest ramka danych, która jest udostępniana jako część biblioteki Spark SQL. Ramki danych na platformie Spark są podobne do tych w wszechobecnej bibliotece języka Python biblioteki Pandas , ale zoptymalizowane pod kątem pracy w środowisku przetwarzania rozproszonego platformy Spark.

Uwaga

Oprócz interfejsu API ramki danych platforma Spark SQL udostępnia silnie typizowane interfejs API zestawu danych , który jest obsługiwany w językach Java i Scala. Skupimy się na interfejsie API ramki danych w tym module.

Ładowanie danych do ramki danych

Przyjrzyjmy się hipotetycznym przykładom, aby zobaczyć, jak można użyć ramki danych do pracy z danymi. Załóżmy, że masz następujące dane w pliku tekstowym rozdzielonym przecinkami o nazwie products.csv na podstawowym koncie magazynu dla obszaru roboczego usługi Azure Synapse Analytics:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

W notesie platformy Spark możesz użyć następującego kodu PySpark, aby załadować dane do ramki danych i wyświetlić pierwsze 10 wierszy:

%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Wiersz %%pyspark na początku jest nazywany magią i informuje platformę Spark, że język używany w tej komórce to PySpark. Możesz wybrać język, którego chcesz użyć jako domyślnego na pasku narzędzi interfejsu notesu, a następnie użyć magii, aby zastąpić ten wybór dla określonej komórki. Na przykład oto odpowiedni kod Scala dla danych produktów:

%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))

Magia %%spark służy do określania Scala.

Oba te przykłady kodu spowodują wygenerowanie danych wyjściowych w następujący sposób:

ProductID ProductName Kategoria ListPrice
771 Górskie — 100 srebrnych, 38 Rowery górskie 3399.9900
772 Górskie — 100 srebrnych, 42 Rowery górskie 3399.9900
773 Górskie — 100 srebrnych, 44 Rowery górskie 3399.9900
... ... ... ...

Określanie schematu ramki danych

W poprzednim przykładzie pierwszy wiersz pliku CSV zawierał nazwy kolumn, a platforma Spark mogła wywnioskować typ danych każdej kolumny z danych, które zawiera. Można również określić jawny schemat dla danych, co jest przydatne, gdy nazwy kolumn nie są uwzględnione w pliku danych, podobnie jak w tym przykładzie CSV:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

W poniższym przykładzie PySpark pokazano, jak określić schemat ramki danych do załadowania z pliku o nazwie product-data.csv w tym formacie:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Wyniki będą ponownie podobne do następujących:

ProductID ProductName Kategoria ListPrice
771 Górskie — 100 srebrnych, 38 Rowery górskie 3399.9900
772 Górskie — 100 srebrnych, 42 Rowery górskie 3399.9900
773 Górskie — 100 srebrnych, 44 Rowery górskie 3399.9900
... ... ... ...

Filtrowanie i grupowanie ramek danych

Metody klasy Ramka danych umożliwiają filtrowanie, sortowanie, grupowanie i manipulowanie danymi, które zawiera. Na przykład poniższy przykład kodu używa metody select , aby pobrać kolumny ProductName i ListPrice z ramki danych df zawierającej dane produktu w poprzednim przykładzie:

pricelist_df = df.select("ProductID", "ListPrice")

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

W przypadku większości metod manipulowania danymi wybierz polecenie zwraca nowy obiekt ramki danych.

Napiwek

Wybranie podzbioru kolumn z ramki danych jest wspólną operacją, którą można również osiągnąć przy użyciu następującej krótszej składni:

pricelist_df = df["ProductID", "ListPrice"]

Metody "łańcuchowe" można łączyć w celu wykonania serii manipulacji, które skutkują przekształconą ramką danych. Na przykład ten przykładowy kod tworzy łańcuch wyboru i miejsca tworzenia nowej ramki danych zawierającej kolumny ProductName i ListPrice dla produktów z kategorią Mountain Bikes lub Road Bikes:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

ProductName ListPrice
Górskie — 100 srebrnych, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

Aby grupować i agregować dane, możesz użyć metody groupBy i funkcji agregacji. Na przykład następujący kod PySpark zlicza liczbę produktów dla każdej kategorii:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:

Kategoria count
Zestawy słuchawkowe 3
Wheels 14
Rowery górskie 32
... ...

Używanie wyrażeń SQL na platformie Spark

Interfejs API ramki danych jest częścią biblioteki Spark o nazwie Spark SQL, która umożliwia analitykom danych używanie wyrażeń SQL do wykonywania zapytań o dane i manipulowania nimi.

Tworzenie obiektów bazy danych w wykazie platformy Spark

Wykaz platformy Spark to magazyn metadanych dla obiektów danych relacyjnych, takich jak widoki i tabele. Środowisko uruchomieniowe platformy Spark może używać wykazu do bezproblemowego integrowania kodu napisanego w dowolnym języku obsługiwanym przez platformę Spark z wyrażeniami SQL, które mogą być bardziej naturalne dla niektórych analityków danych lub deweloperów.

Jednym z najprostszych sposobów udostępniania danych w ramce danych na potrzeby wykonywania zapytań w wykazie platformy Spark jest utworzenie widoku tymczasowego, jak pokazano w poniższym przykładzie kodu:

df.createOrReplaceTempView("products")

Widok jest tymczasowy, co oznacza, że jest on automatycznie usuwany na końcu bieżącej sesji. Możesz również utworzyć tabele , które są utrwalane w wykazie, aby zdefiniować bazę danych, do której można wykonywać zapytania przy użyciu usługi Spark SQL.

Uwaga

Nie będziemy szczegółowo eksplorować tabel wykazu platformy Spark w tym module, ale warto poświęcić trochę czasu na wyróżnienie kilku kluczowych kwestii:

  • Pustą tabelę można utworzyć przy użyciu spark.catalog.createTable metody . Tabele to struktury metadanych, które przechowują swoje dane bazowe w lokalizacji magazynu skojarzonej z wykazem. Usunięcie tabeli powoduje również usunięcie danych bazowych.
  • Ramkę danych można zapisać jako tabelę przy użyciu jej saveAsTable metody.
  • Tabelę zewnętrzną można utworzyć przy użyciu spark.catalog.createExternalTable metody . Tabele zewnętrzne definiują metadane w wykazie, ale pobierają swoje dane bazowe z zewnętrznej lokalizacji magazynu; zazwyczaj folder w usłudze Data Lake. Usunięcie tabeli zewnętrznej nie powoduje usunięcia danych bazowych.

Wykonywanie zapytań dotyczących danych przy użyciu interfejsu API SQL platformy Spark

Interfejs API SQL platformy Spark można używać w kodzie napisanym w dowolnym języku do wykonywania zapytań dotyczących danych w wykazie. Na przykład poniższy kod PySpark używa zapytania SQL do zwracania danych z widoku produktów jako ramki danych.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Wyniki z przykładu kodu będą wyglądać podobnie do poniższej tabeli:

ProductID ProductName ListPrice
38 Górskie — 100 srebrnych, 38 3399.9900
52 Road-750 Black, 52 539.9900
... ... ...

Korzystanie z kodu SQL

W poprzednim przykładzie pokazano, jak używać interfejsu API SQL platformy Spark do osadzania wyrażeń SQL w kodzie platformy Spark. W notesie można również użyć %%sql funkcji magic do uruchomienia kodu SQL, który wysyła zapytania do obiektów w wykazie, w następujący sposób:

%%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

Przykład kodu SQL zwraca zestaw wyników, który jest automatycznie wyświetlany w notesie jako tabela, podobnie jak poniżej:

Kategoria ProductCount
Bib-Shorts 3
Stojaki rowerowe 1
Stojaki rowerowe 1
... ...