Как построить 1-часовой буфер потока воспроизведения в Flink?

Я хотел бы динамически сохранять буфер последних 1-часовых событий. Этот буфер должен дать мне функцию воспроизведения, чтобы запросы могли быть выполнены на данных за последний час.
Есть ли уже что-то реализовано в Flink? Или мне нужно построить его самому?

Я пытался использовать API окна, но кажется, что Flink не дает мне вперед смещение фиксированной ширины окна времени.

1 ответ

  1. У меня есть решение моего собственного вопроса, но я хотел бы сохранить вопрос в случае, если у вас есть лучшие решения. Потому что мой определенно нарушает некоторые из лучших практик функционального программирования.

    Мой Хак следующий.

                val keyedEventStream: KeyedStream[E]
    
                // create a stream of [hourly window as a set of events]
                val eventWindowStream = keyedEventStream.timeWindow(Time.minutes(60), Time.milliseconds(50)).fold(scala.collection.Set[E]())((set: scala.collection.Set[E], event: E) => set + event)
    
    
                // This is the hourly buffer my process logic will use
                var workWindow = scala.collection.Set[E]()
                // update the workspace window with the stream of hourly window.
                eventWindowStream.map((set: scala.collection.Set[W]) => workWindow = set)
    

    Как вы можете видеть, единственной целью последней карты является обновление переменной workWindow, которая на самом деле является побочным эффектом встроенной функции.