Слияние документов в elasticsearch haoop, создание пар значений нескольких ключей с помощью es-sparksql

В настоящее время elasticsearch hadoop преобразует dataset/rdd в документы с сопоставлением 1 к 1, т. е. 1 строка в dataset преобразуется в один документ. В нашем сценарии мы делаем что-то вроде этого

for ‘ uni

PUT spark/docs/1
{
"_k":"one",
"_k":"two",
"_k":"three" // large sets , we dont need to store much, we just want to map multiple keys to single value.
"_v" :"key:
}

GET spark/docs/_search
{
"query" : {
  "constant_score" : {
    "filter" : {
      "terms" : {
        "_k" : ["one"] // all values work.
        }
      }
    }
  }
}

Любое предложение, как мы можем реализовать выше, если есть лучшая стратегия, пожалуйста, предложите.

Ниже код не работает, но я пытаюсь достичь чего-то вроде ниже в теории

  final Dataset<String> df = spark.read().csv("src/main/resources/star2000.csv").select("_c1").dropDuplicates().as(Encoders.STRING());
  final Dataset<ArrayList> arrayListDataset = df.mapPartitions(new MapPartitionsFunction<String, ArrayList>() {
        @Override
        public Iterator<ArrayList> call(Iterator<String> iterator) throws Exception {
            ArrayList<String> s = new ArrayList<>();
            iterator.forEachRemaining(it -> s.add(it));
            return Iterators.singletonIterator(s);
        }
    }, Encoders.javaSerialization(ArrayList.class));
  JavaEsSparkSQL.saveToEs(arrayListDataset,"spark/docs");

Я не хочу собирать полный набор данных в одном списке, так как это может привести к OOM, поэтому план состоит в том, чтобы получить список для каждого раздела и индексировать его по ключу раздела.

2 ответа

  1. Это поможет разместить некоторый исходный код, который вы используете, вопрос также не ясно, чего вы пытаетесь достичь.

    Я предполагаю, что вы хотели бы разместить массив в поле ключа (_k) и другое значение в поле значения (_v)?

    Таким образом, Вы можете создать JavaPairRDD и сохранить его в Elasticsearch, что-то вроде ниже:

    String[] keys = {"one", "two", "three"};
    String value = "key";
    
    List<Tuple2<String[],String>> l = new ArrayList<Tuple2<String[],String>>();
    l.add(new Tuple2<String[],String>(keys, value));
    
    JavaPairRDD<String[],String> R = ctx.parallelizePairs(l);
    
    JavaEsSpark.saveToEs(R,"index/type");
    
  2. Использование pojo как

    Document{
       String[] vals,
       String key
    } 
    

    и С ниже фрагментом кода

    Dataset<String> df = spark.sqlContext().read().parquet(params.getPath())
                            .select(params.getColumnName())
                            .as(Encoders.STRING());
    
    final Dataset<Document> documents = df.coalesce(numPartitions).mapPartitions(iterator -> {
           final Set<String> set = Sets.newHashSet(iterator);
           Document d = new Document(set.toArray(new String[set.size()]),"key1");
           return Iterators.singletonIterator(d);}, Encoders.bean(Document.class));
    JavaEsSparkSQL.saveToEs(documents, params.getTableIndexName() + "/"+params.getTableIndexType());
    

    Это создает над индексом массива.