Ограничить stream предикатом
Есть ли операция streamа Java 8, которая ограничивает (потенциально бесконечный) Stream
до первого элемента, который не соответствует предикату? Что-то, что выглядит как (несуществующий) takeWhile
в примере ниже и будет печатать все числа меньше 10?
IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);
Если такой операции нет, то каким образом ее можно реализовать в общем виде?
- Список групп Java 8 lambdas на карте
- Почему java.util.Collection не реализует новый интерфейс Stream?
- Обратный порядок streamа Java 8
- Поддерживает ли Stream.forEce порядок столкновений последовательных streamов?
- В чем разница между методами map и flatMap в Java 8?
- Параллельные streamи, коллекторы и безопасность streamов
- Как создать stream из массива?
- Самый эффективный способ получить последний элемент streamа
- В streamах Java просматривается действительно только для отладки?
- Можно ли использовать функцию объединителя коллектора для последовательных streamов?
- Java8: сумма значений из определенного поля объектов в списке
- В Java, как эффективно и элегантно потопить потомков узла дерева?
- Как обеспечить порядок обработки в streamах java8?
Такая операция должна быть возможна с помощью Java 8 Stream
, но это не обязательно может быть сделано эффективно – например, вы не можете распараллеливать такую операцию, поскольку вы должны смотреть на элементы по порядку.
API не обеспечивает простой способ сделать это, но, пожалуй, самый простой способ – взять Stream.iterator()
, обернуть Iterator
чтобы реализовать «взятие-в то время», а затем вернуться к Spliterator
а затем Stream
. Или, может быть, оберните Spliterator
, хотя в этой реализации он больше не может быть разделен.
Вот непроверенная реализация takeWhile
на Spliterator
:
static Spliterator takeWhile( Spliterator splitr, Predicate predicate) { return new Spliterators.AbstractSpliterator (splitr.estimateSize(), 0) { boolean stillGoing = true; @Override public boolean tryAdvance(Consumer consumer) { if (stillGoing) { boolean hadNext = splitr.tryAdvance(elem -> { if (predicate.test(elem)) { consumer.accept(elem); } else { stillGoing = false; } }); return hadNext && stillGoing; } return false; } }; } static Stream takeWhile(Stream stream, Predicate predicate) { return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); }
Операции takeWhile
и dropWhile
были добавлены в JDK 9. Ваш примерный код
IntStream .iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);
будет вести себя точно так, как вы ожидаете, при компиляции и запуске под JDK 9.
JDK 9 выпущен. Его можно скачать здесь: http://jdk.java.net/9/
allMatch()
– функция короткого замыкания, поэтому вы можете использовать ее для прекращения обработки. Главный недостаток заключается в том, что вам нужно дважды выполнить свой тест: один раз, чтобы проверить, следует ли его обрабатывать, и снова посмотреть, продолжать ли идти.
IntStream .iterate(1, n -> n + 1) .peek(n->{if (n<10) System.out.println(n);}) .allMatch(n->n < 10);
В ответ на ответ @StuartMarks ответ . Моя библиотека StreamEx имеет операцию takeWhile
которая совместима с текущей реализацией JDK-9. При работе под JDK-9 он просто делегирует реализацию JDK (через MethodHandle.invokeExact
который очень быстро). При работе под JDK-8 будет использоваться реализация «polyfill». Поэтому, используя мою библиотеку, проблема может быть решена следующим образом:
IntStreamEx.iterate(1, n -> n + 1) .takeWhile(n -> n < 10) .forEach(System.out::println);
takeWhile
– одна из функций, предоставляемых библиотекой protonpack .
Stream infiniteInts = Stream.iterate(0, i -> i + 1); Stream finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10); assertThat(finiteInts.collect(Collectors.toList()), hasSize(10));
Вы можете использовать java8 + rxjava .
import java.util.stream.IntStream; import rx.Observable; // Example 1) IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> { System.out.println(n); return n < 10; } ).subscribe() ; // Example 2 IntStream intStream = IntStream.iterate(1, n -> n + 1); Observable.from(() -> intStream.iterator()) .takeWhile(n -> n < 10) .forEach( n -> System.out.println(n));
Обновление: Java 9 Stream
теперь поставляется с методом takeWhile .
Нет необходимости в хаках или других решениях. Просто используйте это!
Я уверен, что это может быть значительно улучшено: (кто-то может сделать его streamобезопасным, возможно)
Stream stream = Stream.iterate(0, n -> n + 1); TakeWhile.stream(stream, n -> n < 10000) .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));
Взломать наверняка … Не изящно – но он работает ~: D
class TakeWhile implements Iterator { private final Iterator iterator; private final Predicate predicate; private volatile T next; private volatile boolean keepGoing = true; public TakeWhile(Stream s, Predicate p) { this.iterator = s.iterator(); this.predicate = p; } @Override public boolean hasNext() { if (!keepGoing) { return false; } if (next != null) { return true; } if (iterator.hasNext()) { next = iterator.next(); keepGoing = predicate.test(next); if (!keepGoing) { next = null; } } return next != null; } @Override public T next() { if (next == null) { if (!hasNext()) { throw new NoSuchElementException("Sorry. Nothing for you."); } } T temp = next; next = null; return temp; } public static Stream stream(Stream s, Predicate p) { TakeWhile tw = new TakeWhile(s, p); Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED); return StreamSupport.stream(split, false); } }
Вот версия, сделанная на ints – как задано в вопросе.
Применение:
StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);
Вот код для StreamUtil:
import java.util.PrimitiveIterator; import java.util.Spliterators; import java.util.function.IntConsumer; import java.util.function.IntPredicate; import java.util.stream.IntStream; import java.util.stream.StreamSupport; public class StreamUtil { public static IntStream takeWhile(IntStream stream, IntPredicate predicate) { return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false); } private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator { private final PrimitiveIterator.OfInt iterator; private final IntPredicate predicate; public PredicateIntSpliterator(IntStream stream, IntPredicate predicate) { super(Long.MAX_VALUE, IMMUTABLE); this.iterator = stream.iterator(); this.predicate = predicate; } @Override public boolean tryAdvance(IntConsumer action) { if (iterator.hasNext()) { int value = iterator.nextInt(); if (predicate.test(value)) { action.accept(value); return true; } } return false; } } }
Это источник, скопированный из JDK 9 java.util.stream.Stream.takeWhile (Predicate). Небольшая разница для работы с JDK 8.
static Stream takeWhile(Stream stream, Predicate p) { class Taking extends Spliterators.AbstractSpliterator implements Consumer { private static final int CANCEL_CHECK_COUNT = 63; private final Spliterator s; private int count; private T t; private final AtomicBoolean cancel = new AtomicBoolean(); private boolean takeOrDrop = true; Taking(Spliterator s) { super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)); this.s = s; } @Override public boolean tryAdvance(Consumer action) { boolean test = true; if (takeOrDrop && // If can take (count != 0 || !cancel.get()) && // and if not cancelled s.tryAdvance(this) && // and if advanced one element (test = p.test(t))) { // and test on element passes action.accept(t); // then accept element return true; } else { // Taking is finished takeOrDrop = false; // Cancel all further traversal and splitting operations // only if test of element failed (short-circuited) if (!test) cancel.set(true); return false; } } @Override public Comparator getComparator() { return s.getComparator(); } @Override public void accept(T t) { count = (count + 1) & CANCEL_CHECK_COUNT; this.t = t; } @Override public Spliterator trySplit() { return null; } } return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close); }
Пойдите, чтобы получить библиотеку AbacusUtil . Он предоставляет точный API, который вы хотите, и многое другое:
IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);
Декларация: Я разработчик AbacusUtil.
Вы не можете прервать stream, за исключением операции с коротким замыканием, которая оставила бы некоторые значения streamа необработанными независимо от их значения. Но если вы просто хотите избежать операций над streamом, вы можете добавить преобразование и фильтр в stream:
import java.util.Objects; class ThingProcessor { static Thing returnNullOnCondition(Thing thing) { return( (*** is condition met ***)? null : thing); } void processThings(Collection thingsCollection) { thingsCollection.stream() *** regular stream processing *** .map(ThingProcessor::returnNullOnCondition) .filter(Objects::nonNull) *** continue stream processing *** } } // class ThingProcessor
Это преобразует stream вещей в нулевое значение, когда вещи соответствуют некоторому условию, а затем отфильтровывает нули. Если вы готовы побаловать побочные эффекты, вы можете установить значение условия true, как только произойдет что-то, поэтому все последующие вещи отфильтровываются независимо от их значения. Но даже если вы не можете сэкономить много (если не совсем) обработку, отфильтровывая значения из streamа, который вы не хотите обрабатывать.
На самом деле есть два способа сделать это в Java 8 без каких-либо дополнительных библиотек или с использованием Java 9.
Если вы хотите печатать цифры от 2 до 20 на консоли, вы можете сделать это:
IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);
или
IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);
Вывод в обоих случаях:
2 4 6 8 10 12 14 16 18 20
Никто еще не упомянул anyMatch . Это и есть причина этого сообщения.
Даже у меня было аналогичное требование – вызовите веб-сервис, если он не работает, повторите попытку 3 раза. Если это не удается даже после этих многочисленных испытаний, отправьте уведомление по электронной почте. После anyMatch()
пришел как спаситель. Мой пример кода выглядит следующим образом. В следующем примере, если метод webServiceCall возвращает true в первой итерации, stream не выполняет итерацию по мере того, как мы вызвали anyMatch()
. Полагаю, это то, что вы ищете.
import java.util.stream.IntStream; import io.netty.util.internal.ThreadLocalRandom; class TrialStreamMatch { public static void main(String[] args) { if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){ //Code for sending email notifications } } public static boolean webServiceCall(int i){ //For time being, I have written a code for generating boolean randomly //This whole piece needs to be replaced by actual web-service client code boolean bool = ThreadLocalRandom.current().nextBoolean(); System.out.println("Iteration index :: "+i+" bool :: "+bool); //Return success status -- true or false return bool; }
У меня есть еще одно быстрое решение, реализуя это (что на самом деле нечисто, но вы поняли):
public static void main(String[] args) { System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15) .map(o -> o.toString()).collect(Collectors.joining(", "))); } static interface TerminatedStream { Stream terminateOn(T e); } static class StreamUtil { static TerminatedStream iterate(T seed, UnaryOperator op) { return new TerminatedStream () { public Stream terminateOn(T e) { Builder builder = Stream. builder().add(seed); T current = seed; while (!current.equals(e)) { current = op.apply(current); builder.add(current); } return builder.build(); } }; } }
Если у вас есть другая проблема, может понадобиться другое решение, но для вашей текущей проблемы я просто буду:
IntStream .iterate(1, n -> n + 1) .limit(10) .forEach(System.out::println);
Может быть немного не по теме, но это то, что у нас есть для List
а не Stream
.
Сначала вам нужно использовать метод take
. Эти методы принимают первые n
элементов:
static List take(List l, int n) { if (n <= 0) { return newArrayList(); } else { int takeTo = Math.min(Math.max(n, 0), l.size()); return l.subList(0, takeTo); } }
он просто работает как scala.List.take
assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3)); assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5)); assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1)); assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));
теперь будет довольно просто написать метод takeWhile
основанный на
static List takeWhile(List l, Predicate p) { return l.stream(). filter(p.negate()).findFirst(). // find first element when p is false map(l::indexOf). // find the index of that element map(i -> take(l, i)). // take up to the index orElse(l); // return full list if p is true for all elements }
он работает следующим образом:
assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));
эта реализация повторяет список частично в течение нескольких раз, но не добавляет добавления O(n^2)
операций. Надеюсь, это приемлемо.
Вот моя попытка использовать только библиотеку Java Stream.
IntStream.iterate(0, i -> i + 1) .filter(n -> { if (n < 10) { System.out.println(n); return false; } else { return true; } }) .findAny();