Modul 3: Durchführen der Datenbereinigung und -vorbereitung mithilfe von Apache Spark
Das NYC Yellow Taxi-Dataset enthält mehr als 1,5 Milliarden Reisedatensätze, wobei jeden Monat Reisedaten in Millionen von Datensätzen ausgeführt werden, was die Verarbeitung dieser Datensätze rechnerisch teuer macht und oft nicht mit nicht verteilten Verarbeitungs-Engines möglich ist.
Wichtig
Microsoft Fabric befindet sich derzeit in der Vorschauversion. Diese Informationen beziehen sich auf eine Vorabversion des Produkts, an der vor der Veröffentlichung noch wesentliche Änderungen vorgenommen werden können. Microsoft übernimmt keine Garantie, weder ausdrücklich noch stillschweigend, für die hier bereitgestellten Informationen.
In diesem Tutorial wird veranschaulicht, wie Sie Apache Spark-Notebooks zum sauber und Vorbereiten des Datasets für Taxifahrten verwenden. Die optimierte Verteilungs-Engine von Spark eignet sich ideal für die Verarbeitung großer Datenmengen.
Tipp
Verwenden Sie für Datasets mit relativ kleiner Größe die Data Wrangler-Benutzeroberfläche, die ein Notebook-basiertes grafisches Benutzeroberflächestool ist, das benutzern, die mit Pandas-Datenframes in Microsoft Fabric-Notebooks arbeiten, eine interaktive Untersuchung und eine Datenbereinigung bietet.
In den folgenden Schritten lesen Sie die rohen NYC Taxi-Daten aus einer Lakehouse Delta Lake-Tabelle (gespeichert in Modul 1) und führen verschiedene Vorgänge aus, um diese Daten zu sauber und zu transformieren, um sie für das Trainieren von Machine Learning-Modellen vorzubereiten.
Folgen Sie im Notizbuch
Die python-Befehle/skripts, die in jedem Schritt dieses Tutorials verwendet werden, finden Sie im begleitenden Notebook: 03-perform-data-cleansing-and-preparation-using-apache-spark.ipynb. Achten Sie darauf, dass Sie ein Lakehouse an das Notebook anfügen , bevor Sie es ausführen.
Reinigen und Vorbereiten
Laden Sie nyc yellow taxi Data from lakehouse delta table nyctaxi_raw using the
spark.read
command.nytaxi_df = spark.read.format("delta").load("Tables/nyctaxi_raw")
Um den Datenbereinigungsprozess zu unterstützen, verwenden wir als Nächstes das integrierte Zusammenfassungsfeature von Apache Spark, das Zusammenfassungsstatistiken generiert, bei denen es sich um numerische Measures handelt, die Aspekte einer Spalte im Datenrahmen beschreiben. Diese Measures umfassen Anzahl, Mittelwert, Standardabweichung, min und max. Verwenden Sie den folgenden Befehl, um die Zusammenfassungsstatistik aller Spalten im Taxidataset anzuzeigen.
display(nytaxi_df.summary())
Hinweis
Das Generieren von Zusammenfassungsstatistiken ist ein rechenintensiver Prozess und kann je nach Größe des Datenrahmens eine erhebliche Ausführungszeit in Anspruch nehmen. In diesem Tutorial dauert der Schritt zwischen zwei und drei Minuten.
In diesem Schritt sauber wir den nytaxi_df-Datenrahmen und fügen weitere Spalten hinzu, die von den Werten vorhandener Spalten abgeleitet werden.
Im Folgenden finden Sie die in diesem Schritt ausgeführten Vorgänge:
Hinzufügen abgeleiteter Spalten
- pickupDate: Konvertieren von datetime in date für Visualisierungen und Berichte
- weekDay – Tagesanzahl der Woche
- weekDayName – Tagnamen abgekürzt
- dayofMonth – Tagesanzahl des Monats
- pickupHour - Stunde der Abholzeit
- tripDuration : Stellt die Dauer in Minuten der Reise dar
- timeBins – Binned Time of the Day
Filterbedingungen
- fareAmount liegt zwischen und 100.
- tripDistance größer als 0.
- tripDuration beträgt weniger als 3 Stunden (180 Minuten).
- passengerCount liegt zwischen 1 und 8.
- startLat, startLon, endLat, endLon sind nicht NULL.
- Entfernen Sie Ausreißertrips (Ausreißer) tripDistance>100.
from pyspark.sql.functions import col,when, dayofweek, date_format, hour,unix_timestamp, round, dayofmonth, lit nytaxidf_prep = nytaxi_df.withColumn('pickupDate', col('tpepPickupDateTime').cast('date'))\ .withColumn("weekDay", dayofweek(col("tpepPickupDateTime")))\ .withColumn("weekDayName", date_format(col("tpepPickupDateTime"), "EEEE"))\ .withColumn("dayofMonth", dayofweek(col("tpepPickupDateTime")))\ .withColumn("pickupHour", hour(col("tpepPickupDateTime")))\ .withColumn("tripDuration", (unix_timestamp(col("tpepDropoffDateTime")) - unix_timestamp(col("tpepPickupDateTime")))/60)\ .withColumn("timeBins", when((col("pickupHour") >=7) & (col("pickupHour")<=10) ,"MorningRush")\ .when((col("pickupHour") >=11) & (col("pickupHour")<=15) ,"Afternoon")\ .when((col("pickupHour") >=16) & (col("pickupHour")<=19) ,"EveningRush")\ .when((col("pickupHour") <=6) | (col("pickupHour")>=20) ,"Night"))\ .filter("""fareAmount > 0 AND fareAmount < 100 and tripDistance > 0 AND tripDistance < 100 AND tripDuration > 0 AND tripDuration <= 189 AND passengerCount > 0 AND passengerCount <= 8 AND startLat IS NOT NULL AND startLon IS NOT NULL AND endLat IS NOT NULL AND endLon IS NOT NULL""")
Hinweis
Apache Spark verwendet das Lazy-Auswertungsparadigma, das die Ausführung von Transformationen verzögert, bis eine Aktion ausgelöst wird. Dadurch kann Spark den Ausführungsplan optimieren und unnötige Berechnungen vermeiden. In diesem Schritt werden die Definitionen der Transformationen und Filter erstellt. Die eigentliche Bereinigung und Transformation wird ausgelöst, sobald Daten im nächsten Schritt geschrieben wurden (eine Aktion).
Nachdem wir die Bereinigungsschritte definiert und einem Dataframe namens nytaxidf_prep zugewiesen haben, schreiben wir die bereinigten und vorbereiteten Daten mit den folgenden Befehlen in eine neue Deltatabelle (nyctaxi_prep) im angefügten Lakehouse.
table_name = "nyctaxi_prep" nytaxidf_prep.write.mode("overwrite").format("delta").save(f"Tables/{table_name}") print(f"Spark dataframe saved to delta table: {table_name}")
Die bereinigten und aufbereiteten Daten, die in diesem Modul erzeugt werden, stehen nun als Deltatabelle im Lakehouse zur Verfügung und können für die weitere Verarbeitung und Generierung von Erkenntnissen verwendet werden.