Korzystanie z platformy Spark do pracy z plikami danych
Po skonfigurowaniu notesu i dołączeniu go do klastra możesz użyć platformy Spark do odczytywania i przetwarzania plików danych. Platforma Spark obsługuje szeroką gamę formatów— takich jak CSV, JSON, Parquet, ORC, Avro i Delta — a usługa Databricks udostępnia wbudowane łączniki umożliwiające dostęp do plików przechowywanych w obszarze roboczym, w usłudze Azure Data Lake lub Blob Storage lub w innych systemach zewnętrznych.
Przepływ pracy zwykle wykonuje trzy kroki:
Odczytaj plik do ramki danych platformy Spark przy użyciu pliku spark.read z poprawnym formatem i ścieżką. Podczas odczytywania nieprzetworzonych formatów tekstu, takich jak CSV lub JSON, platforma Spark może wywnioskować schemat (nazwy kolumn i typy danych), ale czasami jest to powolne lub zawodne. Lepszym rozwiązaniem w środowisku produkcyjnym jest jawne zdefiniowanie schematu, dzięki czemu dane są ładowane spójnie i wydajnie.
Eksplorowanie i przekształcanie ramki danych przy użyciu operacji SQL lub DataFrame (na przykład filtrowanie wierszy, wybieranie kolumn, agregowanie wartości).
Zapisz wyniki z powrotem do magazynu w wybranym formacie.
Praca z plikami na platformie Spark została zaprojektowana tak, aby była spójna w małych i dużych zestawach danych. Ten sam kod używany do testowania małego pliku CSV będzie również działać na znacznie większych zestawach danych, ponieważ platforma Spark dystrybuuje pracę w klastrze. Ułatwia to skalowanie w górę od szybkiej eksploracji do bardziej złożonego przetwarzania danych.
Ładowanie danych do ramki danych
Przyjrzyjmy się hipotetycznym przykładom, aby zobaczyć, jak można użyć ramki danych do pracy z danymi. Załóżmy, że masz następujące dane w pliku tekstowym rozdzielonym przecinkami o nazwie products.csv w folderze danych w magazynie systemu plików usługi Databricks (DBFS):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
W notesie platformy Spark możesz użyć następującego kodu PySpark, aby załadować dane do ramki danych i wyświetlić pierwsze 10 wierszy:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Linia %pyspark na początku jest nazywana magią i informuje Spark, że język używany w tej komórce to PySpark. Oto odpowiedni kod Scala dla przykładu danych produktów:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
Magia %spark służy do określania Scala.
Napiwek
Możesz również wybrać język, którego chcesz użyć dla każdej komórki w interfejsie notesu.
Oba pokazane wcześniej przykłady spowodują wygenerowanie danych wyjściowych w następujący sposób:
| IdentyfikatorProduktu | ProductName | Kategoria | Cena Katalogowa |
|---|---|---|---|
| 771 | Górskie — 100 srebrnych, 38 | Rowery górskie | 3399.9900 |
| 772 | Górskie — 100 srebrnych, 42 | Rowery górskie | 3399.9900 |
| 773 | Górskie — 100 srebrnych, 44 | Rowery górskie | 3399.9900 |
| ...\ | ...\ | ...\ | ...\ |
Określanie schematu ramki danych
W poprzednim przykładzie pierwszy wiersz pliku CSV zawierał nazwy kolumn, a platforma Spark mogła wywnioskować typ danych każdej kolumny z danych, które zawiera. Można również określić jawny schemat dla danych, co jest przydatne, gdy nazwy kolumn nie są uwzględnione w pliku danych, podobnie jak w tym przykładzie CSV:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
W poniższym przykładzie PySpark pokazano, jak określić schemat ramki danych do załadowania z pliku o nazwie product-data.csv w tym formacie:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Wyniki będą ponownie podobne do następujących:
| IdentyfikatorProduktu | ProductName | Kategoria | Cena Katalogowa |
|---|---|---|---|
| 771 | Górskie — 100 srebrnych, 38 | Rowery górskie | 3399.9900 |
| 772 | Górskie — 100 srebrnych, 42 | Rowery górskie | 3399.9900 |
| 773 | Górskie — 100 srebrnych, 44 | Rowery górskie | 3399.9900 |
| ...\ | ...\ | ...\ | ...\ |
Filtrowanie i grupowanie ramek danych
Metody klasy Ramka danych umożliwiają filtrowanie, sortowanie, grupowanie i manipulowanie danymi, które zawiera. Na przykład poniższy przykład kodu używa select metody w celu pobrania kolumn ProductName i ListPrice z ramki danych df zawierającej dane produktu w poprzednim przykładzie:
pricelist_df = df.select("ProductID", "ListPrice")
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
| IdentyfikatorProduktu | Cena Katalogowa |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ...\ | ...\ |
Podobnie jak większość metod manipulowania danymi, select zwraca nowy obiekt typu ramka danych.
Napiwek
Wybranie podzbioru kolumn z ramki danych jest wspólną operacją, którą można również osiągnąć przy użyciu następującej krótszej składni:
pricelist_df = df["ProductID", "ListPrice"]
Metody "łańcuchowe" można łączyć w celu wykonania serii manipulacji, które skutkują przekształconą ramką danych. Na przykład ten przykładowy kod łączy metody select i where w celu utworzenia nowej ramki danych zawierającej kolumny ProductName i ListPrice dla produktów z kategorią Mountain Bikes lub Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
| ProductName | Cena Katalogowa |
|---|---|
| Górskie — 100 srebrnych, 38 | 3399.9900 |
| Road-750, 52 | 539.9900 |
| ...\ | ...\ |
Aby grupować i agregować dane, możesz użyć groupby metody i funkcji agregujących. Na przykład następujący kod PySpark zlicza liczbę produktów dla każdej kategorii:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Wyniki z tego przykładu kodu będą wyglądać mniej więcej tak:
| Kategoria | liczyć |
|---|---|
| Zestawy słuchawkowe | 3 |
| Koła | 14 |
| Rowery górskie | 32 |
| ...\ | ...\ |
Uwaga
Ramki danych platformy Spark są deklaratywne i niezmienne. Każde przekształcenie (na przykład select, filterlub groupBy) tworzy nową ramkę danych reprezentującą żądane elementy, a nie sposób jej uruchamiania. Dzięki temu kod może być wielokrotnego użytku, optymalizowalny i wolny od skutków ubocznych. Jednak żadne z tych transformacji nie są wykonywane do momentu wyzwolenia akcji (na przykład display, collect, write), wówczas platforma Spark uruchamia pełny zoptymalizowany plan.
Używanie wyrażeń SQL na platformie Spark
Interfejs API ramki danych jest częścią biblioteki Spark o nazwie Spark SQL, która umożliwia analitykom danych używanie wyrażeń SQL do wykonywania zapytań o dane i manipulowania nimi.
Tworzenie obiektów bazy danych w wykazie platformy Spark
Wykaz platformy Spark to magazyn metadanych dla obiektów danych relacyjnych, takich jak widoki i tabele. Środowisko uruchomieniowe platformy Spark może używać wykazu do bezproblemowego integrowania kodu napisanego w dowolnym języku obsługiwanym przez platformę Spark z wyrażeniami SQL, które mogą być bardziej naturalne dla niektórych analityków danych lub deweloperów.
Jednym z najprostszych sposobów udostępniania danych w ramce danych na potrzeby wykonywania zapytań w wykazie platformy Spark jest utworzenie widoku tymczasowego, jak pokazano w poniższym przykładzie kodu:
df.createOrReplaceTempView("products")
Widok jest tymczasowy, co oznacza, że jest on automatycznie usuwany na końcu bieżącej sesji. Możesz również utworzyć tabele , które są utrwalane w wykazie, aby zdefiniować bazę danych, do której można wykonywać zapytania przy użyciu usługi Spark SQL.
Uwaga
Nie będziemy szczegółowo eksplorować tabel wykazu platformy Spark w tym module, ale warto poświęcić trochę czasu na wyróżnienie kilku kluczowych kwestii:
- Pustą tabelę można utworzyć przy użyciu
spark.catalog.createTablemetody . Tabele to struktury metadanych, które przechowują swoje dane bazowe w lokalizacji magazynu skojarzonej z wykazem. Usunięcie tabeli powoduje również usunięcie danych bazowych. - Ramkę danych można zapisać jako tabelę przy użyciu jej
saveAsTablemetody. - Tabelę zewnętrzną można utworzyć przy użyciu
spark.catalog.createExternalTablemetody . Tabele zewnętrzne definiują metadane w wykazie, ale pobierają swoje dane bazowe z zewnętrznej lokalizacji magazynu; zazwyczaj folder w usłudze Data Lake. Usunięcie tabeli zewnętrznej nie powoduje usunięcia danych bazowych.
Wykonywanie zapytań dotyczących danych przy użyciu interfejsu API SQL platformy Spark
Interfejs API SQL platformy Spark można używać w kodzie napisanym w dowolnym języku do wykonywania zapytań dotyczących danych w wykazie. Na przykład poniższy kod PySpark używa zapytania SQL do zwracania danych z widoku produktów jako ramki danych.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Wyniki z przykładu kodu będą wyglądać podobnie do poniższej tabeli:
| ProductName | Cena Katalogowa |
|---|---|
| Górskie — 100 srebrnych, 38 | 3399.9900 |
| Road-750, 52 | 539.9900 |
| ...\ | ...\ |
Korzystanie z kodu SQL
W poprzednim przykładzie pokazano, jak używać interfejsu API SQL platformy Spark do osadzania wyrażeń SQL w kodzie platformy Spark. W notesie można również użyć %sql funkcji magic do uruchomienia kodu SQL, który wysyła zapytania do obiektów w wykazie, w następujący sposób:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Przykład kodu SQL zwraca zestaw wyników, który jest automatycznie wyświetlany w notesie jako tabela, podobnie jak poniższy:
| Kategoria | LiczbaProduktów |
|---|---|
| Bib-Shorts | 3 |
| Stojaki rowerowe | 1 |
| Stojaki rowerowe | 1 |
| ...\ | ...\ |