Как хранить данные каждой итерации Spark Streaming в одном RDD?

Я новичок в Spark.
Я пишу следующий сценарий, который получает поток от Кафки, который затем преобразуется в RDD.

Моя цель состоит в том, чтобы сохранить в памяти данные из каждой итерации потока в один RDD. Например, добавление элемента в список в каждом цикле.

conf = SparkConf().setAppName("Application")
sc = SparkContext(conf=conf)

def joinRDDs(rdd):
     elements = rdd.collect()
     rdds = sc.parallelize(elements)
     transformed = rdds.map(lambda x: ('key', {u'name': x[1]}))

if __name__ == '__main__':
    ssc = StreamingContext(sc, 2)
    stream = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": host})
    stream.foreachRDD(joinRDDs)

Как я могу это сделать?

Спасибо за внимание

1 ответ

  1. Используйте updateStatebyKey () и передайте функцию по мере необходимости.Функция принимает 2 аргумента новые данные которые приходят в каждый пакет и также исторические данные которые вы держите в памяти.

    def countPurchasers (newValues,lastSum):
    если значение lastSum равно None:
    lastSum=0
    возвращаемая сумма (newValues, lastSum)

    updateStatebBykey (countPurchasers)