Přihlášení k odběru Google Pub/Sub

Pomocí integrovaného konektoru se můžete přihlásit k odběru Google Pub/Sub. Tento konektor poskytuje sémantiku zpracování přesně jednou pro záznamy od odběratele.

Poznámka:

Pub/Sub může publikovat duplicitní záznamy nebo záznamy můžou přijít odběrateli mimo objednávku. Napište kód pro zpracování duplicitních a zastaralých záznamů.

Konfigurace streamu Pub/Sub

Následující příklad kódu ukazuje základní syntaxi pro konfiguraci strukturovaného čtení streamu z Pub/Sub.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

Další možnosti konfigurace najdete v tématu Nastavení konfigurace pro Pub/Sub čtení streamu.

Konfigurace přístupu k pub/Sub

Přihlašovací údaje, které nakonfigurujete, musí mít následující role.

Role Požadované nebo volitelné Jak se role používá
roles/pubsub.viewer nebo roles/viewer Požaduje se Zkontroluje, jestli existuje předplatné a získá předplatné.
roles/pubsub.subscriber Požaduje se Načte data z předplatného.
roles/pubsub.editor nebo roles/editor Volitelné Povolí vytvoření předplatného, pokud neexistuje, a umožní použití deleteSubscriptionOnStreamStop k odstranění předplatných při ukončení streamu.

Databricks doporučuje používat tajné kódy při poskytování možností autorizace. K autorizaci připojení se vyžadují následující možnosti:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Vysvětlení schématu Pub/Sub

Schéma datového proudu odpovídá záznamům načteným z pub/Sub, jak je popsáno v následující tabulce.

Pole Typ
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Konfigurace možností streamování čtení v Pub/Sub

Následující tabulka popisuje možnosti podporované pro Pub/Sub. Všechny možnosti jsou nakonfigurované jako součást čtení strukturovaného streamování pomocí syntaxe .option("<optionName>", "<optionValue>").

Poznámka:

Některé možnosti konfigurace Pub/Sub používají místo mikrodávek koncept načítání. Toto odráží podrobnosti interní implementace a možnosti fungují podobně jako analogy v jiných konektorech strukturovaného streamování, s tím rozdílem, že záznamy se nejprve načítají a poté zpracovávají.

Možnost Výchozí hodnota Popis
numFetchPartitions Nastavte na polovinu počtu výkonných procesů, které jsou přítomny při inicializaci datového proudu. Počet paralelních úloh Spark, které získávají záznamy z odběru.
deleteSubscriptionOnStreamStop false Pokud true, předplatné předané do datového proudu je odstraněno, jakmile úloha streamování skončí.
maxBytesPerTrigger none Měkký limit velikosti dávky, která se má zpracovat během každé aktivované mikrodávkové dávky.
maxRecordsPerFetch 1000 Počet záznamů, které se mají načíst pro každou úlohu, než dojde ke zpracování záznamů.
maxFetchPeriod 10s Délka času potřebného na načtení každého úkolu před zpracováním záznamů. Přijímá řetězec doby trvání, 1s například 1 sekundu nebo 1m 1 minutu. Databricks doporučuje použít výchozí hodnotu.

Použití přírůstkového dávkového zpracování s Pub/Sub

Můžete použít Trigger.AvailableNow k využívání dostupných záznamů ze zdrojů Pub/Sub jako přírůstkové dávky.

Azure Databricks zaznamenává časové razítko, když zahájíte čtení s nastavením Trigger.AvailableNow. Záznamy zpracovávané dávkou zahrnují všechna dříve načtená data a všechny nově publikované záznamy s časovým razítkem kratším než časové razítko spuštění zaznamenaného datového proudu. Další informace naleznete v tématu AvailableNow: Přírůstkové dávkové zpracování.

Monitorování streamovacích metrik Pub/Sub

Metriky průběhu strukturovaného streamování hlásí počet načtených a připravených záznamů ke zpracování, velikost načtených a připravených záznamů ke zpracování a počet duplicit zobrazených od spuštění datového proudu. Následuje příklad těchto metrik:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Omezení

Pub/Sub nepodporuje spekulativní spuštění (spark.speculation).