Veri dosyalarıyla çalışmak için Spark kullanma

Tamamlandı

Spark kullanmanın avantajlarından biri, çeşitli programlama dillerinde kod yazıp çalıştırarak sahip olduğunuz programlama becerilerini kullanmanıza ve belirli bir görev için en uygun dili kullanmanıza olanak sağlamasıdır. Yeni bir Azure Databricks Spark not defterindeki varsayılan dil, veri işleme ve görselleştirmeye yönelik güçlü desteği nedeniyle veri bilimcileri ve analistler tarafından yaygın olarak kullanılan Spark için iyileştirilmiş bir Python sürümü olan PySpark'tır . Ayrıca Scala (etkileşimli olarak kullanılabilen Java türetilmiş bir dil) ve SQL (ilişkisel veri yapılarıyla çalışmak için Spark SQL kitaplığına dahil edilen yaygın olarak kullanılan SQL dilinin bir çeşidi) gibi dilleri de kullanabilirsiniz. Yazılım mühendisleri, Java gibi çerçeveleri kullanarak Spark üzerinde çalışan derlenmiş çözümler de oluşturabilir.

Veri çerçeveleriyle verileri keşfetme

Spark, yerel olarak dayanıklı dağıtılmış veri kümesi (RDD) olarak adlandırılan bir veri yapısı kullanır; ancak doğrudan RDD'lerle çalışan kod yazabilirsiniz. Spark'ta yapılandırılmış verilerle çalışmak için en yaygın kullanılan veri yapısı, Spark SQL kitaplığının bir parçası olarak sağlanan veri çerçevesidir. Spark'taki veri çerçeveleri yaygın Pandas Python kitaplığındakilere benzer, ancak Spark'ın dağıtılmış işleme ortamında çalışacak şekilde iyileştirilmiştir.

Not

Spark SQL, Dataframe API'sine ek olarak Java ve Scala'da desteklenen güçlü türde bir Veri Kümesi API'sini de sağlar. Bu modülde Dataframe API'sine odaklanacağız.

Verileri bir veri çerçevesine yükleme

Şimdi verilerle çalışmak için veri çerçevesini nasıl kullanabileceğinizi görmek için varsayımsal bir örneği inceleyelim. Databricks Dosya Sistemi (DBFS) depolama alanınızdaki veri klasöründeki products.csv adlı virgülle ayrılmış bir metin dosyasında aşağıdaki verilere sahip olduğunuzu varsayalım:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Spark not defterinde, verileri bir veri çerçevesine yüklemek ve ilk 10 satırı görüntülemek için aşağıdaki PySpark kodunu kullanabilirsiniz:

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

%pyspark Başlangıçtaki satıra büyü adı verilir ve Spark'a bu hücrede kullanılan dilin PySpark olduğunu söyler. Ürün verileri örneği için eşdeğer Scala kodu aşağıda verilmişti:

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

Büyü %spark Scala'nın belirtilmesi için kullanılır.

İpucu

Not Defteri arabirimindeki her hücre için kullanmak istediğiniz dili de seçebilirsiniz.

Daha önce gösterilen örneklerin her ikisi de aşağıdaki gibi bir çıkış oluşturur:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Gümüş, 38 Dağ Bisikletleri 3399.9900
772 Mountain-100 Gümüş, 42 Dağ Bisikletleri 3399.9900
773 Mountain-100 Gümüş, 44 Dağ Bisikletleri 3399.9900
... ... ... ...

Veri çerçevesi şeması belirtme

Önceki örnekte CSV dosyasının ilk satırı sütun adlarını içeriyordu ve Spark, içerdiği verilerden her sütunun veri türünü çıkarabiliyordu. Ayrıca veriler için açık bir şema belirtebilirsiniz. Bu şema, sütun adları veri dosyasına eklenmediğinde yararlı olabilir, örneğin şu CSV örneği:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Aşağıdaki PySpark örneğinde, product-data.csv adlı bir dosyadan yüklenecek veri çerçevesi için bu biçimde bir şemanın nasıl belirtileceği gösterilmektedir:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Sonuçlar bir kez daha şuna benzer olacaktır:

ProductID ProductName Kategori ListPrice
771 Mountain-100 Gümüş, 38 Dağ Bisikletleri 3399.9900
772 Mountain-100 Gümüş, 42 Dağ Bisikletleri 3399.9900
773 Mountain-100 Gümüş, 44 Dağ Bisikletleri 3399.9900
... ... ... ...

Veri çerçevelerini filtreleme ve gruplandırma

Dataframe sınıfının yöntemlerini kullanarak içerdiği verileri filtreleyebilir, sıralayabilir, gruplandırabilir ve başka şekilde işleyebilirsiniz. Örneğin, aşağıdaki kod örneği önceki örnekteki ürün verilerini içeren df veri çerçevesinden ProductName ve ListPrice sütunlarını almak için select yöntemini kullanır:

pricelist_df = df.select("ProductID", "ListPrice")

