Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule wyjaśniono, jak używać potoków deklaratywnych lakeflow do przetwarzania komunikatów z usługi Azure Event Hubs. Nie można używać łącznika Structured Streaming Event Hubs, ponieważ ta biblioteka nie jest dostępna jako część Databricks Runtime, a Potoki Deklaratywne Lakeflow nie pozwalają na użycie bibliotek JVM innych firm.
Jak potoki deklaratywne Lakeflow mogą łączyć się z usługą Azure Event Hubs?
Usługa Azure Event Hubs zapewnia punkt końcowy zgodny z Apache Kafka, którego można używać z łącznikiem Kafka do Strukturalnego Przesyłania Strumieniowego , dostępnym w środowisku Databricks Runtime, do przetwarzania komunikatów z usługi Azure Event Hubs. Aby uzyskać więcej informacji na temat zgodności usług Azure Event Hubs i Apache Kafka, zobacz Use Azure Event Hubs from Apache Kafka applications(Korzystanie z usługi Azure Event Hubs z poziomu aplikacji platformy Apache Kafka).
W poniższych krokach opisano łączenie potoków deklaratywnych Lakeflow z istniejącym wystąpieniem usługi Event Hubs i korzystanie ze zdarzeń z tematu. Do wykonania tych kroków potrzebne są następujące wartości połączenia usługi Event Hubs:
- Nazwa przestrzeni nazw usługi Event Hubs.
- Nazwa wystąpienia centrum zdarzeń w przestrzeni nazw usługi Event Hubs.
- Nazwa zasad dostępu współdzielonego i klucz zasad dla usługi Event Hubs. Domyślnie dla każdej przestrzeni nazw usługi Event Hubs jest tworzona polityka
RootManageSharedAccessKey
. Ta polityka ma ustawienia dostępumanage
,send
ilisten
. Jeśli potok odczytuje tylko z usługi Event Hubs, usługa Databricks zaleca utworzenie nowej polityki z uprawnieniami do nasłuchiwania.
Aby uzyskać więcej informacji na temat parametrów połączenia usługi Event Hubs, zobacz Pobieranie parametrów połączenia usługi Event Hubs.
Notatka
- Usługa Azure Event Hubs udostępnia zarówno opcje protokołu OAuth 2.0, jak i sygnatury dostępu współdzielonego (SAS), aby autoryzować dostęp do bezpiecznych zasobów. Te instrukcje korzystają z uwierzytelniania opartego na SAS.
- Jeśli otrzymasz parametry połączenia usługi Event Hubs z witryny Azure Portal, może nie zawierać wartości
EntityPath
. WartośćEntityPath
jest wymagana tylko w przypadku korzystania z łącznika Event Hubs Structured Streaming. Użycie łącznika Kafka Structured Streaming wymaga podania tylko nazwy tematu.
Przechowaj klucz zasad w sekrecie usługi Azure Databricks
Ponieważ klucz polityki to poufne informacje, firma Databricks zaleca, aby nie zapisywać jego wartości bezpośrednio w kodzie potoku. Zamiast tego użyj wpisów tajnych usługi Azure Databricks, aby przechowywać klucz i zarządzać dostępem do niego.
W poniższym przykładzie użyto Databricks CLI do utworzenia zakresu tajemnicy i zapisania klucza w tym zakresie tajemnicy. W kodzie potoku użyj funkcji dbutils.secrets.get()
razem z scope-name
i shared-policy-name
, aby pobrać kluczową wartość.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Aby uzyskać więcej informacji na temat tajemnic w Azure Databricks, odwiedź zarządzanie tajemnicami.
Utwórz notes i dodaj kod potoku do obsługi zdarzeń
Poniższy przykład odczytuje zdarzenia IoT z tematu, ale możesz dostosować przykład wymagań aplikacji. Zalecana praktyka sugerowana przez Databricks to użycie ustawień potoków deklaratywnych Lakeflow w celu skonfigurowania zmiennych aplikacji. Kod potokowy następnie używa funkcji spark.conf.get()
do pobierania wartości. Aby uzyskać więcej informacji na temat używania ustawień potoku do sparametryzowania potoku, zobacz Używanie parametrów z potokami deklaratywnymi Lakeflow.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Utwórz potok
Utwórz nowy potok przy użyciu następujących ustawień, zastępując wartości symboli zastępczych odpowiednimi wartościami dla danego środowiska.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Zamień
-
<container-name>
z nazwą kontenera konta usługi Azure Storage. -
<storage-account-name>
z nazwą konta magazynu usługi ADLS (Azure Data Lake Storage). -
<eh-namespace>
z nazwą przestrzeni nazw usługi Event Hubs. -
<eh-policy-name>
z tajnym kluczem zakresu dla klucza polityki Event Hubs. -
<eventhub>
z nazwą wystąpienia usługi Event Hubs. -
<secret-scope-name>
z nazwą tajnego zakresu usługi Azure Databricks, który zawiera klucz polityki Event Hubs.
Zgodnie z najlepszymi praktykami, ten potok nie używa domyślnej ścieżki składowania systemu plików DBFS, lecz korzysta z konta składowania Azure Data Lake Storage (ADLS). Aby uzyskać więcej informacji na temat konfigurowania uwierzytelniania dla konta magazynu ADLS, zobacz Bezpieczny dostęp do poświadczeń magazynu z użyciem tajnych danych w potoku.