PySpark temel bilgileri

Bu makalede, PySpark kullanımını göstermek için basit örnekler gösterilmektedir. Temel Apache Spark kavramlarını anladığınızı ve işleme bağlı bir Azure Databricks not defterinde komut çalıştırdığınızı varsayar. Örnek verileri kullanarak DataFrame'ler oluşturur, bu verilerde satır ve sütun işlemleri dahil temel dönüştürmeler gerçekleştirir, birden çok DataFrame'i birleştirir ve bu verileri toplar, bu verileri görselleştirir ve sonra bir tabloya veya dosyaya kaydedersiniz.

Veri yükle

Bu makaledeki bazı örneklerde Databricks tarafından sağlanan örnek veriler kullanılarak DataFrame'lerin veri yükleme, dönüştürme ve kaydetme işlemleri gösterilmektedir. Henüz Databricks'te olmayan kendi verilerinizi kullanmak istiyorsanız, önce bu verileri karşıya yükleyebilir ve bu veriden bir DataFrame oluşturabilirsiniz. Bkz. Dosya yükleme kullanarak tablo oluşturma veya değiştirme ve Unity Kataloğu birimlerinde dosyalarla çalışma.

Databricks örnek verileri hakkında

Databricks, katalogda samples ve dizinde /databricks-datasets örnek veriler sağlar.

  • Katalogdaki samples örnek verilere erişmek için biçimini samples.<schema-name>.<table-name>kullanın. Bu makalede, şemadaki samples.tpch tablolar kullanılır ve bunlar kurgusal bir işletmeden alınan verileri içerir. Tabloda customer müşteriler hakkında bilgiler ve orders bu müşteriler tarafından verilen siparişler hakkında bilgiler yer alır.
  • dbutils.fs.ls verilerini /databricks-datasets içinde keşfetmek için kullanın. Dosya yollarını kullanarak bu konumdaki verileri sorgulamak için Spark SQL veya DataFrames kullanın. Databricks tarafından sağlanan örnek veriler hakkında daha fazla bilgi edinmek için bkz. Örnek veri kümeleri.

Veri türlerini içeri aktarma

Birçok PySpark işlemi, SQL işlevlerini kullanmanızı veya yerel Spark türleriyle etkileşim kurmanızı gerektirir. Yalnızca ihtiyacınız olan işlevleri ve türleri doğrudan içeri aktarın veya Python yerleşik işlevlerini geçersiz kılmamak için ortak bir diğer ad kullanarak bu modülleri içeri aktarın.

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F

Veri türlerinin kapsamlı bir listesi için bkz. PySpark Veri Türleri.

PySpark SQL işlevlerinin kapsamlı listesi için bkz. PySpark İşlevleri.

DataFrame oluşturma

DataFrame oluşturmanın birkaç yolu vardır. Genellikle tablo veya dosya koleksiyonu gibi bir veri kaynağında DataFrame tanımlarsınız. Ardından Apache Spark temel kavramları bölümünde açıklandığı gibi, displaydönüştürmeleri yürütmek için bir eylem kullanarak süreci başlatın. display yöntemi DataFrame'ler üretir.

Belirtilen değerlerle DataFrame oluşturma

Belirtilen değerlere sahip bir DataFrame oluşturmak için, satırların bir demetler listesi olarak ifade edildiği createDataFrame yöntemini kullanın.

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

Çıkışta df_children sütunlarının veri türlerinin otomatik olarak belirlendiğini fark edin. Alternatif olarak, bir şema ekleyerek türleri belirtebilirsiniz. Şemalar, adı, veri türünü ve null değer içerip içermediklerini belirten boole bayrağından oluşan StructType kullanılarak tanımlanırStructFields. veri türlerini pyspark.sql.types uygulamasından içeri aktarmanız gerekir.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Unity Kataloğu'ndaki bir tablodan DataFrame oluşturma

Unity Kataloğu'ndaki bir tablodan DataFrame oluşturmak için, tabloyu table biçimini kullanarak tanımlayan <catalog-name>.<schema-name>.<table-name> yöntemini kullanın. Tablonuza gitmek için Katalog Gezgini'ni kullanmak için sol gezinti çubuğunda Katalog'a tıklayın. Üzerine tıklayın, ardından tablo yolunu not defterine eklemek için Tablo Yolunu Kopyala'yı seçin.

Aşağıdaki örnek, tablosunu samples.tpch.customeryükler, ancak alternatif olarak kendi tablonuzun yolunu sağlayabilirsiniz.

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

Yüklenen bir dosyadan bir DataFrame oluştur.

Unity Kataloğu birimlerine yüklediğiniz bir dosyadan DataFrame oluşturmak için özelliğini kullanın read . Bu yöntem, uygun biçimi okumak için kullanabileceğiniz bir DataFrameReaderdöndürür. Soldaki küçük kenar çubuğundaki katalog seçeneğine tıklayın ve dosyanızı bulmak için katalog tarayıcısını kullanın. Seçin ve ardından Birim dosya yolunu kopyala'ya tıklayın.

