Condividi tramite


read_kinesis funzione di streaming con valori di tabella

Si applica a:segno di spunta sì Databricks SQL segno di spunta sì Databricks Runtime 13.3 LTS e versioni successive

Restituisce una tabella con record letti da Kinesis da uno o più flussi.

Sintassi

read_kinesis ( { parameter => value } [, ...] )

Argomenti

read_kinesis richiede l'invocazione di parametri nominati.

L'unico argomento obbligatorio è streamName. Tutti gli altri argomenti sono facoltativi.

Le descrizioni degli argomenti sono brevi qui. Per ulteriori dettagli, consulta la documentazione di Amazon Kinesis.

Sono disponibili varie opzioni di connessione per connettersi ed eseguire l'autenticazione con AWS. awsAccessKeye awsSecretKey possono essere specificati negli argomenti della funzione usando la funzione privata , impostata manualmente negli argomenti o configurata come variabili di ambiente come indicato di seguito. roleArn, roleExternalID, roleSessionName può essere usato anche per eseguire l'autenticazione con AWS usando i profili di istanza. Se nessuno di questi elementi viene specificato, userà la catena di provider AWS predefinita.

Parametro Tipo Descrizione
streamName STRING Elenco obbligatorio di uno o più flussi Kinesis, separati da virgole.
awsAccessKey STRING Chiave di accesso AWS, se presente. È anche possibile specificare tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_ACCESS_KEY_ID) e un file di profili credenziali.
awsSecretKey STRING Chiave privata che corrisponde alla chiave di accesso. Può essere specificato negli argomenti o tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_SECRET_KEY o AWS_SECRET_ACCESS_KEY) e un file di profili di credenziali.
roleArn STRING Nome della risorsa Amazon del ruolo da assumere quando si accede a Kinesis.
roleExternalId STRING Usato per delegare l'accesso all'account AWS.
roleSessionName STRING Nome della sessione del ruolo AWS.
stsEndpoint STRING Endpoint per la richiesta di credenziali di accesso temporanee.
region STRING Area per i flussi da specificare. Il valore predefinito è l'area risolta localmente.
endpoint STRING endpoint regionale per i flussi di dati Kinesis. Il valore predefinito è l'area risolta localmente.
initialPosition STRING Posizione iniziale per la lettura nel flusso. Uno dei seguenti: 'latest' (impostazione predefinita), 'trim_horizon', 'earliest', 'at_timestamp'.
consumerMode STRING Uno dei seguenti: "polling" (impostazione predefinita) o "EFO" (enhanced-fan-out).
consumerName STRING Nome del consumatore. Tutti i consumatori sono prefissati con "databricks_". Il valore predefinito è una stringa vuota.
registerConsumerTimeoutInterval STRING il timeout massimo per attendere che il consumer EFO venga registrato con il flusso Kinesis prima di generare un errore. Il valore predefinito è '300s'.
requireConsumerDeregistration BOOLEAN true per annullare la registrazione del consumer EFO alla terminazione della query. Il valore predefinito è false.
deregisterConsumerTimeoutInterval STRING Timeout massimo di attesa affinché il consumer EFO venga deregistrato dallo stream Kinesis prima di generare un errore. Il valore predefinito è '300s'.
consumerRefreshInterval STRING Intervallo in cui il consumatore viene controllato e aggiornato. Il valore predefinito è '300s'.

Gli argomenti seguenti vengono usati per controllare il throughput di lettura e la latenza per Kinesis.

Parametro Tipo Descrizione
maxRecordsPerFetch INTEGER (>0) Facoltativo, con un valore predefinito di 10.000 record da leggere per ogni richiesta API a Kinesis.
maxFetchRate STRING Velocità di prelettura dei dati per partizione. Valore compreso tra '1,0' e '2,0' misurato in MB/s. Il valore predefinito è '1.0'.
minFetchPeriod STRING Tempo di attesa massimo tra tentativi di prefetching consecutivi. Il valore predefinito è '400ms'.
maxFetchDuration STRING Durata massima per bufferizzare i nuovi dati precaricati. Il valore predefinito è '10s'.
fetchBufferSize STRING Quantità di dati per il trigger successivo. Il valore predefinito è "20 gb".
shardsPerTask INTEGER (>0) Numero di shard Kinesis da precaricare in parallelo per ogni attività Spark. L'impostazione predefinita è 5.
shardFetchinterval STRING Frequenza con cui eseguire il polling per il partizionamento orizzontale. Il valore predefinito è "1s".
coalesceThresholdBlockSize INTEGER (>0) Soglia in corrispondenza della quale si verifica l'unione automatica. Il valore predefinito è 10.000.000.
coalesce BOOLEAN true per unire le richieste precaricate. Il valore predefinito è true.
coalesceBinSize INTEGER (>0) Dimensioni approssimative del blocco dopo l'unione. Il valore predefinito è 128.000.000.
reuseKinesisClient BOOLEAN true per riutilizzare il Kinesis client archiviato nella cache. Il valore predefinito è true tranne in un cluster PE.
clientRetries INTEGER (>0) Numero di tentativi nello scenario di ripetizione. L'impostazione predefinita è 5.

Valori restituiti

Una tabella di record Kinesis con il seguente schema:

Nome Tipo di dati Nullabile Standard Descrizione
partitionKey STRING No Chiave utilizzata per distribuire i dati tra le partizioni di un flusso. Tutti i record di dati con la stessa chiave di partizione verranno letti dalla stessa partizione.
data BINARY No Payload dei dati Kinesis, codificato in base 64.
stream STRING No Nome del flusso da cui sono stati letti i dati.
shardId STRING No Identificatore univoco per la partizione da cui sono stati letti i dati.
sequenceNumber BIGINT No Identificatore univoco del record all'interno della partizione.
approximateArrivalTimestamp TIMESTAMP No L'ora approssimativa in cui il record è stato inserito nel flusso.

Le colonne (stream, shardId, sequenceNumber) costituiscono una chiave primaria.

Esempi

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');