Запуск запускаемого из модуля записи пакета Spring partitioned step

У меня есть пакетное задание Spring, состоящее из секционированного шага, и секционированный шаг выполняет обработку кусками.

Можно ли в дальнейшем запускать новые потоки ( реализацию Runnable) из метода,public void write(List<? extends VO> itemsToWrite)?

В основном, writer здесь пишет индексы, используя Lucene, и так как у writer есть Listchunk-sizeэлементы, я думал разделить их Listна сегменты и передать каждый сегмент новому Runnable.

Это хороший подход?

Я закодировал образец, и он работает в большинстве случаев, но застревает несколько раз.

Есть ли что-то, о чем мне нужно беспокоиться? Или для этого есть что-то встроенное в весеннюю партию?

Я не хочу, чтобы запись происходила одним потоком для целого куска. Я хочу еще разделить кусок.

Lucene IndexWriterявляется потокобезопасным, и подход указан здесь

Образец Code-Writer получает a Listэлементов, для которых я открываю потоки из пула потоков? Будут ли какие-либо проблемы, даже если я жду завершения пула для куска,

@Override
    public void write(List<? extends IndexerInputVO> inputItems) throws Exception {


        int docsPerThread = Constants.NUMBER_OF_DOCS_PER_INDEX_WRITER_THREADS;
        int docSize = inputItems.size();
        int remainder = docSize%docsPerThread;
        int poolSize = docSize/docsPerThread;

        ExecutorService executor = Executors.newFixedThreadPool(poolSize+1);


        int fromIndex=0;
        int toIndex = docsPerThread;

        if(docSize < docsPerThread){
            executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems));
        }else{
            for(int i=1;i<=poolSize;i++){
                executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
                fromIndex+=docsPerThread;
                toIndex+=docsPerThread;
            }

            if(remainder != 0){
                toIndex=docSize;
                executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex)));
            }
        }

        executor.shutdown();

        while(executor.isTerminated()){
            ;
        }

1 ответ

  1. Я не уверен, что запуск новых потоков в writer это хорошая идея.
    Эти потоки выходят за рамки spring batch framework, поэтому вам нужно будет реализовать политику выключения и отмены для выше. Если обработка одного сегмента завершится неудачно, это может привести к сбою всей очереди.

    В качестве альтернативного подхода я могу предложить продвижение пользовательских сегментов списка от писателя к следующему шагу, как описано в официальных документах passingDataToFutureSteps