Udostępnij za pośrednictwem


read_kinesis funkcja przesyłania strumieniowego o wartości tabeli

Dotyczy: zaznacz pole wyboru oznaczone jako tak Databricks SQL zaznacz pole wyboru oznaczone jako tak Databricks Runtime 13.3 LTS i nowsze

Zwraca tabelę z rekordami odczytanymi z kinezy z co najmniej jednego strumienia.

Składnia

read_kinesis ( { parameter => value } [, ...] )

Argumenty

read_kinesis wymaga wywołania nazwanego parametru.

Jedynym wymaganym argumentem jest streamName. Wszystkie inne argumenty są opcjonalne.

Opisy argumentów są krótkie tutaj. Aby uzyskać więcej informacji, zobacz dokumentację Amazon Kinesis .

Istnieją różne opcje połączenia do nawiązywania połączenia i uwierzytelniania za pomocą platformy AWS. awsAccessKey, i awsSecretKey można określić w argumentach funkcji przy użyciu funkcji wpisu tajnego, ręcznie ustawić w argumentach lub skonfigurować jako zmienne środowiskowe, jak pokazano poniżej. roleArnroleSessionName, roleExternalIDmożna również użyć do uwierzytelniania za pomocą platformy AWS przy użyciu profilów wystąpień. Jeśli żadna z tych opcji nie zostanie określona, użyje domyślnego łańcucha dostawców platformy AWS.

Parametr Type Opis
streamName STRING Wymagana, rozdzielana przecinkami lista co najmniej jednego strumienia kinezy.
awsAccessKey STRING Jeśli istnieje, klucz dostępu platformy AWS. Można również określić za pomocą różnych opcji obsługiwanych za pośrednictwem domyślnego łańcucha dostawców poświadczeń platformy AWS, w tym zmiennych środowiskowych (AWS_ACCESS_KEY_ID) i pliku profilów poświadczeń.
awsSecretKey STRING Klucz tajny odpowiadający kluczowi dostępu. Można określić w argumentach lub za pomocą różnych opcji obsługiwanych za pośrednictwem domyślnego łańcucha dostawców poświadczeń platformy AWS, w tym zmiennych środowiskowych (AWS_SECRET_KEY lub AWS_SECRET_ACCESS_KEY) i pliku profilów poświadczeń.
roleArn STRING Nazwa zasobu firmy Amazon roli, która ma być przyjmowana podczas uzyskiwania dostępu do usługi Kinesis.
roleExternalId STRING Używany podczas delegowania dostępu do konta platformy AWS.
roleSessionName STRING Nazwa sesji roli platformy AWS.
stsEndpoint STRING Punkt końcowy do żądania poświadczeń dostępu tymczasowego.
region STRING Region, dla których mają być określone strumienie. Wartość domyślna to lokalnie rozwiązany region.
endpoint STRING regionalny punkt końcowy strumieni danych Kinesis. Wartość domyślna to lokalnie rozwiązany region.
initialPosition STRING Pozycja początkowa odczytu z w strumieniu. Jeden z: "latest" (wartość domyślna), "trim_horizon", "earliest", "at_timestamp".
consumerMode STRING Jeden z: "polling" (ustawienie domyślne) lub "EFO" (enhanced-fan-out).
consumerName STRING Nazwa konsumenta. Wszyscy odbiorcy mają prefiks "databricks_". Wartość domyślna to pusty ciąg.
registerConsumerTimeoutInterval STRING maksymalny limit czasu oczekiwania na zarejestrowanie konsumenta EFO Kinesis przy użyciu strumienia Kinesis przed zgłoszeniem błędu. Wartość domyślna to "300s".
requireConsumerDeregistration BOOLEAN true aby cofnąć rejestrację konsumenta EFO w przypadku zakończenia zapytań. Wartość domyślna to false.
deregisterConsumerTimeoutInterval STRING Maksymalny limit czasu oczekiwania na wyrejestrowanie konsumenta EFO Kinesis z strumieniem Kinesis przed zgłoszeniem błędu. Wartość domyślna to "300s".
consumerRefreshInterval STRING Interwał, w którym odbiorca jest sprawdzany i odświeżany. Wartość domyślna to "300s".

Następujące argumenty służą do kontrolowania przepływności i opóźnienia odczytu dla kinezy:

Parametr Type Opis
maxRecordsPerFetch INTEGER (>0) Opcjonalnie, z domyślną 10 000 rekordów do odczytu na żądanie interfejsu API do Kinesis.
maxFetchRate STRING Jak szybko pobierać dane wstępnie na fragment. Wartość z zakresu od "1,0" do "2,0" mierzona w MB/s. Wartość domyślna to "1.0".
minFetchPeriod STRING Maksymalny czas oczekiwania między kolejnymi próbami pobierania z góry. Wartość domyślna to "400 ms".
maxFetchDuration STRING Maksymalny czas trwania buforowania wstępnie pobranych nowych danych. Wartość domyślna to "10s".
fetchBufferSize STRING Ilość danych dla następnego wyzwalacza. Wartość domyślna to "20gb".
shardsPerTask INTEGER (>0) Liczba fragmentów kinezy do wstępnego pobierania z poszczególnych zadań platformy Spark. Ustawieniem domyślnym jest 5.
shardFetchinterval STRING Jak często sondować pod kątem ponownego dzielenia na fragmenty. Wartość domyślna to "1s".
coalesceThresholdBlockSize INTEGER (>0) Próg, przy którym występuje automatyczne łączenie. Wartość domyślna to 10 000 000.
coalesce BOOLEAN true łączenie wstępnie pobranych żądań. Wartość domyślna to true.
coalesceBinSize INTEGER (>0) Przybliżony rozmiar bloku po łączenia. Wartość domyślna to 128 000 000.
reuseKinesisClient BOOLEAN true aby ponownie użyć klienta Kinesis przechowywanego w pamięci podręcznej. Wartość domyślna jest true wyjątkiem klastra PE.
clientRetries INTEGER (>0) Liczba ponownych prób w scenariuszu ponawiania prób. Ustawieniem domyślnym jest 5.

Zwraca

Tabela rekordów Kinesis z następującym schematem:

Nazwisko Typ danych Dopuszczający wartość null Standardowa opis
partitionKey STRING Nie. Klucz używany do dystrybucji danych między fragmentami strumienia. Wszystkie rekordy danych z tym samym kluczem partycji będą odczytywane z tego samego fragmentu.
data BINARY Nie. Ładunek danych kinezy, zakodowany w formacie base-64.
stream STRING Nie. Nazwa strumienia, z którego odczytywano dane.
shardId STRING Nie. Unikatowy identyfikator fragmentu, z którego odczytywano dane.
sequenceNumber BIGINT Nie. Unikatowy identyfikator rekordu w ramach jego fragmentu.
approximateArrivalTimestamp TIMESTAMP Nie. Przybliżony czas wstawienia rekordu do strumienia.

Kolumny (stream, shardId, sequenceNumber) stanowią klucz podstawowy.

Przykłady

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');