Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Pomocí integrovaného konektoru se můžete přihlásit k odběru Google Pub/Sub. Tento konektor poskytuje sémantiku zpracování přesně jednou pro záznamy od odběratele.
Poznámka:
Pub/Sub může publikovat duplicitní záznamy nebo záznamy můžou přijít odběrateli mimo objednávku. Napište kód pro zpracování duplicitních a zastaralých záznamů.
Konfigurace streamu Pub/Sub
Následující příklad kódu ukazuje základní syntaxi pro konfiguraci strukturovaného čtení streamu z Pub/Sub.
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
Další možnosti konfigurace najdete v tématu Nastavení konfigurace pro Pub/Sub čtení streamu.
Konfigurace přístupu k pub/Sub
Přihlašovací údaje, které nakonfigurujete, musí mít následující role.
| Role | Požadované nebo volitelné | Jak se role používá |
|---|---|---|
roles/pubsub.viewer nebo roles/viewer |
Požaduje se | Zkontroluje, jestli existuje předplatné a získá předplatné. |
roles/pubsub.subscriber |
Požaduje se | Načte data z předplatného. |
roles/pubsub.editor nebo roles/editor |
Volitelné | Povolí vytvoření předplatného, pokud neexistuje, a umožní použití deleteSubscriptionOnStreamStop k odstranění předplatných při ukončení streamu. |
Databricks doporučuje používat tajné kódy při poskytování možností autorizace. K autorizaci připojení se vyžadují následující možnosti:
clientEmailclientIdprivateKeyprivateKeyId
Vysvětlení schématu Pub/Sub
Schéma datového proudu odpovídá záznamům načteným z pub/Sub, jak je popsáno v následující tabulce.
| Pole | Typ |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurace možností streamování čtení v Pub/Sub
Následující tabulka popisuje možnosti podporované pro Pub/Sub. Všechny možnosti jsou nakonfigurované jako součást čtení strukturovaného streamování pomocí syntaxe .option("<optionName>", "<optionValue>").
Poznámka:
Některé možnosti konfigurace Pub/Sub používají místo mikrodávek koncept načítání. Toto odráží podrobnosti interní implementace a možnosti fungují podobně jako analogy v jiných konektorech strukturovaného streamování, s tím rozdílem, že záznamy se nejprve načítají a poté zpracovávají.
| Možnost | Výchozí hodnota | Popis |
|---|---|---|
numFetchPartitions |
Nastavte na polovinu počtu výkonných procesů, které jsou přítomny při inicializaci datového proudu. | Počet paralelních úloh Spark, které získávají záznamy z odběru. |
deleteSubscriptionOnStreamStop |
false |
Pokud true, předplatné předané do datového proudu je odstraněno, jakmile úloha streamování skončí. |
maxBytesPerTrigger |
none |
Měkký limit velikosti dávky, která se má zpracovat během každé aktivované mikrodávkové dávky. |
maxRecordsPerFetch |
1000 |
Počet záznamů, které se mají načíst pro každou úlohu, než dojde ke zpracování záznamů. |
maxFetchPeriod |
10s |
Délka času potřebného na načtení každého úkolu před zpracováním záznamů. Přijímá řetězec doby trvání, 1s například 1 sekundu nebo 1m 1 minutu. Databricks doporučuje použít výchozí hodnotu. |
Použití přírůstkového dávkového zpracování s Pub/Sub
Můžete použít Trigger.AvailableNow k využívání dostupných záznamů ze zdrojů Pub/Sub jako přírůstkové dávky.
Azure Databricks zaznamenává časové razítko, když zahájíte čtení s nastavením Trigger.AvailableNow. Záznamy zpracovávané dávkou zahrnují všechna dříve načtená data a všechny nově publikované záznamy s časovým razítkem kratším než časové razítko spuštění zaznamenaného datového proudu. Další informace naleznete v tématu AvailableNow: Přírůstkové dávkové zpracování.
Monitorování streamovacích metrik Pub/Sub
Metriky průběhu strukturovaného streamování hlásí počet načtených a připravených záznamů ke zpracování, velikost načtených a připravených záznamů ke zpracování a počet duplicit zobrazených od spuštění datového proudu. Následuje příklad těchto metrik:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Omezení
Pub/Sub nepodporuje spekulativní spuštění (spark.speculation).