Задержка RxJava для каждого элемента списка, испускаемого

Я изо всех сил пытаюсь реализовать то, что, как я предполагал, будет довольно простым в Rx.

У меня есть список элементов, и я хочу, чтобы каждый элемент испускался с задержкой.

Кажется, оператор Rx delay () просто сдвигает излучение всех элементов на указанную задержку, а не на каждый отдельный элемент.

Вот несколько тестовых кодов. Он группирует элементы в списке. Затем каждая группа должна иметь задержку перед испусканием.

Observable.range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .delay(50, TimeUnit.MILLISECONDS) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first(); 

Результат:

 154ms [5] 155ms [2] 155ms [1] 155ms [3] 155ms [4] 

Но то, что я ожидаю увидеть, выглядит примерно так:

 174ms [5] 230ms [2] 285ms [1] 345ms [3] 399ms [4] 

Что я делаю не так?

Один из способов сделать это – использовать zip для объединения наблюдаемого с наблюдаемым Interval для задержки вывода.

 Observable.zip(Observable.range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> g.toList()), Observable.interval(50, TimeUnit.MILLISECONDS), (obs, timer) -> obs) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first(); 

Простейший способ сделать это, похоже, просто использовать concatMap и обернуть каждый элемент в задержанном Obserable.

 long startTime = System.currentTimeMillis(); Observable.range(1, 5) .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS)) .doOnNext(i-> System.out.println( "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms")) .toCompletable().await(); 

Печать:

 Item: 1, Time: 51ms Item: 2, Time: 101ms Item: 3, Time: 151ms Item: 4, Time: 202ms Item: 5, Time: 252ms 

Просто используйте простой подход для выделения каждого элемента в коллекции с интервалом:

 Observable.just(1,2,3,4,5) .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item) .subscribe(System.out::println); 

Каждый элемент будет выходить каждые 500 миллисекунд

Вы можете реализовать пользовательский оператор rx, такой как MinRegularIntervalDelayOperator а затем использовать его с функцией lift

 Observable.range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .lift(new MinRegularIntervalDelayOperator(50L)) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first(); 

Чтобы задержать каждую группу, вы можете изменить flatMap() чтобы вернуть Observable, что задерживает излучение группы.

 Observable .range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> Observable .timer(50, TimeUnit.MILLISECONDS) .flatMap(t -> g.toList()) ) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first(); 

Не совсем чистый способ заключается в изменении задержки с помощью итерации с помощью оператора .delay (Func1).

 Observable.range(1, 5) .delay(n -> n*50) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first(); 

Существует другой способ сделать это, используя concatMap, поскольку concatMap возвращает наблюдаемые исходные элементы. поэтому мы можем добавить задержку на наблюдаемое.

вот что я пробовал.

 Observable.range(1, 5) .groupBy(n -> n % 5) .concatMap(integerIntegerGroupedObservable -> integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS)) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first(); 

Я думаю, это именно то, что вам нужно. Посмотрите:

 long startTime = System.currentTimeMillis(); Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS) .timestamp(TimeUnit.MILLISECONDS) .subscribe(emitTime -> { System.out.println(emitTime.time() - startTime); }); 

Вы можете использовать

  Observable.interval(1, TimeUnit.SECONDS) .map(new Function() { @Override public Integer apply(Long aLong) throws Exception { return aLong.intValue() + 1; } }) .startWith(0) .take(listInput.size()) .subscribe(new Consumer() { @Override public void accept(Integer index) throws Exception { Log.d(TAG, "---index of your list --" + index); } }); 

Этот код выше не дублирует значение (индекс). “Я уверен”

Для пользователей kotlin я написал функцию расширения для подхода «zip with interval»

 fun  Observable.delayEach(interval: Long, timeUnit: TimeUnit): Observable = Observable.zip(this, Observable.interval(interval, timeUnit), { item, _ -> item }) 

Он работает одинаково, но это делает его многоразовым. Пример:

 Observable.range(1, 5) .delayEach(1, TimeUnit.SECONDS) 

Я думаю, вы этого хотите:

 Observable.range(1, 5) .delay(50, TimeUnit.MILLISECONDS) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first(); 

Таким образом, он будет задерживать числа, входящие в группу, а не задерживать сокращенный список на 5 секунд.

Вы можете добавить задержку между испущенными элементами, используя flatMap, maxConcurrent и delay ()

Вот пример – испустите 0..4 с задержкой

 @Test fun testEmitWithDelays() { val DELAY = 500L val COUNT = 5 val latch = CountDownLatch(1) val startMoment = System.currentTimeMillis() var endMoment : Long = 0 Observable .range(0, COUNT) .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1 .subscribe( { println("... value: $it, ${System.currentTimeMillis() - startMoment}") }, {}, { endMoment = System.currentTimeMillis() latch.countDown() }) latch.await() assertTrue { endMoment - startMoment >= DELAY * COUNT } } ... value: 0, 540 ... value: 1, 1042 ... value: 2, 1544 ... value: 3, 2045 ... value: 4, 2547 

вы сможете добиться этого, используя оператор Timer . Я пробовал с delay но не смог добиться желаемого результата. Обратите внимание, что вложенные операции выполняются в операторе flatmap .

  Observable.range(1,5) .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS) .map(y -> x)) // attach timestamp .timestamp() .subscribe(timedIntegers -> Log.i(TAG, "Timed String: " + timedIntegers.value() + " " + timedIntegers.time())); 
Давайте будем гением компьютера.