Prenumerera på Google Pub/Sub
Azure Databricks tillhandahåller en inbyggd anslutningsapp för att prenumerera på Google Pub/Sub i Databricks Runtime 13.3 LTS och senare. Den här anslutningsappen tillhandahåller exakt en gång bearbetningssemantik för poster från prenumeranten.
Kommentar
Pub/sub kan publicera dubblettposter och poster kan komma till prenumeranten i ur ordning. Du bör skriva Azure Databricks-kod för att hantera dubbletter och out-of-order-poster.
Syntaxexempel
I följande kodexempel visas den grundläggande syntaxen för att konfigurera en strukturerad direktuppspelning från Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Fler konfigurationsalternativ finns i Konfigurera alternativ för pub-/underströmningsläsning.
Konfigurera åtkomst till Pub/Sub
Databricks rekommenderar att du använder hemligheter när du tillhandahåller auktoriseringsalternativ. Följande alternativ krävs för att auktorisera en anslutning:
clientEmail
clientId
privateKey
privateKeyId
I följande tabell beskrivs de roller som krävs för de konfigurerade autentiseringsuppgifterna:
Roller | Obligatorisk eller valfri | Hur den används |
---|---|---|
roles/pubsub.viewer eller roles/viewer |
Obligatoriskt | Kontrollera om prenumerationen finns och hämta en prenumeration |
roles/pubsub.subscriber |
Obligatoriskt | Hämta data från en prenumeration |
roles/pubsub.editor eller roles/editor |
Valfritt | Gör det möjligt att skapa en prenumeration om det inte finns en prenumeration och gör det också möjligt att ta deleteSubscriptionOnStreamStop bort prenumerationer vid stream-avslutning |
Pub-/underschema
Schemat för strömmen matchar de poster som hämtas från Pub/Sub enligt beskrivningen i följande tabell:
Fält | Typ |
---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Konfigurera alternativ för pub-/underströmningsläsning
I följande tabell beskrivs de alternativ som stöds för Pub/Sub. Alla alternativ konfigureras som en del av en strukturerad strömningsläsning med hjälp av .option("<optionName>", "<optionValue>")
syntax.
Kommentar
Vissa alternativ för pub-/underkonfiguration använder begreppet hämtningar i stället för mikrobatch. Detta återspeglar intern implementeringsinformation och alternativen fungerar på samma sätt som registreringar i andra anslutningsappar för strukturerad direktuppspelning, förutom att poster hämtas och sedan bearbetas.
Alternativ | Standardvärde | beskrivning |
---|---|---|
numFetchPartitions |
Ange till hälften av antalet utförare som finns vid initieringen av dataströmmen. | Antalet parallella Spark-uppgifter som hämtar poster från en prenumeration. |
deleteSubscriptionOnStreamStop |
false |
Om true tas prenumerationen som skickas till dataströmmen bort när strömningsjobbet slutar. |
maxBytesPerTrigger |
inget | En mjuk gräns för batchstorleken som ska bearbetas under varje utlöst mikrobatch. |
maxRecordsPerFetch |
1000 | Antalet poster som ska hämtas per aktivitet innan poster bearbetas. |
maxFetchPeriod |
10 sekund | Tidsåtgången för varje aktivitet som ska hämtas innan poster bearbetas. Databricks rekommenderar att du använder standardvärdet. |
Inkrementell batchbearbetningssemantik för Pub/Sub
Du kan använda Trigger.AvailableNow
för att använda tillgängliga poster från pub-/underkällorna en inkrementell batch.
Azure Databricks registrerar tidsstämpeln när du börjar läsa med inställningen Trigger.AvailableNow
. Poster som bearbetas av batchen innehåller alla tidigare hämtade data och eventuella nyligen publicerade poster med en tidsstämpel som är mindre än den inspelade starttidsstämpeln för dataströmmen.
Se Konfigurera inkrementell batchbearbetning.
Övervaka strömningsmått
Förloppsmått för strukturerad direktuppspelning rapporterar antalet poster som hämtats och är redo att bearbetas, storleken på de poster som hämtats och är redo att bearbetas samt antalet dubbletter som setts sedan strömmen startade. Följande är ett exempel på dessa mått:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Begränsningar
Spekulativ körning (spark.speculation
) stöds inte med Pub/Sub.