Что такое потоковая передача с отслеживанием состояния?
Запрос структурированной потоковой передачи с отслеживанием состояния требует добавочных обновлений сведений о промежуточном состоянии, тогда как запрос структурированной потоковой передачи без отслеживания состояния отслеживает только сведения о том, какие записи были обработаны из источника в приемник.
Операции с отслеживанием состояния включают агрегирование потоковой передачи, потоковую передачу dropDuplicates
, соединения "поток — поток", mapGroupsWithState
и flatMapGroupsWithState
.
Сведения о промежуточном состоянии, необходимые для запросов структурированной потоковой передачи с отслеживанием состояния, могут привести к непредвиденным задержкам и проблемам в рабочей среде, если они настроены неправильно.
В Databricks Runtime 13.3 LTS и более поздних версиях можно включить контрольную точку журнала изменений с помощью RocksDB для снижения длительности и сквозной задержки для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует включать контрольные точки журнала изменений для всех запросов структурированных потоков с отслеживанием состояния. См. раздел "Включить контрольную точку журнала изменений".
Optimize запросы структурированной потоковой передачи с отслеживанием состояния
Управление сведениями о промежуточном состоянии для запросов структурированной потоковой передачи с отслеживанием состояния позволяет предотвратить непредвиденные задержки и проблемы в рабочей среде.
Databricks рекомендует следующее:
- Используйте оптимизированные для вычислений экземпляры в качестве рабочих ролей.
- Set количество секций перетасовки до 1–2 раз числа ядер в кластере.
-
Set конфигурацию
spark.sql.streaming.noDataMicroBatches.enabled
дляfalse
в SparkSession. Это не допускает обработку микропакетов, которые не содержат данных, в обработчике микропакетов. Обратите внимание, что настройка этой конфигурацииfalse
может привести к операциям с отслеживанием состояния, которые используют подложки или время ожидания обработки, чтобы не get выходные данные до тех пор, пока новые данные не будут немедленно поступать.
Databricks рекомендует использовать RocksDB с контрольными точками журнала изменений для управления состоянием для потоков с отслеживанием состояния. См. статью Настройка хранилища состояний RocksDB в Azure Databricks.
Примечание.
Схему управления состоянием нельзя изменить между перезапусками запросов. То есть если запрос был запущен с управлением по умолчанию, то он не может быть изменен без запуска запроса с нуля с новым расположением контрольной точки.
Работа с несколькими операторами с отслеживанием состояния в структурированной потоковой передаче
В Databricks Runtime 13.3 LTS и более поздних версиях Azure Databricks предлагает расширенную поддержку операторов с отслеживанием состояния в структурированных рабочих нагрузках потоковой передачи. Теперь можно объединить несколько операторов с отслеживанием состояния, что означает, что вы можете передать выходные данные операции, например агрегирование окна в другую операцию с отслеживанием состояния, например join.
В следующих примерах показано несколько шаблонов, которые можно использовать.
Внимание
При работе с несколькими операторами с отслеживанием состояния существуют следующие ограничения:
- Функция
FlatMapGroupWithState
не поддерживается. - Поддерживается только режим вывода добавления.
Агрегирование цепного времени window
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Агрегирование времени window в двух потоках, за которым следует объединение потоков-потоков windowjoin
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Интервал времени между последовательными потоками join и последующее агрегирование времени window
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Перебалансирование состояния для структурированной потоковой передачи
Перебалансирование состояния включено по умолчанию для всех рабочих нагрузок потоковой передачи в Delta Live Tables. В Databricks Runtime 11.3 LTS и более поздних версиях можно set следующий параметр конфигурации в конфигурации кластера Spark, чтобы включить перебалансирование состояния:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Перебалансирование состояния дает преимущества конвейеров структурированной потоковой передачи с отслеживанием состояния, которые проходят события изменения размера кластера. Операции потоковой передачи без отслеживания состояния не получают преимущества независимо от изменения размеров кластера.
Примечание.
Автоматическое масштабирование вычислений имеет ограничения, ограничивающие размер кластера для структурированных рабочих нагрузок потоковой передачи. Databricks рекомендует использовать Delta Live Tables с расширенным автомасштабированием для потоковых рабочих нагрузок. См. Optimize использование кластеров конвейеров Delta Live Tables с расширенным автомасштабированием.
События изменения размера кластера вызывают перебалансировку состояния. При перебалансировке событий микропакеты могут иметь более высокую задержку, так как состояние загружается из облачного хранилища в новые исполнители.
Укажите начальное состояние для mapGroupsWithState
Вы можете указать определяемое пользователем начальное состояние для обработки структурированной потоковой передачи с отслеживанием состояния с помощью flatMapGroupsWithState
или mapGroupsWithState
. Это позволяет избежать повторной обработки данных при запуске потока с отслеживанием состояния без допустимой контрольной точки.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Пример варианта использования, который задает начальное состояние для оператора flatMapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Пример варианта использования, который задает начальное состояние для оператора mapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Проверить функцию mapGroupsWithState
update
API TestGroupState
позволяет протестировать функцию состояния update, используемую для Dataset.groupByKey(...).mapGroupsWithState(...)
и Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
Функция update состояния принимает предыдущее состояние в качестве входных данных с помощью объекта типа GroupState
. См. справочную документацию по GroupState Apache Spark. Например:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}