read_kinesis
funkcja przesyłania strumieniowego o wartości tabeli
Dotyczy: Databricks SQL Databricks Runtime 13.3 LTS i nowsze
Zwraca tabelę z rekordami odczytanymi z kinezy z co najmniej jednego strumienia.
Składnia
read_kinesis ( { parameter => value } [, ...] )
Argumenty
read_kinesis
wymaga wywołania nazwanego parametru.
Jedynym wymaganym argumentem jest streamName
. Wszystkie inne argumenty są opcjonalne.
Opisy argumentów są krótkie tutaj. Aby uzyskać więcej informacji, zobacz dokumentację Amazon Kinesis .
Istnieją różne opcje połączenia do nawiązywania połączenia i uwierzytelniania za pomocą platformy AWS.
awsAccessKey
, i awsSecretKey
można określić w argumentach funkcji przy użyciu funkcji wpisu tajnego, ręcznie ustawić w argumentach lub skonfigurować jako zmienne środowiskowe, jak pokazano poniżej.
roleArn
roleSessionName
, roleExternalID
można również użyć do uwierzytelniania za pomocą platformy AWS przy użyciu profilów wystąpień.
Jeśli żadna z tych opcji nie zostanie określona, użyje domyślnego łańcucha dostawców platformy AWS.
Parametr | Type | Opis |
---|---|---|
streamName |
STRING |
Wymagana, rozdzielana przecinkami lista co najmniej jednego strumienia kinezy. |
awsAccessKey |
STRING |
Jeśli istnieje, klucz dostępu platformy AWS. Można również określić za pomocą różnych opcji obsługiwanych za pośrednictwem domyślnego łańcucha dostawców poświadczeń platformy AWS, w tym zmiennych środowiskowych (AWS_ACCESS_KEY_ID ) i pliku profilów poświadczeń. |
awsSecretKey |
STRING |
Klucz tajny odpowiadający kluczowi dostępu. Można określić w argumentach lub za pomocą różnych opcji obsługiwanych za pośrednictwem domyślnego łańcucha dostawców poświadczeń platformy AWS, w tym zmiennych środowiskowych (AWS_SECRET_KEY lub AWS_SECRET_ACCESS_KEY ) i pliku profilów poświadczeń. |
roleArn |
STRING |
Nazwa zasobu firmy Amazon roli, która ma być przyjmowana podczas uzyskiwania dostępu do usługi Kinesis. |
roleExternalId |
STRING |
Używany podczas delegowania dostępu do konta platformy AWS. |
roleSessionName |
STRING |
Nazwa sesji roli platformy AWS. |
stsEndpoint |
STRING |
Punkt końcowy do żądania poświadczeń dostępu tymczasowego. |
region |
STRING |
Region, dla których mają być określone strumienie. Wartość domyślna to lokalnie rozwiązany region. |
endpoint |
STRING |
regionalny punkt końcowy strumieni danych Kinesis. Wartość domyślna to lokalnie rozwiązany region. |
initialPosition |
STRING |
Pozycja początkowa odczytu z w strumieniu. Jeden z: "latest" (wartość domyślna), "trim_horizon", "earliest", "at_timestamp". |
consumerMode |
STRING |
Jeden z: "polling" (ustawienie domyślne) lub "EFO" (enhanced-fan-out). |
consumerName |
STRING |
Nazwa konsumenta. Wszyscy odbiorcy mają prefiks "databricks_". Wartość domyślna to pusty ciąg. |
registerConsumerTimeoutInterval |
STRING |
maksymalny limit czasu oczekiwania na zarejestrowanie konsumenta EFO Kinesis przy użyciu strumienia Kinesis przed zgłoszeniem błędu. Wartość domyślna to "300s". |
requireConsumerDeregistration |
BOOLEAN |
true aby cofnąć rejestrację konsumenta EFO w przypadku zakończenia zapytań. Wartość domyślna to false . |
deregisterConsumerTimeoutInterval |
STRING |
Maksymalny limit czasu oczekiwania na wyrejestrowanie konsumenta EFO Kinesis z strumieniem Kinesis przed zgłoszeniem błędu. Wartość domyślna to "300s". |
consumerRefreshInterval |
STRING |
Interwał, w którym odbiorca jest sprawdzany i odświeżany. Wartość domyślna to "300s". |
Następujące argumenty służą do kontrolowania przepływności i opóźnienia odczytu dla kinezy:
Parametr | Type | Opis |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Opcjonalnie, z domyślną 10 000 rekordów do odczytu na żądanie interfejsu API do Kinesis. |
maxFetchRate |
STRING |
Jak szybko pobierać dane wstępnie na fragment. Wartość z zakresu od "1,0" do "2,0" mierzona w MB/s. Wartość domyślna to "1.0". |
minFetchPeriod |
STRING |
Maksymalny czas oczekiwania między kolejnymi próbami pobierania z góry. Wartość domyślna to "400 ms". |
maxFetchDuration |
STRING |
Maksymalny czas trwania buforowania wstępnie pobranych nowych danych. Wartość domyślna to "10s". |
fetchBufferSize |
STRING |
Ilość danych dla następnego wyzwalacza. Wartość domyślna to "20gb". |
shardsPerTask |
INTEGER (>0) |
Liczba fragmentów kinezy do wstępnego pobierania z poszczególnych zadań platformy Spark. Ustawieniem domyślnym jest 5. |
shardFetchinterval |
STRING |
Jak często sondować pod kątem ponownego dzielenia na fragmenty. Wartość domyślna to "1s". |
coalesceThresholdBlockSize |
INTEGER (>0) |
Próg, przy którym występuje automatyczne łączenie. Wartość domyślna to 10 000 000. |
coalesce |
BOOLEAN |
true łączenie wstępnie pobranych żądań. Wartość domyślna to true . |
coalesceBinSize |
INTEGER (>0) |
Przybliżony rozmiar bloku po łączenia. Wartość domyślna to 128 000 000. |
reuseKinesisClient |
BOOLEAN |
true aby ponownie użyć klienta Kinesis przechowywanego w pamięci podręcznej. Wartość domyślna jest true wyjątkiem klastra PE. |
clientRetries |
INTEGER (>0) |
Liczba ponownych prób w scenariuszu ponawiania prób. Ustawieniem domyślnym jest 5. |
Zwraca
Tabela rekordów Kinesis z następującym schematem:
Nazwisko | Typ danych | Dopuszczający wartość null | Standardowa | opis |
---|---|---|---|---|
partitionKey |
STRING |
Nie. | Klucz używany do dystrybucji danych między fragmentami strumienia. Wszystkie rekordy danych z tym samym kluczem partycji będą odczytywane z tego samego fragmentu. | |
data |
BINARY |
Nie. | Ładunek danych kinezy, zakodowany w formacie base-64. | |
stream |
STRING |
Nie. | Nazwa strumienia, z którego odczytywano dane. | |
shardId |
STRING |
Nie. | Unikatowy identyfikator fragmentu, z którego odczytywano dane. | |
sequenceNumber |
BIGINT |
Nie. | Unikatowy identyfikator rekordu w ramach jego fragmentu. | |
approximateArrivalTimestamp |
TIMESTAMP |
Nie. | Przybliżony czas wstawienia rekordu do strumienia. |
Kolumny (stream, shardId, sequenceNumber)
stanowią klucz podstawowy.
Przykłady
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');