read_kinesis
funzione di streaming con valori di tabella
Si applica a: Databricks SQL
Databricks Runtime 13.3 LTS e versioni successive
Restituisce una tabella con record letti da Kinesis da uno o più flussi.
Sintassi
read_kinesis ( { parameter => value } [, ...] )
Argomenti
read_kinesis
richiede l'invocazione di parametri nominati.
L'unico argomento obbligatorio è streamName
. Tutti gli altri argomenti sono facoltativi.
Le descrizioni degli argomenti sono brevi qui. Per ulteriori dettagli, consulta la documentazione di Amazon Kinesis.
Sono disponibili varie opzioni di connessione per connettersi ed eseguire l'autenticazione con AWS.
awsAccessKey
e awsSecretKey
possono essere specificati negli argomenti della funzione usando la funzione privata , impostata manualmente negli argomenti o configurata come variabili di ambiente come indicato di seguito.
roleArn
, roleExternalID
, roleSessionName
può essere usato anche per eseguire l'autenticazione con AWS usando i profili di istanza.
Se nessuno di questi elementi viene specificato, userà la catena di provider AWS predefinita.
Parametro | Tipo | Descrizione |
---|---|---|
streamName |
STRING |
Elenco obbligatorio di uno o più flussi Kinesis, separati da virgole. |
awsAccessKey |
STRING |
Chiave di accesso AWS, se presente. È anche possibile specificare tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_ACCESS_KEY_ID ) e un file di profili credenziali. |
awsSecretKey |
STRING |
Chiave privata che corrisponde alla chiave di accesso. Può essere specificato negli argomenti o tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_SECRET_KEY o AWS_SECRET_ACCESS_KEY ) e un file di profili di credenziali. |
roleArn |
STRING |
Nome della risorsa Amazon del ruolo da assumere quando si accede a Kinesis. |
roleExternalId |
STRING |
Usato per delegare l'accesso all'account AWS. |
roleSessionName |
STRING |
Nome della sessione del ruolo AWS. |
stsEndpoint |
STRING |
Endpoint per la richiesta di credenziali di accesso temporanee. |
region |
STRING |
Area per i flussi da specificare. Il valore predefinito è l'area risolta localmente. |
endpoint |
STRING |
endpoint regionale per i flussi di dati Kinesis. Il valore predefinito è l'area risolta localmente. |
initialPosition |
STRING |
Posizione iniziale per la lettura nel flusso. Uno dei seguenti: 'latest' (impostazione predefinita), 'trim_horizon', 'earliest', 'at_timestamp'. |
consumerMode |
STRING |
Uno dei seguenti: "polling" (impostazione predefinita) o "EFO" (enhanced-fan-out). |
consumerName |
STRING |
Nome del consumatore. Tutti i consumatori sono prefissati con "databricks_". Il valore predefinito è una stringa vuota. |
registerConsumerTimeoutInterval |
STRING |
il timeout massimo per attendere che il consumer EFO venga registrato con il flusso Kinesis prima di generare un errore. Il valore predefinito è '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true per annullare la registrazione del consumer EFO alla terminazione della query. Il valore predefinito è false . |
deregisterConsumerTimeoutInterval |
STRING |
Timeout massimo di attesa affinché il consumer EFO venga deregistrato dallo stream Kinesis prima di generare un errore. Il valore predefinito è '300s'. |
consumerRefreshInterval |
STRING |
Intervallo in cui il consumatore viene controllato e aggiornato. Il valore predefinito è '300s'. |
Gli argomenti seguenti vengono usati per controllare il throughput di lettura e la latenza per Kinesis.
Parametro | Tipo | Descrizione |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Facoltativo, con un valore predefinito di 10.000 record da leggere per ogni richiesta API a Kinesis. |
maxFetchRate |
STRING |
Velocità di prelettura dei dati per partizione. Valore compreso tra '1,0' e '2,0' misurato in MB/s. Il valore predefinito è '1.0'. |
minFetchPeriod |
STRING |
Tempo di attesa massimo tra tentativi di prefetching consecutivi. Il valore predefinito è '400ms'. |
maxFetchDuration |
STRING |
Durata massima per bufferizzare i nuovi dati precaricati. Il valore predefinito è '10s'. |
fetchBufferSize |
STRING |
Quantità di dati per il trigger successivo. Il valore predefinito è "20 gb". |
shardsPerTask |
INTEGER (>0) |
Numero di shard Kinesis da precaricare in parallelo per ogni attività Spark. L'impostazione predefinita è 5. |
shardFetchinterval |
STRING |
Frequenza con cui eseguire il polling per il partizionamento orizzontale. Il valore predefinito è "1s". |
coalesceThresholdBlockSize |
INTEGER (>0) |
Soglia in corrispondenza della quale si verifica l'unione automatica. Il valore predefinito è 10.000.000. |
coalesce |
BOOLEAN |
true per unire le richieste precaricate. Il valore predefinito è true . |
coalesceBinSize |
INTEGER (>0) |
Dimensioni approssimative del blocco dopo l'unione. Il valore predefinito è 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true per riutilizzare il Kinesis client archiviato nella cache. Il valore predefinito è true tranne in un cluster PE. |
clientRetries |
INTEGER (>0) |
Numero di tentativi nello scenario di ripetizione. L'impostazione predefinita è 5. |
Valori restituiti
Una tabella di record Kinesis con il seguente schema:
Nome | Tipo di dati | Nullabile | Standard | Descrizione |
---|---|---|---|---|
partitionKey |
STRING |
No | Chiave utilizzata per distribuire i dati tra le partizioni di un flusso. Tutti i record di dati con la stessa chiave di partizione verranno letti dalla stessa partizione. | |
data |
BINARY |
No | Payload dei dati Kinesis, codificato in base 64. | |
stream |
STRING |
No | Nome del flusso da cui sono stati letti i dati. | |
shardId |
STRING |
No | Identificatore univoco per la partizione da cui sono stati letti i dati. | |
sequenceNumber |
BIGINT |
No | Identificatore univoco del record all'interno della partizione. | |
approximateArrivalTimestamp |
TIMESTAMP |
No | L'ora approssimativa in cui il record è stato inserito nel flusso. |
Le colonne (stream, shardId, sequenceNumber)
costituiscono una chiave primaria.
Esempi
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');