Java: как масштабировать streamи в соответствии с ядрами процессора?

Я не хороший Java-программист, это просто мое хобби, но я очень хочу узнать больше, чем средний материал.

Я хочу решить математическую проблему с несколькими streamами в java. моя математическая проблема может быть разделена на рабочие единицы, которые я хочу решить в нескольких streamах.

но я не хочу иметь фиксированное количество streamов, работающих на нем, но вместо этого соответствующее количество streamов на количество ядер процессора. и моя проблема заключается в том, что я не мог найти для этого легкий учебник в Интернете. все, что я нашел, – это примеры с фиксированными streamами.

Так вы могли бы помочь мне со ссылкой на хороший учебный курс или могли бы дать мне простой и хороший пример? Это было бы очень приятно 🙂

Вы можете определить количество процессов, доступных для виртуальной машины Java, используя статический метод Runtime, доступныепроцессоры . После того, как вы определили количество доступных процессоров, создайте это количество streamов и соответственно разделите свою работу.

Обновление . Чтобы уточнить, Thread – это просто объект в Java, поэтому вы можете создать его так же, как и любой другой объект. Итак, предположим, что вы вызываете вышеупомянутый метод и обнаруживаете, что он возвращает 2 процессора. Потрясающие. Теперь вы можете создать цикл, который генерирует новый stream, и разделяет работу на этот stream и запускает stream. Вот несколько psuedocode, чтобы продемонстрировать, что я имею в виду:

int processors = Runtime.getRuntime().availableProcessors(); for(int i=0; i < processors; i++) { Thread yourThread = new AThreadYouCreated(); // You may need to pass in parameters depending on what work you are doing and how you setup your thread. yourThread.start(); } 

Дополнительные сведения о создании собственного streamа см. В этом уроке . Кроме того, вы можете посмотреть пул streamов для создания streamов.

Вероятно, вы захотите посмотреть на инфраструктуру java.util.concurrent для этого материала. Что-то вроде:

 ExecutorService e = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // Do work using something like either e.execute(new Runnable() { public void run() { // do one task } }); 

или

  Future future = pool.submit(new Callable() { public String call() throws Exception { return null; } }); future.get(); // Will block till result available 

Это намного лучше, чем справляться с вашими пулами streamов и т. Д.

Дуг Ли (автор параллельного пакета) имеет этот документ, который может иметь значение: http://gee.cs.oswego.edu/dl/papers/fj.pdf

В Java SE 7 добавлена ​​структура Fork Join. Ниже приведены несколько ссылок:

http://www.ibm.com/developerworks/java/library/j-jtp11137/index.html Статья Брайана Гетца

http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

Опция 1:

newWorkStealingPool от Executors

 public static ExecutorService newWorkStealingPool() 

Создает пул streamов, обрабатывающих работу, используя все доступные процессоры в качестве целевого уровня параллелизма.

С помощью этого API вам не нужно передавать количество ядер в ExecutorService .

Реализация этого API из grepcode

 /** * Creates a work-stealing thread pool using all * {@link Runtime#availableProcessors available processors} * as its target parallelism level. * @return the newly created thread pool * @see #newWorkStealingPool(int) * @since 1.8 */ public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } 

Вариант 2:

newFixedThreadPool API от Executors или other newXXX constructors , который возвращает ExecutorService

 public static ExecutorService newFixedThreadPool(int nThreads) 

замените nThreads на Runtime.getRuntime().availableProcessors()

Вариант 3:

ThreadPoolExecutor

 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) 

передать Runtime.getRuntime().availableProcessors() как параметр для maximumPoolSize .

В classе Runtime существует метод, называемый доступнымиProcessors (). Вы можете использовать это, чтобы выяснить, сколько у вас процессоров. Поскольку ваша программа связана с процессором, вы, вероятно, захотите иметь (не более) один stream на каждый ansible CPU.

Стандартным способом является метод Runtime.getRuntime (). AvailableProcessors (). На большинстве стандартных процессоров вы вернете оптимальный подсчет streamа (который не является фактическим количеством ядер процессора). Поэтому это то, что вы ищете.

Пример:

 ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 

Не забудьте закрыть службу исполнителя, как это (или ваша программа не выйдет):

 service.shutdown(); 

Вот только краткое описание того, как настроить будущий MT-код (offtopic, для иллюстрации):

 CompletionService completionService = new ExecutorCompletionService(service); ArrayList> futures = new ArrayList>(); for (String computeMe : elementsToCompute) { futures.add(completionService.submit(new YourCallableImplementor(computeMe))); } 

Затем вам нужно отслеживать, сколько ожидаемых результатов и получить их так:

 try { int received = 0; while (received < elementsToCompute.size()) { Future resultFuture = completionService.take(); YourCallableImplementor result = resultFuture.get(); received++; } } finally { service.shutdown(); } 
Давайте будем гением компьютера.