Subscribed unsubscribe Subscribe Subscribe

Kengo's blog

Technical articles about original projects, JVM, Static Analysis and JavaScript.

「並行コンピューティング技法」より並列クイックソートを実装してみた

ここ数カ月、並行処理およびその実装方法について地道に調べていました。具体的な内容としては、java.util.concurrentパッケージの使い方やデザインパターンの理解、サンプルプログラムの作成と性能テストです。
こうした自主学習の総まとめ*1として、オライリーの「並行コンピューティング技法」を読みはじめました。現在ソートの章まで読み進めています。

並行コンピューティング技法 ―実践マルチコア/マルチスレッドプログラミング

並行コンピューティング技法 ―実践マルチコア/マルチスレッドプログラミング


この本には理論と実践をちょうど良い配分で混ぜている印象を持っています。
私は計算機科学分野を学んでいないためRAM(Random Access Machine)をはじめとした理論モデルに縁がなく、とても興味深く読むことができました。意外だったのはソートの章にある、バブルソートを並列処理に置き換えようという箇所です。「えっ、無駄じゃないの?」と思いながら読んでみると、並列ウェイブフロント法という初めて聞くアルゴリズムが出てきて勉強になりました。無駄には変わりないものの、新しい着眼点を得ることができました。

Java版並列クイックソートの実装

さてソートの章で並列クイックソートを紹介している箇所があったため、それを参考にJavaで実装してみました。以下がそのコードです。
クイックソートが対象とする領域(Partitionクラス)をBlockingQueueにて管理し、ExecutorService内にプールしたスレッドに読み書きさせるものです。producer-consumerパターンに似ていますが、producerとconsumerを同じSortWorkerクラスが担っています。

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// -Xm256m -Xms256m
// -Xrunhprof:cpu=times,file=hprof_1threads.txt
public class ParallelQuickSort {
	private static final int TEST_NUM = 100;
	private static final int THREADS_NUM = 2;
	private static final int ARRAY_LENGTH = 500000;

	public static void main(String[] args) {
		final ExecutorService executor = Executors.newFixedThreadPool(THREADS_NUM);
		final int[] array = createRandomArray();

		System.out.printf("ParallelSort(%d threads) [millis second]%n", THREADS_NUM);
		try {
			for (int i = 0; i < TEST_NUM; ++i) {
				final long startParallel = System.currentTimeMillis();

				new ParallelQuickSort(array.clone()).sort(executor, THREADS_NUM);

				System.out.println(System.currentTimeMillis() - startParallel);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			executor.shutdown();
		}
	}

	private static int[] createRandomArray() {
		final int[] array = new int[ARRAY_LENGTH];
		final Random r = new Random();
		for (int i = 0; i < array.length; ++i) {
			array[i] = r.nextInt();
		}
		return array;
	}

	private final int[] target;
	private final AtomicInteger fixed = new AtomicInteger(0);
	private final BlockingQueue<Partition> tasks = new LinkedBlockingQueue<Partition>(Integer.MAX_VALUE);

	public ParallelQuickSort(int[] target) {
		this.target = target;
	}

	public void sort(ExecutorService executor, int threadsNum) throws InterruptedException {
		tasks.offer(new Partition(0, target.length));

		final List<Callable<Object>> workers = new ArrayList<Callable<Object>>(threadsNum);
		for (int i = 0; i < threadsNum; ++i) {
			workers.add(Executors.callable(new SortWorker()));
		}

		executor.invokeAll(workers);
	}

	class SortWorker implements Runnable {
		public void run() {
			while (fixed.get() < target.length) {
				final Partition partition = tasks.poll();
				if (partition == null) continue;

				fixed.incrementAndGet();

				final int pivotIndex = sort(partition.begin, partition.end);
				if (partition.begin != pivotIndex)
					while (!tasks.offer(new Partition(partition.begin, pivotIndex))) {};
				if (pivotIndex + 1 != partition.end)
					while (!tasks.offer(new Partition(pivotIndex + 1, partition.end))) {};
			}
		}

		protected int sort(int begin, int end) {
			final int pivot = target[begin];
			int left = begin;
			int right = end;
			do { ++left; } while (left < end && target[left] <= pivot);
			do { --right; } while (target[right] > pivot);
			while (left < right) {
				swap(target, left, right);
				do { ++left; } while (target[left] <= pivot);
				do { --right; } while (target[right] > pivot);
			}
			swap(target, begin, right);
			return right;
		}

		protected final void swap(int[] array, int a, int b) {
			final int tmp = target[a];
			target[a] = target[b];
			target[b] = tmp;
		}
	}

	// キューへのアクセス回数を減らすことで高速化
	final class FastWorker extends SortWorker {
		public void run() {
			while (fixed.get() < target.length) {
				Partition partition = tasks.poll();
				if (partition == null) continue;

				Partition nextPartition;
				do {
					fixed.incrementAndGet();
		
					final int pivotIndex = sort(partition.begin, partition.end);
					nextPartition = null;
					
					if (partition.begin != pivotIndex)
						nextPartition = new Partition(partition.begin, pivotIndex);
		
					if (pivotIndex + 1 != partition.end) {
						if (nextPartition == null) {
							nextPartition = new Partition(pivotIndex + 1, partition.end);
						} else {
							while (!tasks.offer(new Partition(pivotIndex + 1, partition.end))) {};
						}
					}
				} while ((partition = nextPartition) != null);
			}
		}
	}

	static class Partition {
		final int begin;
		final int end;

		Partition(int begin, int end) {
			this.begin = begin;
			this.end = end;
		}
	}
}

課題と対策

はじめにSortWorkerクラスによるソートを2コアのMacBook Proで実行したところ、スレッドを増やすほど遅くなるという残念な結果になりました。これでは並行処理にした意味がありません。
hprofとVisualVMを使ったプロファイリングの結果、LinkedBlockingQueueに原因があることがわかりました。オブジェクト生成とロックによるCPUへの負荷です。ロックの負荷を減らすことはできませんので、キューに対するアクセス回数を減らす必要がありました。


これを実現したのがSortWorkerのサブクラスであるFastWorkerクラスです。次に処理する範囲を表すインスタンスをキューに入れずにローカル変数に持つことで、キューに対するアクセスを減らしています。これで処理速度を約2倍にすることができ、2スレッドの方が高速になりました。

他に遅い理由としては以下が挙げられます。

  • java.util.concurrent.locks.LockSupport.parkが実行時間の1/3を占めている
  • インスタンス生成とGCの負担が大きい
  • そもそもpivotを選択する方法が悪い

感想

今回のコーディングで、並列コンピューティング技法が紹介しているアルゴリズムや並列化のアプローチを実践することができました。キャッシュラインや分散メモリまで意識した実装はまだ遠い話ですが、簡単なプログラムならかなり身近なものになったと思います。
なお今回最も時間を要したのはソートが正常にできない理由を調査するフェーズと低速な理由を追求するフェーズでしたが、これらは並行コンピューティング独特ではなくプログラミング全般に共通の問題です。時代が変わってマルチコアが普通になっても、開発上重要な技術や知識は減らないということの現れと考えています。基礎の重要性を改めて認識しました。

*1:自主学習における総まとめは、網羅性と方向性を確認する観点から非常に重要と考えています。何かをテーマにして学ぶ場合、最後に資格試験や問題集を解くことを意識しています。