Многопоточная сортировка с использованием пула потоков на Java

В данном посте будет рассказано, как реализовать сортировку на Java c использованием ExecutorService. Общая суть сортировки в следующем:

  1. Массив разбивается на части
  2. Каждая часть массива сортируется
  3. Идем по упорядоченным массивам, сливаем их в один

Здесь применяются идеи сортировки слиянием, но массив разбивается только на две части (рекурсия не используется).

Для слияния можно использовать следующую функцию:

public static String[] merge( String[] leftPart, String[] rightPart ) {         int cursorLeft = 0, cursorRight = 0, counter = 0;         String[] merged = new String[leftPart.length + rightPart.length];         while ( cursorLeft < leftPart.length && cursorRight < rightPart.length ) {             if (leftPart[cursorLeft].compareTo(rightPart[cursorRight] ) < 0 ) {                 merged[counter] = leftPart[cursorLeft];                 cursorLeft+=1;             } else {                 merged[counter] = rightPart[cursorRight];                 cursorRight+=1;             }             counter++;         }         if ( cursorLeft < leftPart.length ) {             System.arraycopy( leftPart, cursorLeft, merged, counter, merged.length - counter );         }         if ( cursorRight < rightPart.length ) {             System.arraycopy( rightPart, cursorRight, merged, counter, merged.length - counter );         }         return merged;     }

Код функции слияния взят отсюда.

Суть слияния в следующем: в начале указатели находятся на первом элементе для обоих массивов. Далее значения элементов, соответствующих позициям указателя сравниваются и указатель для меньшего элемента сдвигается на следующий элемент, сам элемент добавляется в результирующий массив. Цикл продолжается до тех пор, пока не дойдем до конца одного из массивов, тогда оставшаяся часть второго массива будет скопирована в конец результирующего массива. Таким образом, на выходе получаем отсортированный массив.

Также был создан класс для многопоточной сортировки, в нем был создан метод run, который выполняется, когда к объекту типа Thread применяется метод start(). В нашем случае за это будет отвечать executorService. Приведем здесь код класса merge, объекты которого и будут создаваться для реализации многопоточной сортировки:

 public class Merger implements Runnable{     private String[] unsorted, sorted;     public Merger(String[] unsorted) {         this.unsorted = unsorted;     }      public void run() {         int middle;         String[] left, right;         // array is sorted         if ( unsorted.length <= 1 ) {             sorted = unsorted;         } else {             //             middle = unsorted.length / 2;             left = new String[middle];             right = new String[unsorted.length - middle];             //split array on two             System.arraycopy(unsorted, 0, left, 0, middle);             System.arraycopy(unsorted, middle, right, 0, unsorted.length - middle);             SimpleMerger leftSort = new SimpleMerger(left);             SimpleMerger rightSort = new SimpleMerger(right);             leftSort.sort();             rightSort.sort();             //sort and merge             sorted = SimpleMerger.merge(leftSort.getSorted(), rightSort.getSorted());          }         }     public String[] getSorted() {         return sorted;     } } 

Для сортировки частей массива была использована встроенная в java сортировка. Далее приведен код для сортировки с помощью пула потоков. Проводятся замеры времени для многопоточного и обычного варианта (спойлер: многопоточная дает ускорение только на большом объеме данных):

 public static void main(String[] args) throws Exception {         int arrSize = 1_000_000_0;         String[] unsorted = new String[arrSize];         Random randomizer = new Random();          for ( int i = 0; i < arrSize; i++ ) {             unsorted[i] = Integer.toString(randomizer.nextInt( 100_000_0 ));         }          List<Future> futures = new ArrayList<>();         int processorCount = Runtime.getRuntime().availableProcessors();         int batchSize = arrSize/processorCount;         long startTime = System.currentTimeMillis();         // create ExecutorService         final ExecutorService executorService = Executors                 .newFixedThreadPool(Runtime.getRuntime().availableProcessors());         ArrayList<Merger> mergers = new ArrayList<>();         for (int i = 0; i < processorCount; i++) {             String[] part = new String[batchSize];             System.arraycopy( unsorted, i*batchSize, part, 0, batchSize );             // create merger             Merger merger = new Merger(part);              futures.add(executorService.submit(merger));             //add merger to list to get result in future             mergers.add(merger);         }         for (Future<Double> future : futures) {             future.get();         }         executorService.shutdown();         int j = 0;         // array to get result         String[] mergered = new String[arrSize];         // sequential merge of all part of array         for (Merger merger:mergers){             if (j == 0) {                 mergered = merger.getSorted();                 j+=1;             }         else{                 String[] part = merger.getSorted();                 mergered = SimpleMerger.merge( mergered, part);             }    }         long timeSpent = System.currentTimeMillis() - startTime;         System.out.println("Program execution time is " + timeSpent + " milliseconds");         if (arrSize < 100) {System.out.print(Arrays.toString(mergered));}         startTime = System.currentTimeMillis();         Arrays.sort(unsorted);         timeSpent = System.currentTimeMillis() - startTime;         System.out.println("\n Program (non parallel )execution time is " + timeSpent + " milliseconds");     }

В начале основной функции массив заполняется произвольными строками, которые содержат числа от 0 до 10000000. В качестве количества потоков берется количество доступное на устройстве. Переменная batchSize отвечает за размерность массивов для сортировки в параллельном режиме. Затем создается executorService с фиксированным количеством потоков.

Для каждого потока создается свой объект класса merge, затем этот ставим задачу по сортировке в очередь на выполнение. С помощью future ждем, пока все посчитается, собираем все отсортированные части массива и сливаем их по очереди в результирующий массив. Останавливаем executorService и можем посмотреть на временные затраты последовательного и параллельного варианта реализации.

Код лежит здесь

FavoriteLoadingДобавить в избранное
Posted in Без рубрики

Добавить комментарий

Ваш e-mail не будет опубликован. Обязательные поля помечены *