Aşağıdaki örnek bir *.csv dosyadan okunur, ancak DataFrameReader dosyaları başka birçok biçimde karşıya yüklemeyi destekler. Bkz. DataFrameReader yöntemleri.

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Unity Kataloğu birimleri hakkında daha fazla bilgi için bkz . Unity Kataloğu birimleri nedir?.

JSON yanıtından DataFrame oluşturma

REST API tarafından döndürülen bir JSON yanıt yükünden DataFrame oluşturmak için Python paketini kullanarak requests yanıtı sorgulayıp ayrıştırın. Kullanmak için paketi içeri aktarmanız gerekir. Bu örnek, Amerika Birleşik Devletleri Gıda ve İlaç Dairesi'nin ilaç uygulama veritabanındaki verileri kullanır.

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Databricks'te JSON ve diğer yarı yapılandırılmış verilerle çalışma hakkında bilgi için bkz . Yarı yapılandırılmış verileri modelleme.

JSON alanı veya nesnesi seçme

Dönüştürülen JSON'dan belirli bir alanı veya nesneyi seçmek için [] gösterimini kullanın. Örneğin, bir ürün dizisi olan products alanını seçmek için:

display(df_drugs.select(df_drugs["products"]))

Ayrıca, birden çok alanda çapraz geçiş yapmak için yöntem çağrılarını birbirine zincirleyebilirsiniz. Örneğin, bir ilaç uygulamasındaki ilk ürünün marka adını çıkarmak için:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

Bir dosyadan DataFrame oluşturma

Bir dosyadan DataFrame oluşturmayı göstermek için bu örnek dizindeki /databricks-datasets CSV verilerini yükler.

Örnek veri kümelerine gitmek için Databricks Utilties dosya sistemi komutlarını kullanabilirsiniz. Aşağıdaki örnek, dbutils içindeki mevcut olan veri kümelerini listelemek için /databricks-datasets kullanır.

display(dbutils.fs.ls('/databricks-datasets'))

Alternatif olarak, %fs kullanarak aşağıdaki örnekte gösterildiği gibi Databricks CLI dosya sistemi komutlarına erişebilirsiniz.

%fs ls '/databricks-datasets'

Bir dosyadan veya dosya dizininden DataFrame oluşturmak için yönteminde load yolu belirtin:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

DataFrame'lerle verileri dönüştürme

DataFrame'ler, verileri sıralamak, filtrelemek ve toplamak için yerleşik yöntemleri kullanarak verileri dönüştürmeyi kolaylaştırır. Birçok dönüştürme DataFrame'lerde yöntem olarak belirtilmez, bunun yerine pakette pyspark.sql.functions sağlanır. Bkz. Databricks PySpark SQL İşlevleri.

Sütun işlemleri

Spark birçok temel sütun işlemi sağlar:

Tip

DataFrame'deki tüm sütunların çıktısını almak için kullanın, columnsörneğin df_customer.columns.

Sütunları seç

select ve col kullanarak belirli sütunları seçebilirsiniz. col işlevi alt modüldedirpyspark.sql.functions.

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

Dize olarak tanımlanan bir ifadeyi kullanan bir sütuna expr ile başvurabilirsiniz.

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

SQL ifadelerini kabul eden öğesini de kullanabilirsiniz selectExpr:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

Metin sabiti kullanarak sütunları seçmek için aşağıdakileri yapın:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

Belirli bir DataFrame'den bir sütunu açıkça seçmek için işlecini [] veya işlecini . kullanabilirsiniz. (İşleç . , tamsayı ile başlayan sütunları veya boşluk ya da özel karakter içeren sütunları seçmek için kullanılamaz.) Bu, özellikle bazı sütunların aynı ada sahip olduğu DataFrame'lere katılırken yararlı olabilir.

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

Sütunlar oluştur

Yeni bir sütun oluşturmak için yöntemini kullanın withColumn . Aşağıdaki örnek, müşteri hesabı bakiyesinin c_acctbal değerini aşıp aşmadığına 1000bağlı olarak boole değeri içeren yeni bir sütun oluşturur:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

Sütunları yeniden adlandırma

Bir sütunu yeniden adlandırmak için mevcut ve yeni sütun adlarını kabul eden yöntemini kullanın withColumnRenamed :

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

Yöntemi alias özellikle sütunlarınızı toplamaların bir parçası olarak yeniden adlandırmak istediğinizde yararlıdır:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

Sütun türlerini dönüştürme

