Aracılığıyla paylaş


Öğretici: Spark SQL ile COPY INTO

Databricks, binlerce dosya içeren veri kaynakları için artımlı ve toplu veri yükleme için COPY INTO komutunu kullanmanızı önerir. Databricks, gelişmiş kullanım örnekleri için Otomatik Yükleyici'yi kullanmanızı önerir.

Bu öğreticide COPY INTO , bulut nesne depolamasından Azure Databricks çalışma alanınızdaki bir tabloya veri yüklemek için komutunu kullanacaksınız.

Gereksinimler

  1. Bir Azure aboneliği, bu abonelikteki bir Azure Databricks çalışma alanı ve bu çalışma alanında bir küme. Bunları oluşturmak için bkz . Hızlı Başlangıç: Azure portalını kullanarak Azure Databricks Çalışma Alanında Spark işi çalıştırma. Bu hızlı başlangıcı izlerseniz Spark SQL işi çalıştırma bölümündeki yönergeleri izlemeniz gerekmez.
  2. Çalışma alanınızda Databricks Runtime 11.3 LTS veya üzerini çalıştıran çok amaçlı bir küme . Çok amaçlı bir küme oluşturmak için bkz . İşlem yapılandırma başvurusu.
  3. Azure Databricks çalışma alanı kullanıcı arabirimi hakkında bilgi. Bkz . Çalışma alanında gezinme.
  4. Databricks not defterleriyle çalışma hakkında bilgi.
  5. Veri yazabileceğiniz bir konum; Bu tanıtımda örnek olarak DBFS kökü kullanılır, ancak Databricks Unity Kataloğu ile yapılandırılmış bir dış depolama konumu önerir.

1. Adım. Ortamınızı yapılandırma ve veri oluşturucu oluşturma

Bu öğreticide, Azure Databricks hakkında temel bilgiler ve varsayılan çalışma alanı yapılandırması varsayılır. Sağlanan kodu çalıştıramıyorsanız, işlem kaynaklarına ve veri yazabileceğiniz bir konuma erişiminiz olduğundan emin olmak için çalışma alanı yöneticinize başvurun.

Sağlanan kodun, veri kaynağınız olarak COPY INTO yapılandıracağınız konumu belirtmek için bir source parametre kullandığını unutmayın. Bu kod, yazıldıkçe DBFS kökündeki bir konumu gösterir. Dış nesne depolama konumu üzerinde yazma izinleriniz varsa, kaynak dizenin dbfs:/ bölümünü nesne depolamanızın yoluyla değiştirin. Bu kod bloğu da bu tanıtımı sıfırlamak için özyinelemeli silme işlemi gerçekleştirdiğinden, üretim verilerine işaret etmediğinizden ve mevcut verilerin üzerine yazılmasını veya silinmesini önlemek için iç içe geçmiş dizini tuttuğunuzdan /user/{username}/copy-into-demo emin olun.

  1. Yeni bir SQL not defteri oluşturun ve Databricks Runtime 11.3 LTS veya üzerini çalıştıran bir kümeye ekleyin.

  2. Bu öğreticide kullanılan depolama konumunu ve veritabanını sıfırlamak için aşağıdaki kodu kopyalayıp çalıştırın:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Verileri rastgele oluşturmak için kullanılacak bazı tabloları ve işlevleri yapılandırmak için aşağıdaki kodu kopyalayın ve çalıştırın:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

2. Adım: Örnek verileri bulut depolama alanına yazma

Azure Databricks'te Delta Lake dışındaki veri biçimlerine yazma nadirdir. Burada sağlanan kod JSON'a yazar ve başka bir sistemden alınan sonuçları nesne depolama alanına atabilecek bir dış sistemin benzetimini oluşturur.

  1. Bir toplu ham JSON verisi yazmak için aşağıdaki kodu kopyalayın ve çalıştırın:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

3. Adım: JSON verilerini aynı anda yüklemek için COPY INTO kullanma

kullanabilmeniz COPY INTOiçin önce hedef Delta Lake tablosu oluşturmanız gerekir. Databricks Runtime 11.3 LTS ve üzerinde, deyiminizde CREATE TABLE tablo adı dışında bir şey sağlamanız gerekmez. Databricks Runtime'ın önceki sürümleri için boş bir tablo oluştururken bir şema sağlamanız gerekir.

  1. Hedef Delta tablonuzu oluşturmak ve kaynağınızdan veri yüklemek için aşağıdaki kodu kopyalayıp çalıştırın:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Bu eylem bir kez etkili olduğundan, bunu birden çok kez çalıştırabilirsiniz, ancak veriler yalnızca bir kez yüklenir.

4. Adım: Tablonuzun içeriğini önizleme

Bu tablonun içeriğini el ile gözden geçirmek için basit bir SQL sorgusu çalıştırabilirsiniz.

  1. Tablonuzun önizlemesini görüntülemek için aşağıdaki kodu kopyalayın ve yürütin:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

5. Adım: Daha fazla veri yükleme ve sonuçları önizleme

Rastgele ham JSON verilerinin yeni toplu işlemlerini kaynağınıza getirmek, ile COPY INTOdelta Lake'e yüklemek ve sonuçları önizlemek için 2-4 arası adımları birçok kez yeniden çalıştırabilirsiniz. Yeni veriler gelmeden birden çok kez yazılan veya yürütülen ham verilerin birden çok toplu işleminin benzetimini yapmak için bu adımları sıra dışı veya birden çok kez çalıştırmayı COPY INTO deneyin.

6. Adım: Öğreticiyi temizleme

Bu öğreticiyi tamamladığınızda, ilişkili kaynakları artık saklamak istemiyorsanız temizleyebilirsiniz.

  1. Veritabanını, tabloları bırakmak ve tüm verileri kaldırmak için aşağıdaki kodu kopyalayıp çalıştırın:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. İşlem kaynağınızı durdurmak için Kümeler sekmesine gidin ve Kümenizi sonlandırın.

Ek kaynaklar