Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide, Lakeflow Spark Bildirimli İşlem Hatları ve Otomatik Yükleyici kullanarak veri düzenleme için ETL (ayıklama, dönüştürme ve yükleme) işlem hattı oluşturma ve dağıtma işlemleri açıklanmaktadır. ETL işlem hattı, kaynak sistemlerden verileri okuma, veri kalitesi denetimleri ve yinelenenleri kaldırma gibi gereksinimlere göre bu verileri dönüştürme ve verileri veri ambarı veya veri gölü gibi bir hedef sisteme yazma adımlarını uygular.
Bu öğreticide işlem hatlarını ve Otomatik Yükleyici'yi kullanarak şunları yapacaksınız:
- Ham kaynak verilerini hedef tabloya alma.
- Ham kaynak verileri dönüştürün ve dönüştürülen verileri iki hedef materialize edilmiş görünüme yazın.
- Dönüştürülen verileri sorgulama.
- Databricks işiyle ETL işlem hattını otomatikleştirin.
İşlem hatları ve Otomatik Yükleyici hakkında daha fazla bilgi için bkz. Lakeflow Spark Bildirimli İşlem Hatları ve Otomatik Yükleyici nedir?
Gereksinimler
Bu öğreticiyi tamamlamak için aşağıdaki gereksinimleri karşılamanız gerekir:
- Azure Databricks çalışma alanında oturum açabilirsiniz.
- Çalışma alanınız için Unity Kataloğu'nu etkinleştirin.
- Hesabınız için sunucusuz işlemin etkinleştirilmesini sağlayın. Sunucusuz Lakeflow Spark Bildirimli İşlem Hatları tüm çalışma alanı bölgelerinde kullanılamaz. Bkz. Kullanılabilir bölgeler için sınırlı bölgesel kullanılabilirliğe sahip özellikler .
- İşlem kaynağı oluşturma veya işlem kaynağınaerişim iznine sahip olun.
-
Katalogda yeni şema oluşturma izinlerine sahip olun. Gerekli izinler
USE CATALOGveCREATE SCHEMA'dir. -
Mevcut şemada yeni birim oluşturma izinlerine sahip olun. Gerekli izinler
USE SCHEMAveCREATE VOLUME'dir.
Veri kümesi hakkında
Bu örnekte kullanılan veri kümesi, çağdaş müzik parçaları için bir özellik ve meta veri koleksiyonu olan Milyon Şarkı Veri Kümesinin bir alt kümesidir. Bu veri kümesi, Azure Databricks çalışma alanınıza dahil edilen örnek veri kümelerinde kullanılabilir.
1. Adım: İşlem hattı oluşturma
İlk olarak, işlem hattı söz dizimini kullanarak dosyalardaki veri kümelerini ( kaynak kodu olarak adlandırılır) tanımlayarak bir işlem hattı oluşturun. Her kaynak kod dosyası yalnızca bir dil içerebilir, ancak işlem hattına dile özgü birden çok dosya ekleyebilirsiniz. Daha fazla bilgi edinmek için bkz. Lakeflow Spark Bildirimli İşlem Hatları
Bu öğreticide sunucusuz işlem ve Unity Kataloğu kullanılır. Belirtilmeyen tüm yapılandırma seçenekleri için varsayılan ayarları kullanın. Sunucusuz işlem çalışma alanınızda etkinleştirilmediyse veya desteklenmiyorsa, öğreticiyi varsayılan işlem ayarlarını kullanarak yazıldı olarak tamamlayabilirsiniz.
Yeni işlem hattı oluşturmak için şu adımları izleyin:
- Çalışma alanınızda
Kenar çubuğunda yeni, ardından ETL İşlem Hattı'nı seçin.
- İşlem hattınıza benzersiz bir ad verin.
- Adın hemen altında, oluşturduğunuz veriler için varsayılan kataloğu ve şemayı seçin. Dönüştürmelerinizdeki diğer hedefleri belirtebilirsiniz, ancak bu öğreticide bu varsayılanlar kullanılır. Oluşturduğunuz katalog ve şema izinlerine sahip olmanız gerekir. Bkz . Gereksinimler.
- Bu öğretici için Boş bir dosyayla başla'yı seçin.
- Klasör yolu'nda, kaynak dosyalarınız için bir konum belirtin veya varsayılanı (kullanıcı klasörünüz) kabul edin.
- İlk kaynak dosyanızın dili olarak Python veya SQL'i seçin (işlem hattı dilleri karıştırabilir ve eşleştirebilir, ancak her dosyanın tek bir dilde olması gerekir).
- Seç'e tıklayın.
yeni işlem hattı için işlem hattı düzenleyicisi görüntülenir. Diliniz için boş bir kaynak dosyası oluşturulur ve ilk dönüştürmeniz için hazır olur.
2. Adım: İşlem hattı mantığınızı geliştirme
Bu adımda, işlem hattı için kaynak kodunu etkileşimli olarak geliştirmek ve doğrulamak için Lakeflow Pipelines Düzenleyicisi'ni kullanacaksınız.
Kod, artımlı veri alımı için Otomatik Yükleyici'yi kullanır. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler. Daha fazla bilgi edinmek için bkz. Otomatik Yükleyici nedir?
İşlem hattı için otomatik olarak boş bir kaynak kod dosyası oluşturulur ve yapılandırılır. Dosya, işlem hattınızın dönüşümler klasöründe oluşturulur. Varsayılan olarak, dönüştürmeler klasöründeki tüm *.py ve *.sql dosyaları işlem hattınızın kaynağının bir parçasıdır.
Aşağıdaki kodu kopyalayıp kaynak dosyanıza yapıştırın. 1. Adımda dosya için seçtiğiniz dili kullandığınızdan emin olun.
Piton
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;Bu kaynak üç sorgu için kod içerir. Ayrıca bu sorguları ayrı dosyalara koyarak dosyaları istediğiniz gibi düzenleyebilir ve kodlayabilirsiniz.
Bağlı işlem hattı için bir güncelleştirme başlatmak için dosyayı çalıştırın veya işlem hattını çalıştırın. İşlem hattınızda yalnızca bir kaynak dosyası varsa bunlar işlevsel olarak eşdeğerdir.
Güncelleştirme tamamlandığında, editör işlem hattınızla ilgili bilgilerle güncellenir.
- İşlem hattı grafiği (DAG), kodunuzun sağ tarafındaki kenar çubuğunda üç tablo gösterir:
songs_raw,songs_preparedvetop_artists_by_year. - İşlem hattı varlıkları tarayıcısının üst kısmında güncelleştirmenin özeti gösterilir.
- Oluşturulan tabloların ayrıntıları alt bölmede gösterilir ve tablolardan birini seçerek verilere göz atabilirsiniz.
Buna ham ve temizlenmiş verilerin yanı sıra yıla göre en iyi sanatçıları bulmak için bazı basit analizler dahildir. Sonraki adımda, işlem hattınızdaki ayrı bir dosyada daha fazla analiz için geçici sorgular oluşturacaksınız.
3. Adım: İşlem hattınız tarafından oluşturulan veri kümelerini keşfetme
Bu adımda, Databricks SQL Düzenleyicisi'nde şarkı verilerini analiz etmek için ETL işlem hattında işlenen veriler üzerinde geçici sorgular gerçekleştirirsiniz. Bu sorgular, önceki adımda oluşturulan hazırlanmış kayıtları kullanır.
İlk olarak, 1990'dan bu yana her yıl en çok şarkı yayınlayan sanatçıları bulan bir sorgu çalıştırın.
İşlem hattı varlıkları tarayıcı kenar çubuğunda
Ardından Keşif'i ekleyin.
Bir Ad girin ve araştırma dosyası için SQL'i seçin. Yeni
explorationsbir klasörde bir SQL not defteri oluşturulur. Klasördekiexplorationsdosyalar varsayılan olarak bir işlem hattı güncelleştirmesinin parçası olarak çalıştırılmaz. SQL not defteri, birlikte veya ayrı ayrı çalıştırabileceğiniz hücrelere sahiptir.1990 yılından sonra her yıl en çok şarkı yayınlayan sanatçılardan oluşan bir tablo oluşturmak için, yeni SQL dosyasına aşağıdaki kodu girin (dosyada örnek kod varsa, değiştirin). Bu not defteri işlem hattının bir parçası olmadığından, varsayılan kataloğu ve şemayı kullanmaz. öğesini
<catalog>.<schema>, işlem hattı için varsayılan olarak kullandığınız katalog ve şemayla değiştirin:-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;tıklayın veya bu sorguyu çalıştırmak için basın
Shift + Enter.
Şimdi 4/4 tempolu ve dans edilebilir tempolu şarkıları bulan başka bir sorgu çalıştırın.
Aşağıdaki kodu aynı dosyadaki bir sonraki hücreye ekleyin. Tekrar,
<catalog>.<schema>öğesini, işlem hattı için varsayılan olarak kullandığınız katalog ve şemayla değiştirin.-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;tıklayın veya bu sorguyu çalıştırmak için basın
Shift + Enter.
4. Adım: İşlem hattını çalıştırmak için iş oluşturma
Ardından, bir zamanlamaya göre çalışan bir Databricks işi kullanarak veri alımı, işleme ve analiz adımlarını otomatikleştirmek için bir iş akışı oluşturun.
- Düzenleyicinin üst kısmında Zamanla düğmesini seçin.
- Zamanlamalar iletişim kutusu görüntülenirse Zamanlama ekle'yi seçin.
- Bu, Yeni zamanlama iletişim kutusunu açar, burada işlem hattınızı belirli bir zaman çizelgesine göre çalıştırmak için bir iş oluşturabilirsiniz.
- İsteğe bağlı olarak, işe bir ad verin.
- Varsayılan olarak, zamanlama günde bir kez çalışacak şekilde ayarlanır. Varsayılanı kabul edebilir veya kendi programınızı ayarlayabilirsiniz. Gelişmiş'i seçtiğinizde işin çalıştırılacağı belirli bir zamanı ayarlayabilirsiniz. Diğer seçenekler'i seçtiğinizde iş çalıştırıldığında bildirim oluşturabilirsiniz.
- Değişiklikleri uygulamak ve işi oluşturmak için Oluştur'u seçin.
Artık iş, işlem hattınızı güncel tutmak için günlük olarak çalışır. Zamanlama listesini görüntülemek için Yeniden Zamanla'yı seçebilirsiniz. İşlem hattınız için takvimleri ekleme, düzenleme veya kaldırma gibi zamanlamaları bu iletişim kutusundan yönetebilirsiniz.
Zamanlamanın (veya işin) adına tıklanması, sizi İşler ve işlem hatları listesindeki işin sayfasına götürür. Buradan çalıştırmaların geçmişi de dahil olmak üzere iş çalıştırmalarıyla ilgili ayrıntıları görüntüleyebilir veya şimdi çalıştır düğmesiyle işi hemen çalıştırabilirsiniz.
İş yürütmeleri hakkında daha fazla bilgi için bkz. Lakeflow İşleri için izleme ve gözlemleme.
Daha fazla bilgi edinin
- Veri işleme işlem hatları hakkında daha fazla bilgi edinmek için bkz. Lakeflow Spark Bildirimli İşlem Hatları
- Databricks Not Defterleri hakkında daha fazla bilgi edinmek için bkz. Databricks not defterleri.
- Lakeflow İşleri hakkında daha fazla bilgi edinmek için bkz. İşler nelerdir?
- Delta Lake hakkında daha fazla bilgi edinmek için bkz. Azure Databricks'te Delta Lake nedir?