Bazı durumlarda DataFrame'inizdeki bir veya daha fazla sütunun veri türünü değiştirmek isteyebilirsiniz. Bunu yapmak için yöntemini kullanarak cast sütun veri türleri arasında dönüştürme yapın. Aşağıdaki örnek, bir sütunun tamsayıdan dize türüne dönüştürülmesini sağlamak için col yöntemini nasıl kullanabileceğinizi göstermektedir.

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

Sütunları kaldır

Sütunları kaldırmak için, seçme sırasında sütunları atlayabilir veya select(*) except yöntemini kullanabilirsiniz drop :

df_customer_flag_renamed.drop("balance_flag_renamed")

Aynı anda birden çok sütun da bırakabilirsiniz:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

Satır işlemleri

Spark birçok temel satır işlemi sağlar:

Satırları filtreleme

Bazı satırları döndürmek için bir DataFrame'de filter veya where yöntemini kullanarak satırları filtreleyin. Filtre uygulanacak bir sütunu tanımlamak için, col yöntemini veya bir sütun değerlendiren ifadeyi kullanın.

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

Birden çok koşula göre filtrelemek için mantıksal işleçler kullanın. Örneğin, & ve |, sırasıyla AND ve OR koşullarına izin vermenizi sağlar. Aşağıdaki örnek, c_nationkey değerine eşit olan ve 20 değeri c_acctbal değerinden büyük olan satırları filtreler.

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

Yinelenen satırları kaldırma

Satırların yinelenenlerini kaldırmak için, yalnızca benzersiz satırları döndüren distinct kullanın.

df_unique = df_customer.distinct()

Null değerleri işleme

Null değerleri işlemek için, na.drop yöntemini kullanarak null değerler içeren satırları silin. Bu yöntem, any null değerler veya all null değerler içeren satırları kaldırmak isteyip istemediğinizi belirtmenize olanak tanır.

Null değerleri bırakmak için aşağıdaki örneklerden birini kullanın.

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

Bunun yerine yalnızca tüm null değerleri içeren satırları filtrelemek istiyorsanız aşağıdakileri kullanın:

df_customer_no_nulls = df_customer.na.drop("all")

Aşağıda gösterildiği gibi bunu belirterek sütunların bir alt kümesi için uygulayabilirsiniz:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

Eksik değerleri doldurmak için yöntemini kullanın fill . Bunu tüm sütunlara veya sütunların bir alt kümesine uygulamayı seçebilirsiniz. Aşağıdaki örnekte, hesap bakiyesi için null değere sahip hesap bakiyeleri c_acctbal ile 0doldurulur.

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

Dizeleri diğer değerlerle değiştirmek için yöntemini kullanın replace . Aşağıdaki örnekte, tüm boş adres dizeleri sözcüğüyle UNKNOWNdeğiştirilir:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

Satırları ekle

Satırları eklemek için yöntemini kullanarak union yeni bir DataFrame oluşturmanız gerekir. Aşağıdaki örnekte, daha önce oluşturulmuş olan DataFrame ile df_that_one_customer birleştirilen df_filtered_customer, üç müşteriyi içeren bir DataFrame döndürür.

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Note

DataFrame'leri bir tabloya yazıp yeni satırlar ekleyerek de birleştirebilirsiniz. Üretim iş yükleri için veri kaynaklarının hedef tabloya artımlı olarak işlenmesi, verilerin boyutu büyüdükçe gecikme süresini ve işlem maliyetlerini önemli ölçüde azaltabilir. Bkz. Lakeflow Connect'te standart bağlayıcılar.

Satırları sıralama

Important

Büyük ölçeklerde sıralama maliyetli olabilir ve sıralanmış verileri depolar ve Spark ile yeniden yüklerseniz, verilerin sıralı kalacağı garanti edilmez. Sıralama kullanımınızda kasıtlı olarak kullandığınızdan emin olun.

Satırları bir veya daha fazla sütuna göre sıralamak için sort veya orderBy yöntemlerini kullanın. Varsayılan olarak bu yöntemler artan düzende sıralanır:

df_customer.orderBy(col("c_acctbal"))

Azalan sırada filtrelemek için kullanın desc:

df_customer.sort(col("c_custkey").desc())

Aşağıdaki örnekte iki sütuna göre sıralama gösterilmektedir:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

DataFrame sıralandıktan sonra döndürülecek satır sayısını sınırlamak için yöntemini kullanın limit . Aşağıdaki örnek yalnızca en iyi 10 sonuçları görüntüler:

display(df_sorted.limit(10))

DataFrame'leri Birleştirme

İki veya daha fazla DataFrame'i birleştirmek için yöntemini kullanın join . DataFrame'lerin how (birleştirme türü) ve on (birleştirmenin temel alınacağı sütunlar) parametrelerinde nasıl birleştirileceğini belirtebilirsiniz. Ortak birleştirme türleri şunlardır:

  • inner: Bu, DataFrame'ler arasında on parametresi için eşleşme olan satırları tutan bir DataFrame döndüren varsayılan birleştirme türüdür.
  • left: Bu, ilk belirtilen DataFrame'in tüm satırlarını ve yalnızca ikinci belirtilen DataFrame'den birinciyle eşleşen satırları tutar.
  • outer: Dış birleşim, eşleşmeye bakılmaksızın her iki DataFrame'den de tüm satırları korur.

