Параллельные streamи, коллекторы и безопасность streamов

См. Простой пример ниже, который подсчитывает количество вхождений каждого слова в список:

Stream words = Stream.of("a", "b", "a", "c"); Map wordsCount = words.collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

В конце слова wordsCount является {a=2, b=1, c=1} .

Но мой stream очень большой, и я хочу распараллелить работу, поэтому я пишу:

 Map wordsCount = words.parallel() .collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

Однако я заметил, что wordsCount – это простой HashMap поэтому я задаюсь вопросом, нужно ли мне явно запрашивать параллельную карту для обеспечения безопасности streamов:

 Map wordsCount = words.parallel() .collect(toConcurrentMap(s -> s, s -> 1, (i, j) -> i + j)); 

Можно ли безопасно использовать неконкурентные коллекторы с параллельным streamом или использовать только параллельные версии при сборе из параллельного streamа?

    Можно ли безопасно использовать неконкурентные коллекторы с параллельным streamом или использовать только параллельные версии при сборе из параллельного streamа?

    Безопасно использовать неконкурентный коллектор в операции collect параллельного streamа.

    В спецификации интерфейса Collector в разделе с полдюжины пулевых точек находится следующее:

    Для неконкурентных коллекционеров любой результат, возвращаемый функциями поставщика результата, аккумулятора или объединителя, должен быть последовательно ограниченным streamом. Это позволяет собирать коллекцию параллельно, не имея коллектора, нуждающегося в реализации любой дополнительной синхронизации. Реализация сокращения должна управлять тем, что вход правильно разбит на разделы, что разделы обрабатываются изолированно, а объединение происходит только после завершения накопления.

    Это означает, что различные реализации, предоставляемые classом Collectors могут использоваться с параллельными streamами, хотя некоторые из этих реализаций могут быть не параллельными коллекторами. Это также относится к любому из ваших собственных несовпадающих коллекционеров, которые вы можете реализовать. Они могут безопасно использоваться с параллельными streamами, если ваши коллекторы не мешают источнику streamа, свободны от побочных эффектов, независимы от заказа и т. Д.

    Я также рекомендую прочитать раздел Mutable Reduction документации пакета java.util.stream. В середине этого раздела приведен пример, который считается параллелизуемым, но который собирает результаты в ArrayList , который не является streamобезопасным.

    Способ, которым это работает, заключается в том, что параллельный stream, заканчивающийся в неконкурентном коллекторе, гарантирует, что разные streamи всегда работают в разных экземплярах промежуточных коллекций результатов. Вот почему у коллекционера есть функция Supplier , для создания как можно большего количества промежуточных коллекций, так как есть streamи, поэтому каждый stream может накапливаться в отдельности. Когда промежуточные результаты должны быть объединены, они безопасно передаются между streamами, и в любой момент времени только один stream объединяет любую пару промежуточных результатов.

    Все коллекторы, если они следуют правилам в спецификации, безопасны для запуска параллельно или последовательно. Параллельная готовность является ключевой частью дизайна здесь.

    Различие между параллельными и неконкурентными коллекторами связано с подходом к распараллеливанию.

    Обычный (неконкурентный) сборщик работает путем слияния суб-результатов. Таким образом, источник разбивается на кучу кусков, каждый fragment собран в контейнер результатов (например, список или карту), а затем суб-результаты объединяются в контейнер большего результата. Это безопасно и сохраняет порядок, но для некоторых видов контейнеров, особенно карт, может быть дорого, так как слияние двух карт по ключу часто бывает дорогостоящим.

    Параллельный коллектор вместо этого создает один контейнер результатов, операции вставки которого гарантируют streamобезопасность и взрывают в него элементы из нескольких streamов. С очень параллельным контейнером результата, таким как ConcurrentHashMap, этот подход может работать лучше, чем слияние обычных HashMaps.

    Таким образом, параллельные коллекторы строго оптимизируют свои обычные коллеги. И они не приходят без затрат; потому что элементы взорваны из многих streamов, параллельные коллекторы обычно не могут сохранить порядок встреч. (Но часто вам все равно – при создании гистограммы подсчета слов вам все равно, какой экземпляр «foo» вы считали первым.)

    Безопасно использовать неконкурентные коллекции и неатомарные счетчики с параллельными streamами.

    Если вы посмотрите на документацию Stream :: collect , вы найдете следующий абзац:

    Подобно reduce(Object, BinaryOperator) , операции сбора могут быть распараллелены без дополнительной синхронизации.

    А для метода Stream :: reduce :

    Хотя это может показаться более окольным способом выполнения агрегации по сравнению с простому изменением общего количества в цикле, операции сокращения распараллеливаются более грациозно, без необходимости дополнительной синхронизации и с значительно меньшим риском сбоев данных.

    Это может быть немного удивительно. Тем не менее, обратите внимание, что параллельные streamи основаны на модели объединения fork . Это означает, что одновременное выполнение работает следующим образом:

    • разделить на две части с примерно одинаковым размером
    • обрабатывать каждую часть по отдельности
    • собрать результаты обеих частей и объединить их в один результат

    На втором этапе три этапа рекурсивно применяются к подпоследовательностям.

    Пример должен сделать это понятным.

     IntStream.range(0, 4) .parallel() .collect(Trace::new, Trace::accumulate, Trace::combine); 

    Единственной целью classа Trace является запись вызовов конструктора и метода. Если вы выполните этот оператор, он напечатает следующие строки:

     thread: 9 / operation: new thread: 10 / operation: new thread: 10 / operation: accumulate thread: 1 / operation: new thread: 1 / operation: accumulate thread: 1 / operation: combine thread: 11 / operation: new thread: 11 / operation: accumulate thread: 9 / operation: accumulate thread: 9 / operation: combine thread: 9 / operation: combine 

    Вы можете видеть, что были созданы четыре объекта Trace , накопление было вызвано один раз на каждом объекте, и объединение было использовано три раза, чтобы объединить четыре объекта в один. Доступ к каждому объекту возможен только по одному streamу за раз. Это делает код streamобезопасным, и то же самое относится к методу Collectors :: toMap .

    Давайте будем гением компьютера.