Bu kod örneğinden alınan sonuçlar şuna benzer olacaktır:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Çoğu veri işleme yönteminde ortak olarak, select yeni bir veri çerçevesi nesnesi döndürür.

İpucu

Bir veri çerçevesinden sütunların alt kümesini seçmek yaygın bir işlemdir ve bu işlem aşağıdaki daha kısa söz dizimi kullanılarak da yapılabilir:

pricelist_df = df["ProductID", "ListPrice"]

Dönüştürülen bir veri çerçevesiyle sonuçlanan bir dizi işleme gerçekleştirmek için yöntemleri birlikte "zincirleyebilirsiniz". Örneğin, bu örnek kod, Dağ Bisikletleri veya Yol Bisikletleri kategorisine sahip ürünler için ProductName ve ListPrice sütunlarını içeren yeni bir veri çerçevesi oluşturmak için select ve where yöntemlerini zincirler:

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Bu kod örneğinden alınan sonuçlar şuna benzer olacaktır:

ProductName ListPrice
Mountain-100 Gümüş, 38 3399.9900
Road-750 Siyah, 52 539.9900
... ...

Verileri gruplandırmak ve toplamak için groupBy yöntemini ve toplama işlevlerini kullanabilirsiniz. Örneğin, aşağıdaki PySpark kodu her kategori için ürün sayısını sayar:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Bu kod örneğinden alınan sonuçlar şuna benzer olacaktır:

Kategori count
Kulaklık 3
Tekerlekler 14
Dağ Bisikletleri 32
... ...

Spark'ta SQL ifadelerini kullanma

Veri Çerçevesi API'si, veri analistlerinin verileri sorgulamak ve işlemek için SQL ifadelerini kullanmasına olanak tanıyan Spark SQL adlı spark kitaplığının bir parçasıdır.

Spark kataloğunda veritabanı nesneleri oluşturma

Spark kataloğu, görünümler ve tablolar gibi ilişkisel veri nesneleri için bir meta veri deposudur. Spark çalışma zamanı, spark destekli herhangi bir dilde yazılmış kodu bazı veri analistleri veya geliştiriciler için daha doğal olabilecek SQL ifadeleriyle sorunsuz bir şekilde tümleştirmek için kataloğu kullanabilir.

Aşağıdaki kod örneğinde gösterildiği gibi, bir veri çerçevesindeki verileri Spark kataloğunda sorgulama için kullanılabilir hale getirmenin en basit yollarından biri geçici bir görünüm oluşturmaktır:

df.createOrReplaceTempView("products")

Görünüm geçicidir, yani geçerli oturumun sonunda otomatik olarak silinir. Spark SQL kullanılarak sorgulanabilen bir veritabanı tanımlamak için katalogda kalıcı olan tablolar da oluşturabilirsiniz.

Not

Bu modülde Spark katalog tablolarını derinlemesine incelemeyeceğiz, ancak birkaç önemli noktayı vurgulamak için zaman ayırabilirsiniz:

  • yöntemini kullanarak spark.catalog.createTable boş bir tablo oluşturabilirsiniz. Tablolar, temel verilerini katalogla ilişkili depolama konumunda depolayan meta veri yapılarıdır. Bir tablonun silinmesi, temel alınan verileri de siler.
  • Yöntemini kullanarak bir veri çerçevesini saveAsTable tablo olarak kaydedebilirsiniz.
  • yöntemini kullanarak spark.catalog.createExternalTable bir dış tablo oluşturabilirsiniz. Dış tablolar katalogda meta verileri tanımlar ancak temel verilerini bir dış depolama konumundan alır; genellikle bir veri gölündeki bir klasör. Dış tablo silindiğinde temel alınan veriler silinmez.

Verileri sorgulamak için Spark SQL API'sini kullanma

Spark SQL API'sini herhangi bir dilde yazılmış kodda kullanarak katalogdaki verileri sorgulayabilirsiniz. Örneğin, aşağıdaki PySpark kodu, ürünler görünümünden veri çerçevesi olarak veri döndürmek için bir SQL sorgusu kullanır.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Kod örneğinden elde edilen sonuçlar aşağıdaki tabloya benzer olacaktır:

ProductName ListPrice
Mountain-100 Gümüş, 38 3399.9900
Road-750 Siyah, 52 539.9900
... ...

SQL kodunu kullanma

Önceki örnekte, Spark koduna SQL ifadelerini eklemek için Spark SQL API'sinin nasıl kullanılacağı gösterilmiştir. Not defterinde, aşağıdaki gibi katalogdaki nesneleri sorgulayan SQL kodunu çalıştırmak için de büyü kullanabilirsiniz %sql :

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

SQL kodu örneği, not defterinde aşağıdaki gibi otomatik olarak tablo olarak görüntülenen bir sonuç kümesi döndürür:

Kategori ProductCount
Bib-Şort 3
Bisiklet Rafları 1
Bisiklet Standları 1
... ...