Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfada, Lakeflow Connect kullanarak Workday raporlarını alma ve Azure Databricks'e yükleme işlemleri açıklanır.
Başlamadan önce
Alma işlem hattı oluşturmak için aşağıdaki gereksinimleri karşılamanız gerekir:
Unity Kataloğu için çalışma alanınızın etkinleştirilmesi gerekir.
Sunucusuz işlem çalışma alanınız için etkinleştirilmelidir. Bkz . Sunucusuz işlem etkinleştirme.
Yeni bir bağlantı oluşturmayı planlıyorsanız: Meta veri deposunda ayrıcalıklarınız olmalıdır
CREATE CONNECTION
.Bağlayıcınız ui tabanlı işlem hattı yazmayı destekliyorsa, bu sayfadaki adımları tamamlayarak bağlantıyı ve işlem hattını aynı anda oluşturabilirsiniz. Ancak API tabanlı işlem hattı yazma özelliğini kullanıyorsanız, bu sayfadaki adımları tamamlamadan önce Bağlantıyı Katalog Gezgini'nde oluşturmanız gerekir. Bkz Yönetilen alım kaynaklarına bağlanın.
Mevcut bir bağlantıyı kullanmayı planlıyorsanız: Ayrıcalıklarınız olmalıdır
USE CONNECTION
veyaALL PRIVILEGES
bağlantı nesnesinde.Hedef katalog için
USE CATALOG
ayrıcalıklarına sahip olmalısınız.Mevcut bir şemada
USE SCHEMA
veCREATE TABLE
ayrıcalıklarına veya hedef katalogdaCREATE SCHEMA
ayrıcalıklarına sahip olmanız gerekir.
Workday'den veri almak için kaynak kurulumunu tamamlamanız gerekir.
Ağı yapılandırma
Eğer sunucusuz çıkış denetimi etkinse, rapor URL'lerinizin sunucu adlarını izin verilenler listesine ekleyin. Örneğin, rapor URL'si https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json
ana bilgisayar adına https://ww1.workday.com
sahiptir. Bkz. Sunucusuz çıkış denetimi için ağ ilkelerini yönetme.
1. Seçenek: Azure Databricks Kullanıcı Arabirimi
Azure Databricks çalışma alanının kenar çubuğunda Veri Alımı'na tıklayın.
Veri ekle sayfasında, Databricks bağlayıcıları'nın altında Workday Raporları'na tıklayın.
Alma sihirbazı açılır.
Sihirbazın Alma işlem hattı sayfasında, hat için benzersiz bir ad girin.
Olay günlüğü konumu için işlem hattı olay günlüğünü depolamak için bir katalog ve şema seçin.
Kaynak verilere erişmek için gereken kimlik bilgilerini depolayan Unity Kataloğu bağlantısını seçin.
Kaynakla var olan bir bağlantı yoksa, Bağlantı oluştur'a tıklayın ve kaynak kurulumdan aldığınız kimlik doğrulama ayrıntılarını girin. Meta veri deposunda
CREATE CONNECTION
ayrıcalıklarınız olmalıdır.İşlem hattı oluştur'a tıklayın ve devam edin.
Rapor sayfasında Raporekle'ye tıklayın ve rapor URL'sini girin. Almak istediğiniz her rapor için yineleyin ve ardından İleri'ye tıklayın.
Hedef sayfasında, veri yazmak için Unity Catalog kataloğunu ve şemasını seçin.
Mevcut bir şemayı kullanmak istemiyorsanız Şema oluştur'a tıklayın. Üst katalogda
USE CATALOG
veCREATE SCHEMA
yetkilerine sahip olmalısınız.İşlem hattını kaydet'e tıklayın ve devam edin.
(İsteğe bağlı) Ayarlar sayfasında Zamanlama oluştur'a tıklayın. Hedef tabloları yenileme sıklığını ayarlayın.
(İsteğe bağlı) İşlem hattı işleminin başarılı veya başarısız olması için e-posta bildirimlerini ayarlayın.
İşlem hattını kaydet ve çalıştır'a tıklayın.
Seçenek 2: Databricks Varlık Paketleri
Bu bölümde Databricks Varlık Paketleri kullanılarak bir veri işleme hattının nasıl dağıtılacağı açıklanmaktadır. Paketler, işlerin ve görevlerin YAML tanımlarını içerebilir, Databricks CLI kullanılarak yönetilir ve farklı hedef çalışma alanlarında (geliştirme, hazırlama ve üretim gibi) paylaşılabilir ve çalıştırılabilir. Daha fazla bilgi için bkz. Databricks Varlık Paketleri.
Hat tanımınızda belirli sütunları alınacak veya alınmayacak şekilde seçmek için aşağıdaki tablo yapılandırma özelliklerini kullanabilirsiniz.
-
include_columns
: İsteğe bağlı olarak, ithalat için dahil edilecek sütunların bir listesini belirtin. Bu seçeneği açıkça sütunları dahil etmek için kullanırsanız, boru hattı gelecekte kaynağa eklenen sütunları otomatik olarak hariç tutar. Gelecekteki sütunları alınabilmek için onları listeye eklemeniz gerekecek. -
exclude_columns
: İsteğe bağlı olarak, belirli sütunları veri alımından hariç tutmak için bir liste belirleyin. Bu seçeneği sütunları açıkça hariç tutmak için kullanırsanız, gelecekte kaynağa eklenen sütunları ardışık düzen otomatik olarak dahil eder. Gelecekteki sütunları alınabilmek için onları listeye eklemeniz gerekecek.
Ayrıca, filtrelenmiş raporları almanızı sağlayan rapor URL'sinde ()source_url
istemleri belirtebilirsiniz.
Workday'e bir Unity Kataloğu bağlantısı olduğunu onaylayın. Bağlantı oluşturma adımları için bkz. Yönetilen alım kaynaklarına bağlanma.
Databricks CLI kullanarak yeni bir paket oluşturun:
databricks bundle init
Pakete iki yeni kaynak dosyası ekleyin:
- İşlem hattı tanım dosyası (
resources/workday_pipeline.yml
). - Veri alımı sıklığını (
resources/workday_job.yml
) denetleen bir iş akışı dosyası.
Aşağıda örnek
resources/workday_pipeline.yml
bir dosya verilmiştir:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for workday_dab resources: pipelines: pipeline_workday: name: workday_pipeline catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <workday-connection> objects: # An array of objects to ingest from Workday. This example # ingests a sample report about all active employees. The Employee_ID key is used as # the primary key for the report. - report: source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} destination_table: All_Active_Employees_Data table_configuration: primary_keys: - Employee_ID include_columns: # This can be exclude_columns instead - <column_a> - <column_b> - <column_c>
Aşağıda örnek
resources/workday_job.yml
bir dosya verilmiştir:resources: jobs: workday_dab_job: name: workday_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_workday.id}
- İşlem hattı tanım dosyası (
Databricks CLI kullanarak işlem hattını dağıtın:
databricks bundle deploy
Seçenek 3: Azure Databricks defter
Hat tanımınızda belirli sütunları alınacak veya alınmayacak şekilde seçmek için aşağıdaki tablo yapılandırma özelliklerini kullanabilirsiniz.
-
include_columns
: İsteğe bağlı olarak, ithalat için dahil edilecek sütunların bir listesini belirtin. Bu seçeneği açıkça sütunları dahil etmek için kullanırsanız, boru hattı gelecekte kaynağa eklenen sütunları otomatik olarak hariç tutar. Gelecekteki sütunları alınabilmek için onları listeye eklemeniz gerekecek. -
exclude_columns
: İsteğe bağlı olarak, belirli sütunları veri alımından hariç tutmak için bir liste belirleyin. Bu seçeneği sütunları açıkça hariç tutmak için kullanırsanız, gelecekte kaynağa eklenen sütunları ardışık düzen otomatik olarak dahil eder. Gelecekteki sütunları alınabilmek için onları listeye eklemeniz gerekecek.
Ayrıca, filtrelenmiş raporları almanızı sağlayan rapor URL'sinde ()source_url
istemleri belirtebilirsiniz.
Workday'e bir Unity Kataloğu bağlantısı olduğunu onaylayın. Bağlantı oluşturma adımları için bkz. Yönetilen alım kaynaklarına bağlanma.
Kişisel erişim belirteci oluşturun.
Değeri değiştirerek aşağıdaki kodu bir Python not defteri hücresine yapıştırın
<personal-access-token>
:# SHOULD MODIFY # This step sets up a PAT to make API calls to the Databricks service. api_token = "<personal-access-token>"
Aşağıdaki kodu ikinci bir not defteri hücresine yapıştırın:
# DO NOT MODIFY # This step sets up a connection to make API calls to the Databricks service. import requests import json notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() workspace_url = notebook_context.apiUrl().get() api_url = f"{workspace_url}/api/2.0/pipelines" headers = { 'Authorization': 'Bearer {}'.format(api_token), 'Content-Type': 'application/json' } def check_response(response): if response.status_code == 200: print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False))) else: print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}") # DO NOT MODIFY # These are API definition to be used. def create_pipeline(pipeline_definition: str): response = requests.post(url=api_url, headers=headers, data=pipeline_definition) check_response(response) def edit_pipeline(id: str, pipeline_definition: str): response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition) check_response(response) def delete_pipeline(id: str): response = requests.delete(url=f"{api_url}/{id}", headers=headers) check_response(response) def get_pipeline(id: str): response = requests.get(url=f"{api_url}/{id}", headers=headers) check_response(response) def list_pipeline(filter: str = ""): body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}""" response = requests.get(url=api_url, headers=headers, data=body) check_response(response)
İşlem hattı belirtimlerinizi yansıtacak şekilde değiştirerek aşağıdaki kodu üçüncü bir not defteri hücresine yapıştırın:
# SHOULD MODIFY # Update this notebook to configure your ingestion pipeline. pipeline_spec = """ { "name": "<YOUR_PIPELINE_NAME>", "ingestion_definition": { "connection_name": "<YOUR_CONNECTON_NAME>", "objects": [ { "report": { "source_url": "<YOUR_REPORT_URL>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_TABLE>", "table_configuration": { "primary_keys": ["<PRIMARY_KEY>"] } } }, { "report": { "source_url": "<YOUR_SECOND_REPORT_URL>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>", "table_configuration": { "primary_keys": ["<PRIMARY_KEY>"], "scd_type": "SCD_TYPE_2", "include_columns": ["<column_a>", "<column_b>", "<column_c>"] } } } ] } } """ create_pipeline(pipeline_spec)
Kişisel erişim belirtecinizle ilk defter hücresini çalıştırın.
İkinci not defteri hücresini çalıştırın.
İşlem hattı ayrıntılarınızla üçüncü defter hücresini çalıştırın. Bu çalışır
create_pipeline
.-
list_pipeline
işlem hattı kimliğini ve onunla ilgili ayrıntıları döndürür. -
edit_pipeline
işlem hattı tanımını düzenlemenize olanak tanır. -
delete_pipeline
işlem hattını siler.
-
Seçenek 4: Databricks CLI
Hat tanımınızda belirli sütunları alınacak veya alınmayacak şekilde seçmek için aşağıdaki tablo yapılandırma özelliklerini kullanabilirsiniz.
-
include_columns
: İsteğe bağlı olarak, ithalat için dahil edilecek sütunların bir listesini belirtin. Bu seçeneği açıkça sütunları dahil etmek için kullanırsanız, boru hattı gelecekte kaynağa eklenen sütunları otomatik olarak hariç tutar. Gelecekteki sütunları alınabilmek için onları listeye eklemeniz gerekecek. -
exclude_columns
: İsteğe bağlı olarak, belirli sütunları veri alımından hariç tutmak için bir liste belirleyin. Bu seçeneği sütunları açıkça hariç tutmak için kullanırsanız, gelecekte kaynağa eklenen sütunları ardışık düzen otomatik olarak dahil eder. Gelecekteki sütunları alınabilmek için onları listeye eklemeniz gerekecek.
Ayrıca, filtrelenmiş raporları almanızı sağlayan rapor URL'sinde ()source_url
istemleri belirtebilirsiniz.
- Workday'e bir Unity Kataloğu bağlantısı olduğunu onaylayın. Bağlantı oluşturma adımları için bkz. Yönetilen alım kaynaklarına bağlanma.
- İşlem hattını oluşturmak için aşağıdaki komutu çalıştırın:
databricks pipelines create --json "<pipeline-definition OR json-file-path>"
İşlem hattı tanımı şablonu
Aşağıda bir JSON işlem hattı tanımı şablonu yer alır:
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"report": {
"source_url": "<report-url>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"primary_keys": ["<primary-key>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
İşlem hattınızda uyarıları başlatın, programlayın ve ayarlayın.
İşlem hattı ayrıntıları sayfasında işlem hattı için bir zamanlama oluşturabilirsiniz.
İşlem hattı oluşturulduktan sonra Azure Databricks çalışma alanını yeniden ziyaret edin ve İşlem Hatları'na tıklayın.
Yeni boru hattı işlem hattı listesinde görünür.
İşlem hattı ayrıntılarını görüntülemek için işlem hattı adına tıklayın.
İşlem hattı ayrıntıları sayfasında Zamanla'ya tıklayarak işlem hattını zamanlayabilirsiniz.
İşlem hattında bildirim ayarlamak için ayarlar 'e tıklayın ve ardından bir bildirim ekleyin.
bir işlem hattına eklediğiniz her zamanlama için Lakeflow Connect otomatik olarak bunun için bir iş oluşturur. Veri alma hattı, işin içindeki bir görevdir. İsteğe bağlı olarak işe daha fazla görev ekleyebilirsiniz.
Örnek: İki Workday raporunu ayrı şemalara alma
Bu bölümdeki örnek işlem hattı tanımı, iki Workday raporunu ayrı şemalara alır. Çoklu hedefli boru hattı desteği yalnızca API içindir.
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>