агрегирование существующих данных по ID из mongodb в spark

Мне интересно, может ли Mongo-spark обрабатывать сценарий, в котором Im будет импортировать данные json из потока, но для каждого файла я хочу сначала, если есть соответствующая сущность уже в Mongo, и если есть, я хотел бы вручную объединить 2 документа.

Импортированные данные выглядят следующим образом

{orderId: 1290edoiadq, from: <Some_address_string>, to: <Some_address_string>, status: "Shipped"}. 

MongoDB, который у меня есть, имеет те же данные, но _idполе содержит orderId. То, что я ищу, чтобы получить все заказы, а затем проверить, если они должны обновляться или вставлены.

EDIT позвольте мне уточнить, что означает слияние. Если у меня есть заказ с тем же идентификатором, но их статус отличается, то я хотел бы обновить статус существующего заказа в БД, чтобы быть тем, что в данных JSON.

2 ответа

  1. Я хотел бы вручную объединить 2 документа.

    Зависит от вашего определения merge.

    Если это одностороннее направление, от входящего потока данных json до обновления документов, хранящихся в MongoDB, можно использовать upsert .

    Поскольку MongoDB Connector для Spark версии 1.1.0, если _idфрейм данных содержит совпадающие данные в MongoDB, save()будет использовать upsert. Который будет обновляться при _idналичии соответствия, в противном случае вставка.

    Например, чтобы изменить на status=delivered:

    > df.schema
      org.apache.spark.sql.types.StructType = StructType(StructField(_id,StringType,true), StructField(from,StringType,true), StructField(status,StringType,true), StructField(to,StringType,true))
    
    > df.first()
      org.apache.spark.sql.Row = [1290edoiadq,sender,delivered,receiver]
    
    > MongoSpark.save(df.write.option("collection", "order").mode("append"))
    

    Перед вызовом просто переименуйте имя orderIdполя_idsave().

    Дополнительную информацию смотрите в разделах SPARK-66 и MongoSpark: save ().

    Однако если под mergeвами подразумевается обновление в двух направлениях (входящий поток и MongoDB), то сначала необходимо консолидировать изменения в Spark. Разрешение любого конфликта, как вы считаете нужным в коде.

  2. Я не использовал MongoDB, но имел аналогичный случай использования.
    Мой сценарий:
    Существует входящий поток событий, прочитанных из темы Кафки. Эти события должны быть сопоставлены и сгруппированы по ключу, для каждого ключа может быть соответствующая запись в хранилище данных(HBase в моем случае, MongoDB в вашем). Если есть запись, объедините ключевые события в существующую сущность, если нет, создайте новую сущность и сохраните ее в HBase. Есть и другие осложнения в моем случае, такие как просмотр нескольких таблиц и т.д. но суть проблемы, кажется, похожа на вашу.

    Мой подход и проблемы, с которыми я столкнулся:
    Я использовал прямой потоковый подход Кафки, это дает мне пакет(для этого обсуждения является взаимозаменяемым с RDD) данных для настроенного периода времени. Прямой потоковый подход будет читать из всех разделов Кафки, но вы должны установить ручную контрольную точку в контексте потоковой передачи, чтобы сделать вашу программу восстанавливаемой.

    Теперь этот RDD представляет все сообщения, прочитанные в течение настроенного времени.Можно дополнительно настроить максимальный размер этого пакета. При любой обработке в этом RDD, RDD разбивается на куски и каждый кусок обрабатывается исполнителем. Spark обычно порождает по одному исполнителю на ядро на машину в кластере. Я бы посоветовал вам настроить максимальное число для вашего задания spark. Вы можете амортизировать доступ к источнику данных (HBase в моем случае) на основе каждого раздела. Таким образом, если у вас есть 10 исполнителей, работающих параллельно, помните, что вы можете открыть 10 соединений ввода-вывода параллельно вашим данным. Поскольку чтение должно отражать последнюю запись на сущности, это, вероятно, самый важный аспект вашего дизайна.Может ли ваш источник данных гарантировать согласованность?

    С точки зрения кода ваша программа будет выглядеть примерно так

    dataStream.foreachRDD(rdd -> {
     // for each incoming batch, do any shuffle operations like groupByKey   first
    // This is because during shuffle data is exchanged between partitions
     rdd.groupByKey().mapPartitions(eventsInPartition -> {
      // this part of the code executes at each partition.
      Connection connection = createConnectionToDataSource()
    
      eventsInPartition.forEachRemaining(eventPair -> {
    
      Entity entity = connection.getStuffFromDB(eventPair._1)
      entity.addNewEvents(eventPair._2) // your merge step
      connection.writeStuffToDB(eventPair._1, entity)
    
       })
     })
    })
    

    Вы начинаете с foreachRDD, чтобы действовать на каждом входящем пакете данных. Сначала выполните любые карты или преобразования, которые могут применяться к каждому отдельному событию параллельно. groupByKey будет перемешивать данные по разделам, и у вас будут все события с тем же ключом в том же разделе. mapPartitions принимает функцию, которая выполняется в одной секции. Здесь вы можете формировать соединения с вашей БД. В этом примере, поскольку мы группируем по ключу, у вас есть pairRDD, которые являются RDD кортежа ключа события + Итерабельная последовательность событий. Вы можете использовать соединение для поиска, слияния , выполнения другой магии, записи сущности в БД. Попробуйте различные конфигурации для пакетной длительности, max Core, maxRatePerPartitions для управления потоком данных на основе того, как ваша БД и ваш кластер обрабатывает нагрузки.