Samouczek: praca z ramkami danych PySpark w usłudze Azure Databricks

W tym artykule pokazano, jak ładować i przekształcać dane przy użyciu interfejsu API ramki danych platformy Apache Spark (PySpark) w usłudze Azure Databricks.

Zobacz też dokumentację interfejsu API platformy Apache Spark PySpark.

Co to jest ramka danych?

Ramka danych to dwuwymiarowa struktura danych z kolumnami potencjalnie różnych typów. Ramka danych może przypominać arkusz kalkulacyjny, tabelę SQL lub słownik obiektów serii. Ramki danych platformy Apache Spark zapewniają bogaty zestaw funkcji (wybieranie kolumn, filtrowanie, łączenie, agregowanie), które umożliwiają wydajne rozwiązywanie typowych problemów z analizą danych.

Ramki danych platformy Apache Spark to abstrakcja oparta na odpornych rozproszonych zestawach danych (RDD). Ramki danych Platformy Spark i platforma Spark SQL używają ujednoliconego aparatu planowania i optymalizacji, co pozwala uzyskać niemal identyczną wydajność we wszystkich obsługiwanych językach w usłudze Azure Databricks (Python, SQL, Scala i R).

Tworzenie ramki danych przy użyciu języka Python

Większość zapytań platformy Apache Spark zwraca ramkę danych. Obejmuje to odczytywanie z tabeli, ładowanie danych z plików i operacje, które przekształcają dane.

Możesz również utworzyć ramkę danych platformy Spark na podstawie listy lub ramki danych biblioteki pandas, na przykład w poniższym przykładzie:

import pandas as pd

data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]

pdf = pd.DataFrame(data, columns=["id", "name"])

df1 = spark.createDataFrame(pdf)
df2 = spark.createDataFrame(data, schema="id LONG, name STRING")

Odczytywanie tabeli do ramki danych

Usługa Azure Databricks domyślnie używa usługi Delta Lake dla wszystkich tabel. Tabele można łatwo załadować do ramek danych, na przykład w poniższym przykładzie:

spark.read.table("<catalog_name>.<schema_name>.<table_name>")

Ładowanie danych do ramki danych z plików

Dane można załadować z wielu obsługiwanych formatów plików. W poniższym przykładzie użyto zestawu danych dostępnego w katalogu dostępnego /databricks-datasets z większości obszarów roboczych. Zobacz temat Przykładowe zestawy danych.

df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

Przypisywanie kroków przekształcania do ramki danych

Wyniki większości przekształceń platformy Spark zwracają ramkę danych. Te wyniki można przypisać z powrotem do zmiennej ramki danych, podobnie jak w przypadku używania obiektów CTE, widoków tymczasowych lub ramek danych w innych systemach.

Łączenie ramek danych ze sprzężeniami i łączeniem

Ramki danych używają standardowej semantyki SQL na potrzeby operacji sprzężenia. Sprzężenie zwraca połączone wyniki dwóch ramek danych na podstawie podanych warunków dopasowania i typu sprzężenia. Poniższy przykład to sprzężenie wewnętrzne, które jest ustawieniem domyślnym:

joined_df = df1.join(df2, how="inner", on="id")

Wiersze jednej ramki danych można dodać do innej przy użyciu operacji union, jak w poniższym przykładzie:

unioned_df = df1.union(df2)

Filtrowanie wierszy w ramce danych

Wiersze w ramce danych można filtrować przy użyciu polecenia .filter() lub .where(). Nie ma różnicy w wydajności ani składni, jak pokazano w poniższym przykładzie:

filtered_df = df.filter("id > 1")

filtered_df = df.where("id > 1")

Użyj filtrowania, aby wybrać podzbiór wierszy do zwrócenia lub zmodyfikowania w ramce danych.

Wybieranie kolumn z ramki danych

Możesz wybrać kolumny, przekazując jedną lub więcej nazw kolumn do .select()elementu , jak w poniższym przykładzie:

select_df = df.select("id", "name")

Zapytania wyboru i filtrowania można łączyć, aby ograniczyć zwracane wiersze i kolumny.

subset_df = df.filter("id > 1").select("name")

Wyświetlanie ramki danych

Aby wyświetlić te dane w formacie tabelarycznym, możesz użyć polecenia usługi Azure Databricks display() , jak w poniższym przykładzie:

display(df)

Platforma Spark używa terminu schema do odwoływania się do nazw i typów danych kolumn w ramce danych.

Uwaga

Usługa Azure Databricks używa również terminu schema do opisywania kolekcji tabel zarejestrowanych w wykazie.

Schemat można wydrukować przy użyciu .printSchema() metody , jak w poniższym przykładzie:

df.printSchema()

Zapisywanie ramki danych w tabeli

Usługa Azure Databricks domyślnie używa usługi Delta Lake dla wszystkich tabel. Zawartość ramki danych można zapisać w tabeli przy użyciu następującej składni:

df.write.saveAsTable("<table_name>")

Zapisywanie ramki danych w kolekcji plików

Większość aplikacji platformy Spark jest przeznaczona do pracy na dużych zestawach danych i działa w sposób rozproszony, a platforma Spark zapisuje katalog plików, a nie pojedynczy plik. Wiele systemów danych jest skonfigurowanych do odczytywania tych katalogów plików. Usługa Azure Databricks zaleca używanie tabel przez ścieżki plików dla większości aplikacji.

Poniższy przykład zapisuje katalog plików JSON:

df.write.format("json").save("/tmp/json_data")

Uruchamianie zapytań SQL w narzędziu PySpark

Ramki danych platformy Spark udostępniają wiele opcji łączenia języka SQL z językiem Python.

Metoda selectExpr() umożliwia określenie każdej kolumny jako zapytania SQL, na przykład w poniższym przykładzie:

display(df.selectExpr("id", "upper(name) as big_name"))

Możesz zaimportować expr() funkcję z pyspark.sql.functions , aby użyć składni SQL w dowolnym miejscu, w którym zostanie określona kolumna, jak w poniższym przykładzie:

from pyspark.sql.functions import expr

display(df.select("id", expr("lower(name) as little_name")))

Można również użyć spark.sql() polecenia , aby uruchamiać dowolne zapytania SQL w jądrze języka Python, jak w poniższym przykładzie:

query_df = spark.sql("SELECT * FROM <table_name>")

Ponieważ logika jest wykonywana w jądrze języka Python, a wszystkie zapytania SQL są przekazywane jako ciągi, możesz użyć formatowania języka Python, aby sparametryzować zapytania SQL, jak w poniższym przykładzie:

table_name = "my_table"

query_df = spark.sql(f"SELECT * FROM {table_name}")