Обратное давление, наблюдаемое на основе дефицита ресурсов

В RxJava 1 / RxScala, как я могу дросселировать/противодавить источник, наблюдаемый в следующей ситуации?

def fast: Observable[Foo] // Supports backpressure

def afterExpensiveOp: Observable[Bar] = 
    fast.flatMap(foo => Observable.from(expensiveOp(foo))

// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
   if(noResources()) Future.failed(new OutOfResourcesException())
   else Future { Bar() }
}

Возможное решение-просто блокировать пока. Что работает, но это очень неэлегантно и предотвращает несколько одновременных запросов:

def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo => 
   Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)

1 ответ

  1. flatMap имеет параметр для ограничения числа параллельных подписчиков. Если вы используете эту карту flatMap заботится о обратном давлении для вас.

    def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))