Brug Spark til at arbejde med datafiler
Når du har konfigureret en notesbog og knyttet den til en klynge, kan du bruge Spark til at læse og behandle datafiler. Spark understøtter en lang række formater – f.eks. CSV, JSON, Parquet, ORC, Avro og Delta – og Databricks leverer indbyggede connectorer til at få adgang til filer, der er gemt i arbejdsområdet, i Azure Data Lake eller Blob Storage eller i andre eksterne systemer.
Arbejdsgangen følger normalt tre trin:
Læs en fil i en Spark DataFrame ved hjælp af spark.read med det korrekte format og den korrekte sti. Når du læser rå tekstformater som CSV eller JSON, kan Spark udlede skemaet (kolonnenavne og datatyper), men dette er nogle gange langsomt eller upålideligt. En bedre praksis i produktion er at definere skemaet eksplicit, så dataene indlæses konsekvent og effektivt.
Udforsk og transformer DataFrame ved hjælp af SQL- eller DataFrame-handlinger (f.eks. filtrering af rækker, valg af kolonner, aggregering af værdier).
Skriv resultaterne tilbage til lageret i et valgt format.
Arbejde med filer i Spark er designet til at være ensartet på tværs af små og store datasæt. Den samme kode, der bruges til at teste en lille CSV-fil, fungerer også på meget større datasæt, da Spark distribuerer arbejdet på tværs af klyngen. Dette gør det nemmere at skalere op fra hurtig udforskning til mere kompleks databehandling.
Indlæser data i en dataramme
Lad os udforske et hypotetisk eksempel for at se, hvordan du kan bruge en dataramme til at arbejde med data. Lad os antage, at du har følgende data i en kommasepareret tekstfil med navnet products.csv i datamappen i dit DBFS-lager (Databricks File System):
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
I en Spark-notesbog kan du bruge følgende PySpark-kode til at indlæse dataene i en dataramme og vise de første 10 rækker:
%pyspark
df = spark.read.load('/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Linjen %pyspark i begyndelsen kaldes en magi, og fortæller Spark, at det sprog, der bruges i denne celle, er PySpark. Her er den tilsvarende Scala-kode for produktdataeksempel:
%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))
Den magiske %spark bruges til at angive Scala.
Drikkepenge
Du kan også vælge det sprog, du vil bruge til hver celle i brugergrænsefladen i notesbogen.
Begge de eksempler, der blev vist tidligere, ville producere output som dette:
| Instruktion | Produktnavn | Kategori | Listepris |
|---|---|---|---|
| 771 | Bjerg-100 Sølv, 38 | Mountainbikes | 3399.9900 |
| 772 | Bjerg-100 Sølv, 42 | Mountainbikes | 3399.9900 |
| 773 | Bjerg-100 Sølv, 44 | Mountainbikes | 3399.9900 |
| ... | ... | ... | ... |
Angivelse af et datarammeskema
I det forrige eksempel indeholdt den første række i CSV-filen kolonnenavnene, og Spark kunne udlede datatypen for hver kolonne ud fra de data, den indeholder. Du kan også angive et eksplicit skema for dataene, hvilket er nyttigt, når kolonnenavnene ikke er inkluderet i datafilen, f.eks. dette CSV-eksempel:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
I følgende PySpark-eksempel kan du se, hvordan du angiver et skema for den dataramme, der skal indlæses fra en fil med navnet product-data.csv i dette format:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Resultaterne vil igen ligne:
| Instruktion | Produktnavn | Kategori | Listepris |
|---|---|---|---|
| 771 | Bjerg-100 Sølv, 38 | Mountainbikes | 3399.9900 |
| 772 | Bjerg-100 Sølv, 42 | Mountainbikes | 3399.9900 |
| 773 | Bjerg-100 Sølv, 44 | Mountainbikes | 3399.9900 |
| ... | ... | ... | ... |
Filtrering og gruppering af datarammer
Du kan bruge metoderne i datarammeklassen til at filtrere, sortere, gruppere og på anden måde manipulere de data, den indeholder. I følgende kodeeksempel bruges select f.eks. metoden til at hente kolonnerne ProductName og ListPrice fra den df-dataramme , der indeholder produktdata i det forrige eksempel:
pricelist_df = df.select("ProductID", "ListPrice")
Resultaterne fra dette kodeeksempel ser nogenlunde sådan ud:
| Instruktion | Listepris |
|---|---|
| 771 | 3399.9900 |
| 772 | 3399.9900 |
| 773 | 3399.9900 |
| ... | ... |
Som med de fleste datamanipulationsmetoder select returneres et nyt dataframe-objekt.
Drikkepenge
Valg af et undersæt af kolonner fra en dataramme er en almindelig handling, som også kan opnås ved hjælp af følgende kortere syntaks:
pricelist_df = df["ProductID", "ListPrice"]
Du kan "kæde" metoder sammen for at udføre en række manipulationer, der resulterer i en transformeret dataramme. Denne eksempelkode kæder f.eks. metoderne select og where til at oprette en ny dataramme, der indeholder kolonnerne ProductName og ListPrice for produkter med en kategori af mountainbikes eller landevejscykler:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Resultaterne fra dette kodeeksempel ser nogenlunde sådan ud:
| Produktnavn | Listepris |
|---|---|
| Bjerg-100 Sølv, 38 | 3399.9900 |
| Road-750 Sort, 52 | 539.9900 |
| ... | ... |
Hvis du vil gruppere og aggregere data, kan du bruge metode- og aggregeringsfunktionerne groupby . Følgende PySpark-kode tæller f.eks. antallet af produkter for hver kategori:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Resultaterne fra dette kodeeksempel ser nogenlunde sådan ud:
| Kategori | tælle |
|---|---|
| Headsets | 3 |
| Hjul | 14 |
| Mountainbikes | 32 |
| ... | ... |
Seddel
Spark DataFrames er deklarative og uforanderlige. Hver transformation (f.eks. select, filter, eller groupBy) opretter en ny DataFrame, der repræsenterer det, du ønsker, ikke hvordan den kører. Dette gør kode genanvendelig, optimerbar og fri for bivirkninger. Men ingen af disse transformationer udføres faktisk, før du udløser en handling (f.eks. display, collect, write), hvorefter Spark kører den fulde optimerede plan.
Brug af SQL-udtryk i Spark
Dataframe-API'en er en del af et Spark-bibliotek med navnet Spark SQL, som gør det muligt for dataanalytikere at bruge SQL-udtryk til at forespørge på og manipulere data.
Oprettelse af databaseobjekter i Spark-kataloget
Spark-kataloget er et metalager for relationsdataobjekter, f.eks. visninger og tabeller. Spark-kørsel kan bruge kataloget til problemfrit at integrere kode, der er skrevet på et hvilket som helst Spark-understøttet sprog, med SQL-udtryk, der kan være mere naturlige for nogle dataanalytikere eller udviklere.
En af de nemmeste måder at gøre data i en dataramme tilgængelig for forespørgsler i Spark-kataloget på er ved at oprette en midlertidig visning, som vist i følgende kodeeksempel:
df.createOrReplaceTempView("products")
En visning er midlertidig, hvilket betyder, at den slettes automatisk i slutningen af den aktuelle session. Du kan også oprette tabeller , der bevares i kataloget, for at definere en database, der kan forespørges ved hjælp af Spark SQL.
Seddel
Vi udforsker ikke Spark-katalogtabeller i dybden i dette modul, men det er værd at bruge tid på at fremhæve nogle få vigtige punkter:
- Du kan oprette en tom tabel ved hjælp af metoden
spark.catalog.createTable. Tabeller er metadatastrukturer, der gemmer deres underliggende data på den lagerplacering, der er knyttet til kataloget. Hvis du sletter en tabel, slettes de underliggende data også. - Du kan gemme en dataramme som en tabel ved hjælp af metoden
saveAsTable. - Du kan oprette en ekstern tabel ved hjælp
spark.catalog.createExternalTableaf metoden . Eksterne tabeller definerer metadata i kataloget, men henter deres underliggende data fra en ekstern lagerplacering. typisk en mappe i en data lake. Hvis du sletter en ekstern tabel, slettes de underliggende data ikke.
Brug af Spark SQL API til at forespørge om data
Du kan bruge Spark SQL-API'en i kode, der er skrevet på et hvilket som helst sprog, til at forespørge om data i kataloget. Følgende PySpark-kode bruger f.eks. en SQL-forespørgsel til at returnere data fra produktvisningen som en dataramme.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
Resultaterne fra kodeeksemplen ligner følgende tabel:
| Produktnavn | Listepris |
|---|---|
| Bjerg-100 Sølv, 38 | 3399.9900 |
| Road-750 Sort, 52 | 539.9900 |
| ... | ... |
Brug af SQL-kode
I det forrige eksempel blev det vist, hvordan du bruger Spark SQL-API'en til at integrere SQL-udtryk i Spark-kode. I en notesbog kan du også bruge %sql magi til at køre SQL-kode, der forespørger objekter i kataloget, på følgende måde:
%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
Eksempel på SQL-kode returnerer et resultatsæt, der automatisk vises i notesbogen som en tabel, f.eks. den nedenfor:
| Kategori | Produktantal |
|---|---|
| Bib-Shorts | 3 |
| Cykelstativer | 0 |
| Cykelstativer | 0 |
| ... | ... |