Birleştirmeler hakkında ayrıntılı bilgi için, Azure Databricks'te birleştirmelerle çalışma bölümüne bakınız. PySpark'ta desteklenen birleştirmelerin listesi için bkz. DataFrame birleşimleri.

Aşağıdaki örnek, DataFrame'in her satırının orders DataFrame'den customers karşılık gelen satırla birleştirildiği tek bir DataFrame döndürür. İç birleşimler kullanılır; çünkü beklentiye göre her siparişin tek bir müşteriye ait olması beklenir.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

Birden fazla koşulda birleştirmek için & ve | gibi mantıksal operatörleri kullanarak sırasıyla AND ve OR belirtin. Aşağıdaki örnek, o_totalprice'dan büyük 500,000 satırlarını filtrelemek için ek bir koşul ekler.

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

Verileri toplama

SQL'deki bir GROUP BY'ya benzer şekilde, bir DataFrame'deki verileri toplamak için, gruplandırılacak sütunları belirtmek üzere groupBy yöntemini ve toplama işlemlerini belirtmek için agg yöntemini kullanın. İçeri aktarın: avg, sum, max ve min gibi yaygın birleştirmeleri pyspark.sql.functions'den. Aşağıdaki örnekte pazar segmentlerine göre ortalama müşteri bakiyesi gösterilmektedir:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

Bazı toplamalar eylemlerdir ve bu da hesaplamaları tetikledikleri anlamına gelir. Bu durumda, sonuçların çıktısını almak için başka eylemler kullanmanız gerekmez.

DataFrame'deki satırları saymak için yöntemini kullanın count :

df_customer.count()

Çağrıları zincirleme

DataFrame'leri dönüştüren yöntemler DataFrame'ler döndürür ve Spark, eylemler çağrılana kadar dönüştürmeler üzerinde işlem yapmaz. Bu tembel değerlendirme kolaylık ve okunabilirlik için birden çok yöntemi zincirleyebileceğiniz anlamına gelir. Aşağıdaki örnek, filtreleme, toplama ve sıralamanın nasıl zincirleneceğini gösterir.

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

DataFrame'inizi görselleştirme

Not defterindeki bir DataFrame'i görselleştirmek için, DataFrame'in sol üst kısmındaki tablonun yanındaki işaretine tıklayın + ve ardından DataFrame'inize göre bir veya daha fazla grafik eklemek için Görselleştirme'yi seçin. Görselleştirmelerle ilgili ayrıntılar için bkz. Databricks not defterlerindeki görselleştirmeler ve SQL düzenleyicisi.

display(df_order)

Databricks, ek görselleştirmeler gerçekleştirmek için Spark için pandas API'sini kullanmanızı önerir. .pandas_api() Spark DataFrame'i karşılık gelen pandas API'sine dönüştürmenizi sağlar. Daha fazla bilgi için bkz . Spark üzerinde Pandas API'si.

Verilerinizi kaydetme

Verilerinizi dönüştürdükten sonra, DataFrameWriter yöntemlerini kullanarak kaydedebilirsiniz. Bu yöntemlerin tam listesi DataFrameWriter'da bulunabilir. Aşağıdaki bölümlerde DataFrame'inizi tablo olarak ve veri dosyaları koleksiyonu olarak kaydetme gösterilmektedir.

DataFrame'inizi tablo olarak kaydetme

DataFrame'inizi Unity Kataloğu'nda tablo olarak kaydetmek için write.saveAsTable yöntemini kullanın ve yolu <catalog-name>.<schema-name>.<table-name> biçiminde belirtin.

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

DataFrame'inizi CSV olarak yazma

DataFrame'inizi *.csv biçiminde yazmak için, format ve seçenekleri belirterek write.csv yöntemini kullanın. Belirtilen yolda veriler varsa, varsayılan olarak yazma işlemi başarısız olur. Farklı bir işlem yapmak için aşağıdaki modlardan birini belirtebilirsiniz:

  • overwrite DataFrame içeriğiyle hedef yolda var olan tüm verileri geçersiz kılar.
  • append DataFrame'in içeriğini hedef yoldaki verilere ekler.
  • ignore hedef yolda veriler varsa yazma işlemi sessizce başarısız olur.

Aşağıdaki örnek, DataFrame içeriğiyle verilerin üzerine CSV dosyaları olarak yazma işlemini gösterir:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

Sonraki Adımlar

Databricks'te daha fazla Spark özelliğinden yararlanmak için bkz: