Курсоры базы данных
Применимо: ✅Azure Data Explorer
Курсор базы данных — это объект уровня базы данных, который позволяет запрашивать базу данных несколько раз. Вы получаете согласованные результаты даже при наличии data-append
или data-retention
операций параллельно с запросами.
Курсоры базы данных предназначены для решения двух важных сценариев:
Возможность повторять один и тот же запрос несколько раз и получать те же результаты, если запрос указывает на "тот же набор данных".
Возможность выполнения запроса "ровно один раз". Этот запрос "видит" только данные, которые предыдущий запрос не видел, так как данные не были доступны. Запрос позволяет выполнять итерацию, например, через все вновь поступающие данные в таблице без страха обработки одной записи дважды или пропускания записей по ошибке.
Курсор базы данных представлен на языке запросов в виде скалярного значения типа string
. Фактическое значение должно считаться непрозрачным и не поддерживается любая операция, кроме сохранения его значения или использования следующих функций курсора.
Функции курсора
Kusto предоставляет три функции для реализации двух приведенных выше сценариев:
cursor_current(): используйте эту функцию для получения текущего значения курсора базы данных. Это значение можно использовать в качестве аргумента для двух других функций.
cursor_after(rhs:string) : эта специальная функция может использоваться для записей таблиц с включеннымполитикой IngestionTime. Он возвращает скалярное значение типа bool
, указывающее, поступает ли значение курсора базы данных записиingestion_time()
после значения курсора базы данныхrhs
.cursor_before_or_at(rhs:string) : эта специальная функция может использоваться в записях таблиц с включеннымполитикой IngestionTime. Он возвращает скалярное значение типа bool
, указывающее, поступает ли значение курсораingestion_time()
базы данных записи до или по значению курсора базы данныхrhs
.
Две специальные функции (cursor_after
и cursor_before_or_at
) также имеют побочный эффект: когда они используются, Kusto выдает текущее значение курсора базы данных в @ExtendedProperties
результирующий набор запроса. Имя свойства для курсора Cursor
, а его значение — один string
.
Например:
{"Cursor" : "636040929866477946"}
Ограничения
Курсоры базы данных можно использовать только с таблицами, для которых включена политика
Объект курсора базы данных не содержит значимых значений, если база данных не имеет по крайней мере одну таблицу с политикой IngestionTime. Это значение гарантированно обновляется по мере необходимости в журнал приема данных в такие таблицы и запросы, которые ссылаются на такие таблицы. Это может быть обновлено или не может быть обновлено в других случаях.
Процесс приема сначала фиксирует данные, чтобы он был доступен для запроса, а затем назначает фактическое значение курсора каждой записи. Запрос данных сразу после приема с помощью курсора базы данных может не включать последние записи, добавленные, так как значение курсора еще не назначено. Кроме того, повторное получение текущего значения курсора базы данных может возвращать одно и то же значение, даже если прием был выполнен между ними, так как только фиксация курсора может обновить его значение.
Запрос таблицы на основе курсоров базы данных гарантируется только "работать" (предоставляя точные гарантии), если записи обрабатываются непосредственно в этой таблице. Если вы используете команды экстентов, такие как экстенты перемещения или .replace экстенты для перемещения данных в таблицу, или если вы используете .rename table, запрос этой таблицы с помощью курсоров базы данных не гарантируется, чтобы избежать отсутствия данных. Это связано с тем, что время приема записей назначается при первоначальном приеме и не изменяется во время операции экстентов перемещения.
При перемещении экстентов в целевую таблицу назначенный курсор может быть обработано, а следующий запрос курсором базы данных пропустит новые записи.
Пример. Обработка записей ровно один раз
Для таблицы Employees
со схемой [Name, Salary]
для непрерывной обработки новых записей при приеме в таблицу используйте следующий процесс:
// [Once] Enable the IngestionTime policy on table Employees
.set table Employees policy ingestiontime true
// [Once] Get all the data that the Employees table currently holds
Employees | where cursor_after('')
// The query above will return the database cursor value in
// the @ExtendedProperties result set. Lets assume that it returns
// the value '636040929866477946'
// [Many] Get all the data that was added to the Employees table
// since the previous query was run using the previously-returned
// database cursor
Employees | where cursor_after('636040929866477946') // -> 636040929866477950
Employees | where cursor_after('636040929866477950') // -> 636040929866479999
Employees | where cursor_after('636040929866479999') // -> 636040939866479000