Aracılığıyla paylaş


Kılavuz: Yerel uzamsal türlerle jeouzamsal işlem hattı oluşturma

GPS verilerini alan, koordinatları yerel uzamsal türlere dönüştüren ve gelenleri izlemek amacıyla veri düzenlemesi için Lakeflow Spark Bildirimli İşlem Hatlarını (SDP) ve Otomatik Yükleyici'yi kullanarak ambar bölgelerine karşı bu verileri eşleştiren bir işlem hattı oluşturmayı ve dağıtmayı öğrenin. Bu öğreticide Databricks'in yerel uzamsal türleri (GEOMETRY, GEOGRAPHY) ve ST_Point, ST_GeomFromWKT ve ST_Contains gibi yerleşik uzamsal işlevler kullanılmaktadır, böylece jeo-uzamsal iş akışlarını dış kitaplıklara ihtiyaç duymadan büyük ölçekli olarak çalıştırabilirsiniz.

Bu kılavuzda aşağıdakileri yapacaksınız:

  • Bir işlem hattı oluşturun ve Unity Kataloğu biriminde örnek GPS ve coğrafi konum verileri oluşturun.
  • Ham GPS ping'lerini Otomatik Yükleyici ile artımlı olarak bronz akış tablosuna alın.
  • Enlem ve boylamı yerel GEOMETRY bir noktaya dönüştüren bir gümüş akış tablosu oluşturun.
  • WKT çokgenlerinden ambar coğrafi bölgelerinin gerçekleştirilmiş bir görünümünü oluşturun.
  • Ambar gelenleri (hangi cihazın hangi coğrafi bölgeye girdiğini) içeren bir tablo oluşturmak için uzamsal birleştirmeyi çalıştırın.

Sonuç, medalyon tarzı bir sistemdir: bronz (ham GPS), gümüş (geometri olarak noktalar) ve altın (coğrafi sınırlar ve varış durumları). Daha fazla bilgi için bkz. Madalyon göl evi mimarisi nedir?

Gereksinimler

Bu öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:

1. Adım: İşlem hattı oluşturma

Yeni bir ETL işlem hattı oluşturun ve tablolarınız için varsayılan kataloğu ve şemayı ayarlayın.

  1. Çalışma alanınızda Artı simgesine tıklayın. Sol üst köşede yeni.

  2. ETL İşlem Hattı'ne tıklayın.

  3. İşlem hattının başlığını Spatial pipeline tutorial veya tercih ettiğiniz bir adla değiştirin.

  4. Başlığın altında, yazma izinlerine sahip olduğunuz bir katalog ve şema seçin.

    Bu katalog ve şema, kodunuzda bir katalog veya şema belirtmediğinizde varsayılan olarak kullanılır. ve <catalog> değerlerini aşağıdaki adımlarda burada seçtiğiniz değerlerle değiştirin<schema>.

  5. Gelişmiş seçenekler'deBoş bir dosyayla başla'yı seçin.

  6. Kodunuz için bir klasör seçin. Gözat seçeneğine basarak bir klasör seçebilirsiniz; sürüm denetimi için bir Git deposu kullanabilirsiniz.

  7. İlk dosyanızın dili için Python veya SQL'i seçin. Dosyaları daha sonra diğer dilde ekleyebilirsiniz.

  8. İşlem hattını oluşturmak ve Lakeflow Pipelines Düzenleyicisi'ni açmak için Seç'e tıklayın.

Artık varsayılan katalog ve şemaya sahip boş bir işlem hattınız var. Ardından örnek GPS ve coğrafi konum verilerini oluşturun.

2. Adım: Örnek GPS ve coğrafi konum verilerini oluşturma

