Spark veri çerçevesindeki verilerle çalışma

Tamamlandı

Spark, yerel olarak dayanıklı dağıtılmış veri kümesi (RDD) olarak adlandırılan bir veri yapısı kullanır; ancak doğrudan RDD'lerle çalışan kod yazabilirsiniz. Spark'ta yapılandırılmış verilerle çalışmak için en yaygın kullanılan veri yapısı, Spark SQL kitaplığının bir parçası olarak sağlanan veri çerçevesidir. Spark'taki veri çerçeveleri yaygın Pandas Python kitaplığındakilere benzer, ancak Spark'ın dağıtılmış işleme ortamında çalışacak şekilde iyileştirilmiştir.

Dekont

Spark SQL, Dataframe API'sine ek olarak Java ve Scala'da desteklenen güçlü türde bir Veri Kümesi API'sini de sağlar. Bu modülde Dataframe API'sine odaklanacağız.

Verileri bir veri çerçevesine yükleme

Şimdi verilerle çalışmak için veri çerçevesini nasıl kullanabileceğinizi görmek için varsayımsal bir örneği inceleyelim. Lakehouse'unuzun Dosyalar/veri klasöründe products.csv adlı virgülle ayrılmış bir metin dosyasında aşağıdaki verilere sahip olduğunuzu varsayalım:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Şema çıkarılıyor

Spark not defterinde, dosya verilerini bir veri çerçevesine yüklemek ve ilk 10 satırı görüntülemek için aşağıdaki PySpark kodunu kullanabilirsiniz:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

%%pyspark Başlangıçtaki satıra büyü adı verilir ve Spark'a bu hücrede kullanılan dilin PySpark olduğunu söyler. Not Defteri arabiriminin araç çubuğunda varsayılan olarak kullanmak istediğiniz dili seçebilir ve ardından belirli bir hücre için bu seçimi geçersiz kılmak için bir sihir kullanabilirsiniz. Örneğin, ürün verileri örneği için eşdeğer Scala kodu aşağıda verilmişti:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

Büyü %%spark Scala'nın belirtilmesi için kullanılır.

Bu kod örneklerinin her ikisi de şuna benzer bir çıkış oluşturur:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Gümüş, 38 Dağ Bisikletleri 3399.9900
772 Mountain-100 Gümüş, 42 Dağ Bisikletleri 3399.9900
773 Mountain-100 Gümüş, 44 Dağ Bisikletleri 3399.9900
... ... ... ...

Açık şema belirtme

Önceki örnekte CSV dosyasının ilk satırı sütun adlarını içeriyordu ve Spark, içerdiği verilerden her sütunun veri türünü çıkarabiliyordu. Ayrıca veriler için açık bir şema belirtebilirsiniz. Bu şema, sütun adları veri dosyasına eklenmediğinde yararlı olabilir, örneğin şu CSV örneği:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Aşağıdaki PySpark örneği, product-data.csv adlı bir dosyadan yüklenecek veri çerçevesi için bu biçimde bir şemanın nasıl belirtileceğini gösterir:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Sonuçlar bir kez daha şuna benzer olacaktır:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Gümüş, 38 Dağ Bisikletleri 3399.9900
772 Mountain-100 Gümüş, 42 Dağ Bisikletleri 3399.9900
773 Mountain-100 Gümüş, 44 Dağ Bisikletleri 3399.9900
... ... ... ...

Bahşiş

Açık bir şema belirtmek performansı da artırır!

Veri çerçevelerini filtreleme ve gruplandırma

Dataframe sınıfının yöntemlerini kullanarak içerdiği verileri filtreleyebilir, sıralayabilir, gruplandırabilir ve başka şekilde işleyebilirsiniz. Örneğin, aşağıdaki kod örneği önceki örnekteki ürün verilerini içeren df veri çerçevesinden ProductID ve ListPrice sütunlarını almak için select yöntemini kullanır:

pricelist_df = df.select("ProductID", "ListPrice")

Bu kod örneğinden alınan sonuçlar şuna benzer olacaktır:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Çoğu veri işleme yönteminde ortak olarak, select yeni bir veri çerçevesi nesnesi döndürür.

Bahşiş

Bir veri çerçevesinden sütunların alt kümesini seçmek yaygın bir işlemdir ve bu işlem aşağıdaki daha kısa söz dizimi kullanılarak da yapılabilir:

