Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
GPS verilerini alan, koordinatları yerel uzamsal türlere dönüştüren ve gelenleri izlemek amacıyla veri düzenlemesi için Lakeflow Spark Bildirimli İşlem Hatlarını (SDP) ve Otomatik Yükleyici'yi kullanarak ambar bölgelerine karşı bu verileri eşleştiren bir işlem hattı oluşturmayı ve dağıtmayı öğrenin. Bu öğreticide Databricks'in yerel uzamsal türleri (GEOMETRY, GEOGRAPHY) ve ST_Point, ST_GeomFromWKT ve ST_Contains gibi yerleşik uzamsal işlevler kullanılmaktadır, böylece jeo-uzamsal iş akışlarını dış kitaplıklara ihtiyaç duymadan büyük ölçekli olarak çalıştırabilirsiniz.
Bu kılavuzda aşağıdakileri yapacaksınız:
- Bir işlem hattı oluşturun ve Unity Kataloğu biriminde örnek GPS ve coğrafi konum verileri oluşturun.
- Ham GPS ping'lerini Otomatik Yükleyici ile artımlı olarak bronz akış tablosuna alın.
- Enlem ve boylamı yerel
GEOMETRYbir noktaya dönüştüren bir gümüş akış tablosu oluşturun. - WKT çokgenlerinden ambar coğrafi bölgelerinin gerçekleştirilmiş bir görünümünü oluşturun.
- Ambar gelenleri (hangi cihazın hangi coğrafi bölgeye girdiğini) içeren bir tablo oluşturmak için uzamsal birleştirmeyi çalıştırın.
Sonuç, medalyon tarzı bir sistemdir: bronz (ham GPS), gümüş (geometri olarak noktalar) ve altın (coğrafi sınırlar ve varış durumları). Daha fazla bilgi için bkz. Madalyon göl evi mimarisi nedir?
Gereksinimler
Bu öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:
- Azure Databricks çalışma alanında oturum açabilirsiniz.
- Çalışma alanınız için Unity Kataloğu'nu etkinleştirin.
- Sunucusuz Lakeflow Spark Bildirimli İşlem Hatlarını kullanmak istiyorsanız hesabınız için sunucusuz işlemin etkinleştirilmesini sağlayın. Sunucusuz işlem etkin değilse, adımlar çalışma alanınız için varsayılan işlemle çalışır.
- İşlem kaynağı oluşturma veya işlem kaynağına erişim iznine sahip olun.
-
Katalogda yeni şema oluşturma izinlerine sahip olun. Gerekli izinler
USE CATALOGveCREATE SCHEMA'dir. -
Mevcut şemada yeni birim oluşturma izinlerine sahip olun. Gerekli izinler
USE SCHEMAveCREATE VOLUME'dir. - Yerel uzamsal türleri ve uzamsal işlevleri destekleyen bir çalışma zamanı kullanın.
1. Adım: İşlem hattı oluşturma
Yeni bir ETL işlem hattı oluşturun ve tablolarınız için varsayılan kataloğu ve şemayı ayarlayın.
Çalışma alanınızda
Sol üst köşede yeni.
ETL İşlem Hattı'ne tıklayın.
İşlem hattının başlığını
Spatial pipeline tutorialveya tercih ettiğiniz bir adla değiştirin.Başlığın altında, yazma izinlerine sahip olduğunuz bir katalog ve şema seçin.
Bu katalog ve şema, kodunuzda bir katalog veya şema belirtmediğinizde varsayılan olarak kullanılır. ve
<catalog>değerlerini aşağıdaki adımlarda burada seçtiğiniz değerlerle değiştirin<schema>.Gelişmiş seçenekler'deBoş bir dosyayla başla'yı seçin.
Kodunuz için bir klasör seçin. Gözat seçeneğine basarak bir klasör seçebilirsiniz; sürüm denetimi için bir Git deposu kullanabilirsiniz.
İlk dosyanızın dili için Python veya SQL'i seçin. Dosyaları daha sonra diğer dilde ekleyebilirsiniz.
İşlem hattını oluşturmak ve Lakeflow Pipelines Düzenleyicisi'ni açmak için Seç'e tıklayın.
Artık varsayılan katalog ve şemaya sahip boş bir işlem hattınız var. Ardından örnek GPS ve coğrafi konum verilerini oluşturun.
2. Adım: Örnek GPS ve coğrafi konum verilerini oluşturma
Bu adım bir birimde örnek veriler oluşturur: ham GPS ping'leri (JSON) ve ambar coğrafi konumları (WKT poligonlu JSON). GPS noktaları, iki ambar poligonuyla çakışan bir sınırlayıcı kutuda oluşturulur, bu nedenle sonraki bir adımda uzamsal birleşim varış satırlarını döndürür. Bir birimde veya tabloda zaten kendi verileriniz varsa bu adımı atlayabilirsiniz.
Lakeflow Pipelines Düzenleyicisi'ndeki varlık tarayıcısında
Ekle'yi ve ardından Keşif'i seçin.
Adı olarak ayarla, Python'ı seç ve varsayılan hedef klasörü olduğu gibi bırak.
Oluştur'utıklayın.
Yeni not defterine aşağıdaki kodu yapıştırın.
<catalog>ve<schema>değerlerini 1. Adımda ayarladığınız varsayılan katalog ve şema ile değiştirin.GPS ve coğrafi konum verileri oluşturmak için not defterinde aşağıdaki kodu kullanın.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Not defteri hücresini çalıştırın (Shift + Enter).
Çalıştırma tamamlandıktan sonra hacim gps (ham pingler) ve geofences (WKT'de çokgenler) içerir. Sonraki adımda, GPS verilerini bir bronze tabloya aktarırsınız.
3. Adım: GPS verilerini bronz akış tablosuna alma
Otomatik Yükleyici kullanarak ham GPS JSON'u artımlı olarak veri deposundan al ve bronze akış tablosuna yaz.
Varlık tarayıcısında
Ekle'yi ve ardından Dönüştürme'yi seçin.
Adı
gps_bronzeolarak ayarla, SQL veya Python seç ve Oluştur'a tıkla.Dosya içeriğini aşağıdakilerle değiştirin (dilinizle eşleşen sekmeyi kullanın). Varsayılan kataloğunuz ve şemanızla
<catalog>ve<schema>öğelerini değiştirin.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Bir güncelleştirmeyi çalıştırmak için dosyayı veya İşlem hattını çalıştır'ı çalıştırın.
Güncelleme tamamlandığında, pipeline grafiği gps_bronze tablosunu gösterir. Ardından koordinatları yerel geometri noktasına dönüştüren bir gümüş tablo ekleyin.
Adım 4: Geometri noktaları içeren bir gümüş veri akış tablosu ekleyin
Bronz tablodan okuyan ve ST_Point(longitude, latitude) kullanarak GEOMETRY sütunu ekleyen bir akış tablosu oluşturun.
Varlık tarayıcısında
Ekle'yi ve ardından Dönüştürme'yi seçin.
Adı
raw_gps_silverolarak ayarla, SQL veya Python seç ve Oluştur'a tıkla.Aşağıdaki kodu yeni dosyaya yapıştırın.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Dosyayı çalıştırın veya işlem hattını çalıştırın.
İşlem hattı grafiği artık gps_bronze ve raw_gps_silver değerlerini gösteriyor. Ardından, ambar coğrafi bölgelerini gerçekleştirilmiş görünüm olarak ekleyin.
5. Adım: Ambar geofences altın tablosunu oluşturma
Birimden coğrafi alanları okuyan ve WKT sütununu ST_GeomFromWKT kullanarak GEOMETRY sütununa dönüştüren bir maddi görünüm oluşturun.
Varlık tarayıcısında
Ekle'yi ve ardından Dönüştürme'yi seçin.
Ad'ı
warehouse_geofences_goldolarak ayarlayın, SQL veya Python'ı seçin ve Oluştur'a tıklayın.Aşağıdaki kodu yapıştırın. Varsayılan kataloğunuz ve şemanızla
<catalog>ve<schema>öğelerini değiştirin.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Dosyayı çalıştırın veya işlem hattını çalıştırın.
İşlem hattı artık geofences tablosunu içerir. Ardından uzamsal birleştirmeyi depo varışlarını hesaplamaya ekleyin.
6. Adım: Uzamsal birleşimle ambar varışları tablosunu oluşturma
Bir cihazın bir ambar çokgeninin içinde ne zaman olduğunu belirlemek için ST_Contains(boundary_geom, point_geom) kullanarak gümüş GPS noktalarını coğrafi bölgelere birleştiren gerçekleştirilmiş bir görünüm ekleyin.
Varlık tarayıcısında
Ekle'yi ve ardından Dönüştürme'yi seçin.
Adı
warehouse_arrivalsolarak ayarla, SQL veya Python seç ve Oluştur'a tıkla.Aşağıdaki kodu yapıştırın.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Dosyayı çalıştırın veya işlem hattını çalıştırın.
Güncelleştirme tamamlandığında işlem hattı grafiği dört veri kümesinin tümünü gösterir: gps_bronze, raw_gps_silver, warehouse_geofences_goldve warehouse_arrivals.
Uzamsal birleştirmeyi doğrulama
Uzamsal birleştirme sonucu üretilen satırları onaylayın: bir coğrafi bölge içinde bulunan gümüş tablodaki noktalar, warehouse_arrivals içinde görünmelidir. Not defterinde veya SQL düzenleyicisinde aşağıdakilerden birini çalıştırın (işlem hattı hedefinizle aynı kataloğu ve şemayı kullanın).
Ambara göre gelenleri say (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Warehouse_A ve Warehouse_B için sıfır olmayan sayıları görmelisiniz (örnek GPS verileri her iki çokgenle de çakışıyor). Örnek satırları incelemek için:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Python'da (not defteri) aynı denetimler:
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
warehouse_arrivals içinde satırlar görürseniz, ST_Contains(boundary_geom, point_geom) birleştirme düzgün çalışıyor.
7. Adım: İşlem hattını zamanlama (isteğe bağlı)
Yeni GPS verileri birime eklendikçe işlem hattını güncel tutmak için işlem hattını bir zamanlamaya göre çalıştırmak için bir iş oluşturun.
- Düzenleyicinin üst kısmında Zamanla düğmesini seçin.
- Zamanlamalar iletişim kutusu görüntülenirse Zamanlama ekle'yi seçin.
- İsteğe bağlı olarak, işe bir ad verin.
- Varsayılan olarak, zamanlama günde bir kez çalışır. Bunu kabul edebilir veya kendiniz ayarlayabilirsiniz. Gelişmiş'i seçmek belirli bir zaman ayarlamanıza olanak tanır; Diğer seçenekler çalıştırma bildirimleri eklemenize olanak tanır.
- Zamanlamayı uygulamak için Oluştur'u seçin.
İş yürütmeleri hakkında daha fazla bilgi için bkz. Lakeflow İşleri için izleme ve gözlemleme.