«Parallel.For» для Java?

Мне было интересно, есть ли Parallel.For эквивалент версии .net для Java?

Если есть кто-то, пожалуйста, подайте пример? благодаря!

Думаю, самое близкое:

 ExecutorService exec = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); try { for (final Object o : list) { exec.submit(new Runnable() { @Override public void run() { // do stuff with o. } }); } } finally { exec.shutdown(); } 

Основываясь на комментариях TheLQ, вы должны установить SUM_NUM_THREADS в Runtime.getRuntime().availableProcessors();

Изменить: решил добавить базовую реализацию «Parallel.For»

 public class Parallel { private static final int NUM_CORES = Runtime.getRuntime().availableProcessors(); private static final ExecutorService forPool = Executors.newFixedThreadPool(NUM_CORES * 2, new NamedThreadFactory("Parallel.For")); public static  void For(final Iterable elements, final Operation operation) { try { // invokeAll blocks for us until all submitted tasks in the call complete forPool.invokeAll(createCallables(elements, operation)); } catch (InterruptedException e) { e.printStackTrace(); } } public static  Collection> createCallables(final Iterable elements, final Operation operation) { List> callables = new LinkedList>(); for (final T elem : elements) { callables.add(new Callable() { @Override public Void call() { operation.perform(elem); return null; } }); } return callables; } public static interface Operation { public void perform(T pParameter); } } 

Пример использования Parallel.For

 // Collection of items to process in parallel Collection elems = new LinkedList(); for (int i = 0; i < 40; ++i) { elems.add(i); } Parallel.For(elems, // The operation to perform with each item new Parallel.Operation() { public void perform(Integer param) { System.out.println(param); }; }); 

Я предполагаю, что эта реализация действительно больше похожа на Parallel.ForEach

Редактировать. Я поставил это на GitHub, если кому-то это интересно. Параллельно для GitHub

Решение MLaw является очень практичным Parallel.ForEach. Я добавил немного модификации, чтобы сделать Parallel.For.

 public class Parallel { static final int iCPU = Runtime.getRuntime().availableProcessors(); public static  void ForEach(Iterable  parameters, final LoopBody loopBody) { ExecutorService executor = Executors.newFixedThreadPool(iCPU); List> futures = new LinkedList>(); for (final T param : parameters) { Future future = executor.submit(new Runnable() { public void run() { loopBody.run(param); } }); futures.add(future); } for (Future f : futures) { try { f.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } } executor.shutdown(); } public static void For(int start, int stop, final LoopBody loopBody) { ExecutorService executor = Executors.newFixedThreadPool(iCPU); List> futures = new LinkedList>(); for (int i=start; i future = executor.submit(new Runnable() { public void run() { loopBody.run(k); } }); futures.add(future); } for (Future f : futures) { try { f.get(); } catch (InterruptedException e) { } catch (ExecutionException e) { } } executor.shutdown(); } } public interface LoopBody  { void run(T i); } public class ParallelTest { int k; public ParallelTest() { k = 0; Parallel.For(0, 10, new LoopBody () { public void run(Integer i) { k += i; System.out.println(i); } }); System.out.println("Sum = "+ k); } public static void main(String [] argv) { ParallelTest test = new ParallelTest(); } } 

Построенный по предложению mlaw, добавьте CountDownLatch. Добавьте chunksize, чтобы уменьшить submit ().

При тестировании с 4 миллионами массивов элементов это дает 5-кратное ускорение по сравнению с последовательным для () на моем процессоре Core i7 2630QM.

 public class Loop { public interface Each { void run(int i); } private static final int CPUs = Runtime.getRuntime().availableProcessors(); public static void withIndex(int start, int stop, final Each body) { int chunksize = (stop - start + CPUs - 1) / CPUs; int loops = (stop - start + chunksize - 1) / chunksize; ExecutorService executor = Executors.newFixedThreadPool(CPUs); final CountDownLatch latch = new CountDownLatch(loops); for (int i=start; i 

Fork join framework в Java 7 предназначен для поддержки параллелизма. Но я не знаю о точном эквиваленте для Parallel.For .

Вот мой вклад в эту тему https://github.com/pablormier/parallel-loops . Использование очень просто:

 Collection upperCaseWords = Parallel.ForEach(words, new Parallel.F() { public String apply(String s) { return s.toUpperCase(); } }); 

Также возможно изменить некоторые аспекты поведения, такие как количество streamов (по умолчанию используется пул кэшированных streamов):

 Collection upperCaseWords = new Parallel.ForEach(words) .withFixedThreads(4) .apply(new Parallel.F() { public String apply(String s) { return s.toUpperCase(); } }).values(); 

Весь код является самодостаточным в одном classе java и не имеет больше зависимостей, чем JDK. Я также рекомендую вам проверить новый способ распараллеливания в стиле функционального стиля с помощью Java 8

Более простой вариант

 // A thread pool which runs for the life of the application. private static final ExecutorService EXEC = Executors.newFixedThreadPool(SOME_NUM_OF_THREADS); //later EXEC.invokeAll(tasks); // you can optionally specify a timeout. 

Существует эквивалент для Parallel.For доступен как расширение Java. Он называется Ateji PX, у них есть бесплатная версия, с которой вы можете играть. http://www.ateji.com/px/index.html

Это точный эквивалент parallel.for и похож на.

For ||

Дополнительные примеры и пояснения по википедии: http://en.wikipedia.org/wiki/Ateji_PX

Закрытая вещь в Java IMO

Синхронизация часто убивает ускорение параллельных циклов. Поэтому для параллельных контуров часто требуются их личные данные и механизм сокращения, чтобы сократить все личные данные streamов, чтобы они содержали один результат.

Поэтому я расширил Parallel.For версию Weimin Xiao механизмом сокращения.

 public class Parallel { public static interface IntLoopBody { void run(int i); } public static interface LoopBody { void run(T i); } public static interface RedDataCreator { T run(); } public static interface RedLoopBody { void run(int i, T data); } public static interface Reducer { void run(T returnData, T addData); } private static class ReductionData { Future future; T data; } static final int nCPU = Runtime.getRuntime().availableProcessors(); public static  void ForEach(Iterable  parameters, final LoopBody loopBody) { ExecutorService executor = Executors.newFixedThreadPool(nCPU); List> futures = new LinkedList>(); for (final T param : parameters) { futures.add(executor.submit(() -> loopBody.run(param) )); } for (Future f : futures) { try { f.get(); } catch (InterruptedException | ExecutionException e) { System.out.println(e); } } executor.shutdown(); } public static void For(int start, int stop, final IntLoopBody loopBody) { final int chunkSize = (stop - start + nCPU - 1)/nCPU; final int loops = (stop - start + chunkSize - 1)/chunkSize; ExecutorService executor = Executors.newFixedThreadPool(loops); List> futures = new LinkedList>(); for (int i=start; i < stop; ) { final int iStart = i; i += chunkSize; final int iStop = (i < stop) ? i : stop; futures.add(executor.submit(() -> { for (int j = iStart; j < iStop; j++) loopBody.run(j); })); } for (Future f : futures) { try { f.get(); } catch (InterruptedException | ExecutionException e) { System.out.println(e); } } executor.shutdown(); } public static  void For(int start, int stop, T result, final RedDataCreator creator, final RedLoopBody loopBody, final Reducer reducer) { final int chunkSize = (stop - start + nCPU - 1)/nCPU; final int loops = (stop - start + chunkSize - 1)/chunkSize; ExecutorService executor = Executors.newFixedThreadPool(loops); List> redData = new LinkedList>(); for (int i = start; i < stop; ) { final int iStart = i; i += chunkSize; final int iStop = (i < stop) ? i : stop; final ReductionData rd = new ReductionData(); rd.data = creator.run(); rd.future = executor.submit(() -> { for (int j = iStart; j < iStop; j++) { loopBody.run(j, rd.data); } }); redData.add(rd); } for (ReductionData rd : redData) { try { rd.future.get(); if (rd.data != null) { reducer.run(result, rd.data); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); } } 

Ниже приведен простой пример: параллельный счетчик символов с использованием несинхронизированной карты.

 import java.util.*; public class ParallelTest { static class Counter { int cnt; Counter() { cnt = 1; } } public static void main(String[] args) { String text = "More formally, if this map contains a mapping from a key k to a " + "value v such that key compares equal to k according to the map's ordering, then " + "this method returns v; otherwise it returns null."; Map charCounter1 = new TreeMap(); Map charCounter2 = new TreeMap(); // first sequentially for(int i=0; i < text.length(); i++) { char c = text.charAt(i); Counter cnt = charCounter1.get(c); if (cnt == null) { charCounter1.put(c, new Counter()); } else { cnt.cnt++; } } for(Map.Entry entry: charCounter1.entrySet()) { System.out.println(entry.getKey() + ": " + entry.getValue().cnt); } // now parallel without synchronization Parallel.For(0, text.length(), charCounter2, // Creator () -> new TreeMap(), // Loop Body (i, map) -> { char c = text.charAt(i); Counter cnt = map.get(c); if (cnt == null) { map.put(c, new Counter()); } else { cnt.cnt++; } }, // Reducer (result, map) -> { for(Map.Entry entry: map.entrySet()) { Counter cntR = result.get(entry.getKey()); if (cntR == null) { result.put(entry.getKey(), entry.getValue()); } else { cntR.cnt += entry.getValue().cnt; } } } ); // compare results assert charCounter1.size() == charCounter2.size() : "wrong size: " + charCounter1.size() + ", " + charCounter2.size(); Iterator> it2 = charCounter2.entrySet().iterator(); for(Map.Entry entry: charCounter1.entrySet()) { Map.Entry entry2 = it2.next(); assert entry.getKey() == entry2.getKey() && entry.getValue().cnt == entry2.getValue().cnt : "wrong content"; } System.out.println("Well done!"); } } 

У меня есть обновленный Java Parallel class, который может выполнять Parallel.For, Parallel.ForEach, Parallel.Tasks и секционированный параллельный цикл. Исходный код выглядит следующим образом:

Примеры использования этих параллельных циклов следующие:

 public static void main(String [] argv) { //sample data final ArrayList ss = new ArrayList(); String [] s = {"a", "b", "c", "d", "e", "f", "g"}; for (String z : s) ss.add(z); int m = ss.size(); //parallel-for loop System.out.println("Parallel.For loop:"); Parallel.For(0, m, new LoopBody() { public void run(Integer i) { System.out.println(i +"\t"+ ss.get(i)); } }); //parallel for-each loop System.out.println("Parallel.ForEach loop:"); Parallel.ForEach(ss, new LoopBody() { public void run(String p) { System.out.println(p); } }); //partitioned parallel loop System.out.println("Partitioned Parallel loop:"); Parallel.ForEach(Parallel.create(0, m), new LoopBody() { public void run(Partition p) { for(int i=p.start; i 

Это то, что я использую для Java 7 и меньше.

Для Java 8 вы можете использовать forEach ()

[ОБНОВИТЬ ]

Параллельный class:

 private static final int NUM_CORES = Runtime.getRuntime().availableProcessors(); private static final int MAX_THREAD = NUM_CORES*2; public static  void For(final Iterable elements, final Operation operation) { if (elements != null) { final Iterator iterator = elements.iterator(); if (iterator.hasNext()) { final Throwable[] throwable = new Throwable[1]; final Callable callable = new Callable() { boolean first = true; @Override public final Void call() throws Exception { if ((first || operation.follow()) && iterator.hasNext()) { T result; result = iterator.next(); operation.perform(result); if (first) { synchronized (this) { first = false; } } } return null; } }; final Runnable runnable = new Runnable() { @Override public final void run() { while (iterator.hasNext()) { try { synchronized (callable) { callable.call(); } if (!operation.follow()) { break; } } catch (Throwable t) { t.printStackTrace(); synchronized (throwable) { throwable[0] = t; } throw new RuntimeException(t); } } } }; final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD); for (int threadIndex=0; threadIndex { void perform(T pParameter); boolean follow(); } 

пример

 @Test public void test() { List longList = new ArrayList(); for (long i = 0; i < 1000000; i++) { longList.add(i); } final List integerList = new LinkedList<>(); Parallel.For((Iterable) longList, new Parallel.Operation() { @Override public void perform(Number pParameter) { System.out.println(pParameter); integerList.add(pParameter.intValue()); } @Override public boolean follow() { return true; } }); for (Number num : integerList) { System.out.println(num); } } 

мониторинг

java- параллельная multithreading

  • Запустить сценарий, который использует несколько сеансов MATLAB
  • Как запускать задачи параллельно в MSBuild
  • Разложение Холецкого с помощью OpenMP
  • отправка блоков 2D-массива в C с использованием MPI
  • Ускорение преобразования точечного изображения в bitmap, является ли OpenMP опцией в C #?
  • Отсутствие повышения производительности после использования openMP в программе оптимизируется для последовательного запуска
  • Есть ли хороший способ извлечь куски данных из streamа java 8?
  • Почему параллельный пакет работает медленнее, чем просто использовать?
  • Как дождаться завершения всех streamов, используя ExecutorService?
  • Как поставить задачу для сна (или задержки) в C # 4.0?
  • Как создать streamи в nodejs
  • Давайте будем гением компьютера.