pricelist_df = df["ProductID", "ListPrice"]

Dönüştürülen bir veri çerçevesiyle sonuçlanan bir dizi işleme gerçekleştirmek için yöntemleri birlikte "zincirleyebilirsiniz". Örneğin, bu örnek kod, Dağ Bisikletleri veya Yol Bisikletleri kategorisine sahip ürünler için ProductName ve ListPrice sütunlarını içeren yeni bir veri çerçevesi oluşturmak için select ve where yöntemlerini zincirler:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Bu kod örneğinden alınan sonuçlar şuna benzer olacaktır:

ProductName Kategori ListPrice
Mountain-100 Gümüş, 38 Dağ Bisikletleri 3399.9900
Road-750 Siyah, 52 Yol Bisikletleri 539.9900
... ... ...

Verileri gruplandırmak ve toplamak için groupBy yöntemini ve toplama işlevlerini kullanabilirsiniz. Örneğin, aşağıdaki PySpark kodu her kategori için ürün sayısını sayar:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Bu kod örneğinden alınan sonuçlar şuna benzer olacaktır:

Kategori count
Kulaklık 3
Tekerlekler 14
Dağ Bisikletleri 32
... ...

Veri çerçeveyi kaydetme

Spark'ı genellikle ham verileri dönüştürmek ve daha fazla analiz veya aşağı akış işleme için sonuçları kaydetmek için kullanmak istersiniz. Aşağıdaki kod örneği dataFrame'i veri gölündeki bir parquet dosyasına kaydeder ve aynı ada sahip mevcut dosyaları değiştirir.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Dekont

Parquet biçimi genellikle analiz deposuna daha fazla analiz veya veri alımı için kullanacağınız veri dosyaları için tercih edilir. Parquet, çoğu büyük ölçekli veri analizi sistemi tarafından desteklenen çok verimli bir biçimdir. Aslında bazen veri dönüştürme gereksiniminiz verileri başka bir biçimden (CSV gibi) Parquet'e dönüştürmek olabilir!

Çıkış dosyasını bölümleme

Bölümleme, Spark'ın çalışan düğümleri genelinde performansı en üst düzeye çıkarmasını sağlayan bir iyileştirme tekniğidir. Gereksiz disk GÇ'sini ortadan kaldırarak sorgulardaki veriler filtrelenirken daha fazla performans kazancı elde edilebilir.

Bir veri çerçevesini bölümlenmiş dosya kümesi olarak kaydetmek için, verileri yazarken partitionBy yöntemini kullanın. Aşağıdaki örnek, bikes_df veri çerçevesini (dağ bisikletleri ve yol bisikletleri kategorilerinin ürün verilerini içerir) kaydeder ve verileri kategoriye göre bölümlere ayırır:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Bir veri çerçevesi bölümlenirken oluşturulan klasör adları, sütun=değer biçiminde bölümleme sütun adını ve değerini içerir, bu nedenle kod örneği aşağıdaki alt klasörleri içeren bike_data adlı bir klasör oluşturur:

  • Category=Dağ Bisikletleri
  • Category=Yol Bisikletleri

Her alt klasör, uygun kategori için ürün verilerini içeren bir veya daha fazla parquet dosyası içerir.

Dekont

Verileri birden çok sütuna göre bölümleyebilirsiniz ve bu da her bölümleme anahtarı için bir klasör hiyerarşisine neden olur. Örneğin, satış siparişi verilerini yıla ve aya göre bölümleyebilirsiniz; böylece klasör hiyerarşisi her yıl değeri için bir klasör içerir ve bu da her ay değeri için bir alt klasör içerir.

Bölümlenmiş verileri yükleme

Bölümlenmiş verileri bir veri çerçevesine okurken, bölümlenmiş alanlar için açık değerler veya joker karakterler belirterek hiyerarşideki herhangi bir klasörden veri yükleyebilirsiniz. Aşağıdaki örnek, Yol Bisikletleri kategorisindeki ürünlerin verilerini yükler:

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Dekont

Dosya yolunda belirtilen bölümleme sütunları, sonuçta elde edilen veri çerçevesinde atlanır. Örnek sorgu tarafından üretilen sonuçlar kategori sütunu içermez; tüm satırların kategorisi Yol Bisikletleri olur.