Bu adım bir birimde örnek veriler oluşturur: ham GPS ping'leri (JSON) ve ambar coğrafi konumları (WKT poligonlu JSON). GPS noktaları, iki ambar poligonuyla çakışan bir sınırlayıcı kutuda oluşturulur, bu nedenle sonraki bir adımda uzamsal birleşim varış satırlarını döndürür. Bir birimde veya tabloda zaten kendi verileriniz varsa bu adımı atlayabilirsiniz.

  1. Lakeflow Pipelines Düzenleyicisi'ndeki varlık tarayıcısında Artı simgesine tıklayın.Ekle'yi ve ardından Keşif'i seçin.

  2. Adı olarak ayarla, Python'ı seç ve varsayılan hedef klasörü olduğu gibi bırak.

  3. Oluştur'utıklayın.

  4. Yeni not defterine aşağıdaki kodu yapıştırın. <catalog> ve <schema> değerlerini 1. Adımda ayarladığınız varsayılan katalog ve şema ile değiştirin.

    GPS ve coğrafi konum verileri oluşturmak için not defterinde aşağıdaki kodu kullanın.

    from pyspark.sql import functions as F
    
    catalog = "<catalog>"   # for example, "main"
    schema = "<schema>"    # for example, "default"
    
    spark.sql(f"USE CATALOG `{catalog}`")
    spark.sql(f"USE SCHEMA `{schema}`")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`")
    volume_base = f"/Volumes/{catalog}/{schema}/raw_data"
    
    # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area)
    gps_path = f"{volume_base}/gps"
    df_gps = (
        spark.range(0, 5000)
        .repartition(10)
        .select(
            F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"),
            F.current_timestamp().alias("timestamp"),
            (-118.3 + F.rand() * 0.2).alias("longitude"),   # -118.3 to -118.1
            (34.0 + F.rand() * 0.2).alias("latitude"),     # 34.0 to 34.2
        )
    )
    df_gps.write.format("json").mode("overwrite").save(gps_path)
    print(f"Wrote 5000 GPS rows to {gps_path}")
    
    # Geofences: two warehouse polygons (WKT) in the same region
    geofences_path = f"{volume_base}/geofences"
    geofences_data = [
        ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"),
        ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"),
    ]
    df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"])
    df_geo.write.format("json").mode("overwrite").save(geofences_path)
    print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")
    
  5. Not defteri hücresini çalıştırın (Shift + Enter).

Çalıştırma tamamlandıktan sonra hacim gps (ham pingler) ve geofences (WKT'de çokgenler) içerir. Sonraki adımda, GPS verilerini bir bronze tabloya aktarırsınız.

3. Adım: GPS verilerini bronz akış tablosuna alma

Otomatik Yükleyici kullanarak ham GPS JSON'u artımlı olarak veri deposundan al ve bronze akış tablosuna yaz.

  1. Varlık tarayıcısında Artı simgesine tıklayın.Ekle'yi ve ardından Dönüştürme'yi seçin.

  2. Adı gps_bronze olarak ayarla, SQL veya Python seç ve Oluştur'a tıkla.

  3. Dosya içeriğini aşağıdakilerle değiştirin (dilinizle eşleşen sekmeyi kullanın). Varsayılan kataloğunuz ve şemanızla <catalog> ve <schema> öğelerini değiştirin.

    SQL

    CREATE OR REFRESH STREAMING TABLE gps_bronze
    COMMENT "Raw GPS pings ingested from volume using Auto Loader";
    
    CREATE FLOW gps_bronze_ingest_flow AS
    INSERT INTO gps_bronze BY NAME
    SELECT *
    FROM STREAM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/gps",
      format => "json",
      inferColumnTypes => "true"
    )
    

    Python

    from pyspark import pipelines as dp
    
    path = "/Volumes/<catalog>/<schema>/raw_data/gps"
    
    dp.create_streaming_table(
      name="gps_bronze",
      comment="Raw GPS pings ingested from volume using Auto Loader",
    )
    
    @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow")
    def gps_bronze_ingest_flow():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load(path)
        )
    
  4. Yürüt simgesine tıklayın. Bir güncelleştirmeyi çalıştırmak için dosyayı veya İşlem hattını çalıştır'ı çalıştırın.

Güncelleme tamamlandığında, pipeline grafiği gps_bronze tablosunu gösterir. Ardından koordinatları yerel geometri noktasına dönüştüren bir gümüş tablo ekleyin.

Adım 4: Geometri noktaları içeren bir gümüş veri akış tablosu ekleyin

Bronz tablodan okuyan ve ST_Point(longitude, latitude) kullanarak GEOMETRY sütunu ekleyen bir akış tablosu oluşturun.

  1. Varlık tarayıcısında Artı simgesine tıklayın.Ekle'yi ve ardından Dönüştürme'yi seçin.

  2. Adı raw_gps_silver olarak ayarla, SQL veya Python seç ve Oluştur'a tıkla.

  3. Aşağıdaki kodu yeni dosyaya yapıştırın.

    SQL

    CREATE OR REFRESH STREAMING TABLE raw_gps_silver
    COMMENT "GPS pings with native geometry point for spatial joins";
    
    CREATE FLOW raw_gps_silver_flow AS
    INSERT INTO raw_gps_silver BY NAME
    SELECT
      device_id,
      timestamp,
      longitude,
      latitude,
      ST_Point(longitude, latitude) AS point_geom
    FROM STREAM(gps_bronze)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    dp.create_streaming_table(
      name="raw_gps_silver",
      comment="GPS pings with native geometry point for spatial joins",
    )
    
    @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow")
    def raw_gps_silver_flow():
        return (
            spark.readStream.table("gps_bronze")
            .select(
                "device_id",
                "timestamp",
                "longitude",
                "latitude",
                F.expr("ST_Point(longitude, latitude)").alias("point_geom"),
            )
        )
    
  4. Yürüt simgesine tıklayın. Dosyayı çalıştırın veya işlem hattını çalıştırın.

İşlem hattı grafiği artık gps_bronze ve raw_gps_silver değerlerini gösteriyor. Ardından, ambar coğrafi bölgelerini gerçekleştirilmiş görünüm olarak ekleyin.

5. Adım: Ambar geofences altın tablosunu oluşturma

Birimden coğrafi alanları okuyan ve WKT sütununu ST_GeomFromWKT kullanarak GEOMETRY sütununa dönüştüren bir maddi görünüm oluşturun.

  1. Varlık tarayıcısında Artı simgesine tıklayın.Ekle'yi ve ardından Dönüştürme'yi seçin.

  2. Adwarehouse_geofences_gold olarak ayarlayın, SQL veya Python'ı seçin ve Oluştur'a tıklayın.

  3. Aşağıdaki kodu yapıştırın. Varsayılan kataloğunuz ve şemanızla <catalog> ve <schema> öğelerini değiştirin.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS
    SELECT
      warehouse_name,
      ST_GeomFromWKT(boundary_wkt) AS boundary_geom
    FROM read_files(
      "/Volumes/<catalog>/<schema>/raw_data/geofences",
      format => "json"
    )
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    path = "/Volumes/<catalog>/<schema>/raw_data/geofences"
    
    @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry")
    def warehouse_geofences_gold():
        return (
            spark.read.format("json").load(path).select(
                "warehouse_name",
                F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"),
            )
        )
    
  4. Yürüt simgesine tıklayın. Dosyayı çalıştırın veya işlem hattını çalıştırın.

İşlem hattı artık geofences tablosunu içerir. Ardından uzamsal birleştirmeyi depo varışlarını hesaplamaya ekleyin.

6. Adım: Uzamsal birleşimle ambar varışları tablosunu oluşturma

Bir cihazın bir ambar çokgeninin içinde ne zaman olduğunu belirlemek için ST_Contains(boundary_geom, point_geom) kullanarak gümüş GPS noktalarını coğrafi bölgelere birleştiren gerçekleştirilmiş bir görünüm ekleyin.

  1. Varlık tarayıcısında Artı simgesine tıklayın.Ekle'yi ve ardından Dönüştürme'yi seçin.

  2. Adı warehouse_arrivals olarak ayarla, SQL veya Python seç ve Oluştur'a tıkla.

  3. Aşağıdaki kodu yapıştırın.

    SQL

    CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS
    SELECT
      g.device_id,
      g.timestamp,
      w.warehouse_name
    FROM raw_gps_silver g
    JOIN warehouse_geofences_gold w
      ON ST_Contains(w.boundary_geom, g.point_geom)
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    
    @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence")
    def warehouse_arrivals():
        g = spark.read.table("raw_gps_silver")
        w = spark.read.table("warehouse_geofences_gold")
        return (
            g.alias("g")
            .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)"))
            .select(
                F.col("g.device_id").alias("device_id"),
                F.col("g.timestamp").alias("timestamp"),
                F.col("w.warehouse_name").alias("warehouse_name"),
            )
        )
    
  4. Yürüt simgesine tıklayın. Dosyayı çalıştırın veya işlem hattını çalıştırın.

Güncelleştirme tamamlandığında işlem hattı grafiği dört veri kümesinin tümünü gösterir: gps_bronze, raw_gps_silver, warehouse_geofences_goldve warehouse_arrivals.

Uzamsal birleştirmeyi doğrulama

Uzamsal birleştirme sonucu üretilen satırları onaylayın: bir coğrafi bölge içinde bulunan gümüş tablodaki noktalar, warehouse_arrivals içinde görünmelidir. Not defterinde veya SQL düzenleyicisinde aşağıdakilerden birini çalıştırın (işlem hattı hedefinizle aynı kataloğu ve şemayı kullanın).

Ambara göre gelenleri say (SQL):

SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;

Warehouse_A ve Warehouse_B için sıfır olmayan sayıları görmelisiniz (örnek GPS verileri her iki çokgenle de çakışıyor). Örnek satırları incelemek için:

SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;

Python'da (not defteri) aynı denetimler:

# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))

# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))

warehouse_arrivals içinde satırlar görürseniz, ST_Contains(boundary_geom, point_geom) birleştirme düzgün çalışıyor.

7. Adım: İşlem hattını zamanlama (isteğe bağlı)

Yeni GPS verileri birime eklendikçe işlem hattını güncel tutmak için işlem hattını bir zamanlamaya göre çalıştırmak için bir iş oluşturun.

  1. Düzenleyicinin üst kısmında Zamanla düğmesini seçin.
  2. Zamanlamalar iletişim kutusu görüntülenirse Zamanlama ekle'yi seçin.
  3. İsteğe bağlı olarak, işe bir ad verin.
  4. Varsayılan olarak, zamanlama günde bir kez çalışır. Bunu kabul edebilir veya kendiniz ayarlayabilirsiniz. Gelişmiş'i seçmek belirli bir zaman ayarlamanıza olanak tanır; Diğer seçenekler çalıştırma bildirimleri eklemenize olanak tanır.
  5. Zamanlamayı uygulamak için Oluştur'u seçin.

İş yürütmeleri hakkında daha fazla bilgi için bkz. Lakeflow İşleri için izleme ve gözlemleme.

Ek kaynaklar