2012年04月11日

java.util.concurrent 〜Executor その2〜


さて、今回は戻り値を取得できるExecutorServiceのメソッドを使います。Runnableが提供するAPIはvoid run()ですので、Runnableでは非同期処理の戻り値を返すことができません。そこで、代わりにCallable<V>を使用します。Callableは引数を持たず、総称型の戻り値を持つcall()というメソッドを持ちます。Runnableと異なり、Exceptionを返すこともできます。このCallableをRunnableの代わりに使用します。

非同期処理の戻りを受け取るにはもうひとつ重要なインターフェースがあります。それはFutureです。Futureは非同期処理の結果を表します。あれ?非同期計算の結果はCallableの総称型で定義した戻り値じゃないの・・?と思いますが、Futureは非同期処理の終了をチェックし、自動的に同期をとってくれます。言葉ではわかりにくいのでサンプルを見てみます。

public static void main(String[] args) {

	//実行するタスクをCallableで作成
	Callable<Result> task = new Callable<Result>(){

		//Runnableと異なり、Callableは戻り値を返却することができます。
		//戻り値は総称型で定義します
		@Override
		public Result call() throws Exception {

			//三秒待つ(java.util.concurrent.TimeUnitを使用)
			try {
				TimeUnit.MILLISECONDS.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			Result result = new Result();
			Date date = new Date();
			result.setResultDate(date);
			return result;
		}

	};

	ExecutorService executor = null;
	try{
		//シングルスレッドを生成するためのExecutorServiceを作成する
		executor = Executors.newSingleThreadExecutor();

		//Futureオブジェクトはsubmitされた非同期計算の結果を取得するためのインターフェイス
		//非同期処理との同期が可能
		Future<Result> future = executor.submit(task);

		Result result = null;
		try {
			//非同期処理の結果を受け取る ※終了していない場合はここで待ってくれる
			result = future.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}

		//結果表示
		System.out.println(result.getResultDate());

	}finally{
		//使い終わったらshutdownしておく
		if(executor != null){
			executor.shutdown();
		}
	}
}
・処理結果クラス
/*
 * 処理結果を表すクラス
 *
 */
public class Result {

	//処理結果日時
	private Date resultDate;

	public Date getResultDate() {
		return resultDate;
	}

	public void setResultDate(Date resultDate) {
		this.resultDate = resultDate;
	}

}

ソースを詳しく見ていきます。
・非同期処理の生成
Callable<Result> task = new Callable<Result>(){

	//Runnableと異なり、Callableは戻り値を返却することができます。
	//戻り値は総称型で定義します
	@Override
	public Result call() throws Exception {

		//三秒待つ(java.util.concurrent.TimeUnitを使用)
		try {
			TimeUnit.MILLISECONDS.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		Result result = new Result();
		Date date = new Date();
		result.setResultDate(date);
		return result;
	}

};
前回とは異なり、Callableの匿名クラスで定義しています。総称型の戻り値はResultという処理結果を表すクラスです。単に非同期処理を行った時刻を保持しているだけのクラスですね。

・タスクを実行
Future<Result> future = executor.submit(task);
非同期処理の実行は、ExecutorService.submit(Callable)を使用します。このメソッドは戻り値にFutureを返しています。Vには、Callableに戻り値としてしかけたResultを指定してやります。もちろん、Futureが返却された時点では非同期処理そのものが終わっているわけではありません。

・非同期処理の結果を取得
result = future.get();
Future.get()で非同期処理の結果を受け取ります。このメソッドが呼び出されたタイミングで、非同期処理が完了している保証は全くありませんが、前述の通りFutureは自動的に同期をとり、完了していない場合は待機してくれます。戻り値はこの場合Resultになります。このFutureオブジェクトがあれば、好きな場所で非同期処理の結果を受け取ることができるわけです。これは非常に便利ですね。

次回は複数のタスクの処理です。
posted by sandman at 21:55| Comment(0) | Java | このブログの読者になる | 更新情報をチェックする

2012年04月04日

java.util.concurrent 〜Executor その1〜


今日からはExecutorインターフェイス関連をやっていきます。Executorは非同期処理を実行するためのインターフェイスです。java.util.concurrentで一番重要そうなインターフェイスですね。まずは超簡単なサンプルから見ていきます。

ExecutorとRunnable
最初はExecutorとRunnableを用いた単純なサンプルから。
public static void main(String[] args) {

	//実行させるタスクをRunnableで作成
	Runnable task = new Runnable(){

		@Override
		public void run() {
			//三秒待つ(java.util.concurrent.TimeUnitを使用)
			try {
				TimeUnit.MILLISECONDS.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("Hello java.util.concurrent World");
		}

	};

	//シングルスレッドを生成するためのExcutorを作成する
	Executor executor = Executors.newSingleThreadExecutor();

	//タスクを実行
	executor.execute(task);

	//※注意
	//Executorインターフェースはタスク終了後もスレッドを上げ続けるので、このままだと
	//Mainスレッドは終わりません。

}

・非同期処理の作成
まずはRunnableを匿名クラスで生成して、非同期のタスクを作っています。内容は3秒待ってから「Hello java.util.concurrent World」と出力するだけです。

・Executorを作成
続いて主役であるExecutorを作成します。といっても、Executorはインターフェイスなので、Executorsというクラスに作成してもらいます。このExecutorsは後述するExecutorService、ScheduledExecutorService、ThreadFactory、および Callableを作成するためのファクトリです。ちなみにExecutorはExecutorServiceやScheduledExecutorServiceのスーパーインターフェイスです。Executorsは用途によって様々なメソッドが用意されていますが、今回は単純に「1つのスレッドを実行するExcutorService」を作成するExecutors.newSingleThreadExecutor()を使用します。

・タスクを実行
Executor.execute()を呼び出し、引数にRunnableを渡してやります。ここで気をつけて欲しいのはこのままだとmainスレッドは終了しないということです。コメントにも記しましたがExecutorインターフェースはタスク終了後もスレッドを上げ続けるからです。じゃあどうするか?というと、Executorではどうすることもできません。実はExecutorはexecuteというメソッドしか持っていないからです。この問題に対処するにはExecutorServiceが必要になってきます。

ExecutorServiceとRunnable
次はExecutorServiceを試してみます。ExecutorServiceはExecutorの拡張インターフェイスです。
public static void main(String[] args) {

	//実行させるタスクをRunnableで作成
	Runnable task = new Runnable(){

		@Override
		public void run() {
			//三秒待つ(java.util.concurrent.TimeUnitを使用)
			try {
				TimeUnit.MILLISECONDS.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("Hello java.util.concurrent World");
		}

	};

	//シングルスレッドを生成するためのExecutorServiceを作成する
	ExecutorService executor = Executors.newSingleThreadExecutor();

	//タスクを実行
	executor.execute(task);

	System.out.println(executor.isShutdown());

	//Excutorの処理が終了したらスレッドを終了させる命令
	executor.shutdown();

	//仮にこの時点でスレッドが終了していなくてもshutdownが呼ばれていればisShutdownの結果はtrue
	//「起動したスレッドが終了しているか」はisTerminated()で検査できる
	System.out.println(executor.isShutdown());
}

Executors.newSingleThreadExecutor()を呼び出し、execute()で実行するまでは全く前回と同じです。ただし受け取りがExecutorServiceなっていることに注意。
・非同期処理スレッドの終了
executor.shutdown()で非同期処理が終了したらスレッドをシャットダウンさせることができます。非同期処理がいつ終わるか判らないことを考えると「シャットダウン予約」のようなニュアンスでしょうか。コメントにあるとおり、「非同期処理が終了し、スレッドがシャットダウンされたかどうか」を調べるにはExecutorService.isTerminated()を使用します。

次回は非同期処理からの戻り値の取得についてです。
posted by sandman at 22:12| Comment(0) | Java | このブログの読者になる | 更新情報をチェックする

2012年03月30日

java.util.concurrent 〜CopyOnWriteArrayList〜


突然ですが、java.util.concurrent使ってますか?
僕は使ってません。また、使っている人をあんまり見たことがありません。

java.util.concurrentはJava5.0から追加された並列プログラミングの要求に応えるためのパッケージです。WEBアプリの開発者ならばほぼ間違いなく、そうでない人も結構な頻度で並列処理に注意を払わねばならない場面は多いと思います。しかしながら、前述の通り、java.util.concurrentを僕自身あまり使っていません。それはなぜか?真っ先に思いつく答えは「synchronizedで十分だから」というものでしょう。確かに、一般的な排他処理はsynchronizedで十分でしょう。しかし、明示的な排他処理は実に骨が折れる作業であるし、ソースの可読性も下がっていくことは確かです。

ただ、僕個人としてはjava.util.concurrentがなにがしか素敵な道具箱であることはわかっているものの「でもなんだかよく分からんから使いたくない」というのが正直なところです。ということで、java.util.concurrentをいろいろ試してみることにします。お勉強がてら。
初回はCopyOnWriteArrayListです。では行きます。

ArrayListがスレッドセーフでないのはよく知られています。ArrayListはリストの走査を行っている最中に要素の変更が加わるとConcurrentModificationExceptionが発生します。

以下のような場合です。

List<String> list = new ArrayList<String>();
Collections.addAll(list, "aa","bb","cc");
for(String h : list){
    System.out.println(h);
    list.remove(h);
}

実行結果は以下の通り。
aa
Exception in thread "main" java.util.ConcurrentModificationException
at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
at java.util.AbstractList$Itr.next(AbstractList.java:343)
at test1.Main1.main(Main1.java:16)

ちょっと注意すべきなのが、このConcurrentModificationExceptionが発生するタイミングです。感覚的にはlist.remove(h)のような気もしますが、そうではなくfor(String h : list){の部分になります。なぜなら、このConcurrentModificationExceptionをスローするのはこのコレクションの反復子、つまりIteratorだからです。

上記の例では単一スレッドですが、当然マルチスレッドでも同じことが発生します。

public class ArrayListTest {

	//test対象となるリスト。ArrayListで宣言
	private static List<String> arrayList = new ArrayList<String>();

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		//適当にいっぱい要素追加
		long time = System.currentTimeMillis();
		for(int i = 0; i < 10000; i++){
			arrayList.add("hogehoge" + i);
		}
		System.out.print("所要時間:");
		System.out.print(System.currentTimeMillis()-time);
		System.out.println("ミリ秒");

		//別スレッドでリストを走査する
		Thread th = new Thread(new Runnable(){
			@Override
			public void run() {
				for(String h : arrayList){			//ConcurrentModificationExceptionはここで発生します。
				}
			}
		});
		th.start();

		//走査中に操作するためにちょろっと待つ
		try {
			Thread.sleep(5);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		//操作中に削除
		arrayList.remove("hogehoge0");
		System.out.println("削除!");

	}

}

実行結果はこちら
所要時間:20ミリ秒
削除!
Exception in thread "Thread-0" java.util.ConcurrentModificationException
at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
at java.util.AbstractList$Itr.next(AbstractList.java:343)
at copyOnWriteArrayListTest.ArrayListTest$1.run(ArrayListTest.java:30)
at java.lang.Thread.run(Thread.java:662)

この問題を解決すべく作られたのがCopyOnWriteArrayListさんです。CopyOnWriteArrayListは配列に対しておこなう全ての操作を配列を新規コピーすることで実装しているスレッドセーフなArrayListです。では上記のプログラムのListの宣言を以下のように変更してみましょう。

private static List<String> arrayList = new CopyOnWriteArrayList<String>();

実行結果はこちら。

所要時間:285ミリ秒
削除!

おお、うまくいきました。素晴らしいです。でもちょっと気になるのが処理時間。Stringの要素を10000件挿入するのにArrayListでは20ミリ秒だったのに大して、CopyOnWriteArrayListでは285ミリ秒もかかってます。10倍以上遅い・・・?というわけでもないのです。

試しに要素の挿入数を100000に増やしてみます。

ArrayListの結果がこちら。
所要時間:184ミリ秒
Exception in thread "Thread-0" java.util.ConcurrentModificationException
at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372)
at java.util.AbstractList$Itr.next(AbstractList.java:343)
at copyOnWriteArrayListTest.ArrayListTest$1.run(ArrayListTest.java:30)
at java.lang.Thread.run(Thread.java:662)削除!

対してCopyOnWriteArrayListがこれ
所要時間:37085ミリ秒
削除!

うへえ。超遅い。でもよく考えれば当たり前で、CopyOnWriteArrayListは「変更のたびに全体をコピーする」ものです。つまり、全体の要素数が増えれば増えるほど、また総称型の引数のサイズが大きければ大きいほど、操作の処理時間は累乗的に増えていきます。

以上より、CopyOnWriteArrayListを使用すべき場面は以下のようになるでしょう。

@排他処理が必要
A変更の操作がほとんどない
Bリストサイズがそれほど巨大でない(作成時に時間がかかるから)
C読み取りは多い

とりあえず今回はこれまで
posted by sandman at 15:29| Comment(0) | Java | このブログの読者になる | 更新情報をチェックする

広告


この広告は60日以上更新がないブログに表示がされております。

以下のいずれかの方法で非表示にすることが可能です。

・記事の投稿、編集をおこなう
・マイブログの【設定】 > 【広告設定】 より、「60日間更新が無い場合」 の 「広告を表示しない」にチェックを入れて保存する。


×

この広告は1年以上新しい記事の投稿がないブログに表示されております。