2012年04月11日

java.util.concurrent 〜Executor その3〜


さて、今回は複数の非同期処理を行う場合です。RunnableとThreadを用いて3本以上のスレッドの待ち合わせをするのは割と面倒だったりするのですが、ExecutorServiceを使用すると簡単です。また、ExecutorServiceはスレッドプールの機能も持ち合わせているので好きなようにリソースを最適化することができます。

では例によってサンプルソースを見ます。今回は非同期処理として複数の子スレッドでHTTP送信を行い、メインスレッドでは非同期処理が全て終わり次第結果を出力するようにします。HTTP送信など、コストが高い処理を順不同で大量に行う必要があり、その結果をマージしたい場合を想定します。

メインメソッド
public static void main(String[] args) {

	ExecutorService executor = null;
	try{
		//固定(3本)のスレッドプールを有するExecutorServiceを生成する
		executor = Executors.newFixedThreadPool(3);

		//タスクの生成しリストに詰める
		HttpGetTask task1 = new HttpGetTask("http://localhost:8080/DummyServer/test1.html");
		HttpGetTask task2 = new HttpGetTask("http://localhost:8080/DummyServer/test2.html");
		HttpGetTask task3 = new HttpGetTask("http://localhost:8080/DummyServer/test3.html");
		List<Callable<String>> tasks = new ArrayList<Callable<String>>();
		Collections.addAll(tasks, task1,task2,task3);

		List<Future<String>> resultList = null;
		try {
			//全てのタスクを実行
			resultList = executor.invokeAll(tasks);
		} catch (InterruptedException e) {
			e.printStackTrace();
			return;
		}

		//結果を表示
		for(Future<String> result : resultList){
			try {
				System.out.println(result.get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}

	}finally{
		//使い終わったらshutdownしておく
		if(executor != null){
			executor.shutdown();
		}
	}
}

非同期タスククラス
/**
 * HTTP GETを行う非同期処理。
 * @author ragtimer
 *
 */
public class HttpGetTask implements Callable<String> {

	/**
	 * 接続先URL
	 */
	private String url;

	/**
	 * コンストラクタ
	 */
	public HttpGetTask(String url){
		this.url = url;
	}


	@Override
	public String call() throws Exception {
		System.out.println("HttpGetTask start...");

		//url宛てにHTTPGETを送るだけ
		HttpClient httpclient = new DefaultHttpClient();
		HttpGet httpGet = null;
		String result = null;
		try {
			httpGet = new HttpGet(url);
			HttpResponse response = httpclient.execute(httpGet);
			HttpEntity httpEntity = response.getEntity();
			result = EntityUtils.toString(httpEntity);
		} finally {
            if (httpGet != null) {
                httpGet.abort();
            }
        }
		System.out.println("HttpGetTask end...");

		return result;
	}

}


ではソースを詳しく見ていきます。

・ExecutorServiceを作成
executor = Executors.newFixedThreadPool(3);
Executors.newFixedThreadPool(int)を使用します。引数はプールするスレッド数です。これでこのExecutorServiceでスレッドプールが勝手に適用されます。仮にスレッドが枯渇した場合はスレッドが空くまで待機してくれます。

・非同期処理を複数作成
HttpGetTask task1 = new HttpGetTask("http://localhost:8080/DummyServer/test1.html");
HttpGetTask task2 = new HttpGetTask("http://localhost:8080/DummyServer/test2.html");
HttpGetTask task3 = new HttpGetTask("http://localhost:8080/DummyServer/test3.html");
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
Collections.addAll(tasks, task1,task2,task3);
Callableを実装したHttpGetTaskを複数生成し、Listに詰めています。HttpGetTaskは生成時に渡されたurlにHTTP GETを送り、ボディを返却するだけの処理です。なお、HTTP送信にはapacheのHTTPClientを使用してます。対向のサーバローカルに適当に立てたTomcatです。

・非同期処理の実行
resultList = executor.invokeAll(tasks);
非同期処理を詰めたリストを引数にExecutorService.invokeAll(Collection>)を呼び出します。引数はFutureオブジェクトのリストで返却されます。

・結果の表示
for(Future<String> result : resultList){
	try {
		System.out.println(result.get());
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	}
}
Futureをループで回しながら結果を取得していきます。たったこれだけで待ち合わせができるのは素晴らしいですね。

次回はスケジュール処理をやります。
posted by sandman at 23:20| Comment(0) | Java | このブログの読者になる | 更新情報をチェックする

java.util.concurrent 〜Executor その2〜


さて、今回は戻り値を取得できるExecutorServiceのメソッドを使います。Runnableが提供するAPIはvoid run()ですので、Runnableでは非同期処理の戻り値を返すことができません。そこで、代わりにCallable<V>を使用します。Callableは引数を持たず、総称型の戻り値を持つcall()というメソッドを持ちます。Runnableと異なり、Exceptionを返すこともできます。このCallableをRunnableの代わりに使用します。

非同期処理の戻りを受け取るにはもうひとつ重要なインターフェースがあります。それはFutureです。Futureは非同期処理の結果を表します。あれ?非同期計算の結果はCallableの総称型で定義した戻り値じゃないの・・?と思いますが、Futureは非同期処理の終了をチェックし、自動的に同期をとってくれます。言葉ではわかりにくいのでサンプルを見てみます。

public static void main(String[] args) {

	//実行するタスクをCallableで作成
	Callable<Result> task = new Callable<Result>(){

		//Runnableと異なり、Callableは戻り値を返却することができます。
		//戻り値は総称型で定義します
		@Override
		public Result call() throws Exception {

			//三秒待つ(java.util.concurrent.TimeUnitを使用)
			try {
				TimeUnit.MILLISECONDS.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			Result result = new Result();
			Date date = new Date();
			result.setResultDate(date);
			return result;
		}

	};

	ExecutorService executor = null;
	try{
		//シングルスレッドを生成するためのExecutorServiceを作成する
		executor = Executors.newSingleThreadExecutor();

		//Futureオブジェクトはsubmitされた非同期計算の結果を取得するためのインターフェイス
		//非同期処理との同期が可能
		Future<Result> future = executor.submit(task);

		Result result = null;
		try {
			//非同期処理の結果を受け取る ※終了していない場合はここで待ってくれる
			result = future.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}

		//結果表示
		System.out.println(result.getResultDate());

	}finally{
		//使い終わったらshutdownしておく
		if(executor != null){
			executor.shutdown();
		}
	}
}
・処理結果クラス
/*
 * 処理結果を表すクラス
 *
 */
public class Result {

	//処理結果日時
	private Date resultDate;

	public Date getResultDate() {
		return resultDate;
	}

	public void setResultDate(Date resultDate) {
		this.resultDate = resultDate;
	}

}

ソースを詳しく見ていきます。
・非同期処理の生成
Callable<Result> task = new Callable<Result>(){

	//Runnableと異なり、Callableは戻り値を返却することができます。
	//戻り値は総称型で定義します
	@Override
	public Result call() throws Exception {

		//三秒待つ(java.util.concurrent.TimeUnitを使用)
		try {
			TimeUnit.MILLISECONDS.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		Result result = new Result();
		Date date = new Date();
		result.setResultDate(date);
		return result;
	}

};
前回とは異なり、Callableの匿名クラスで定義しています。総称型の戻り値はResultという処理結果を表すクラスです。単に非同期処理を行った時刻を保持しているだけのクラスですね。

・タスクを実行
Future<Result> future = executor.submit(task);
非同期処理の実行は、ExecutorService.submit(Callable)を使用します。このメソッドは戻り値にFutureを返しています。Vには、Callableに戻り値としてしかけたResultを指定してやります。もちろん、Futureが返却された時点では非同期処理そのものが終わっているわけではありません。

・非同期処理の結果を取得
result = future.get();
Future.get()で非同期処理の結果を受け取ります。このメソッドが呼び出されたタイミングで、非同期処理が完了している保証は全くありませんが、前述の通りFutureは自動的に同期をとり、完了していない場合は待機してくれます。戻り値はこの場合Resultになります。このFutureオブジェクトがあれば、好きな場所で非同期処理の結果を受け取ることができるわけです。これは非常に便利ですね。

次回は複数のタスクの処理です。
posted by sandman at 21:55| Comment(0) | Java | このブログの読者になる | 更新情報をチェックする

広告


この広告は60日以上更新がないブログに表示がされております。

以下のいずれかの方法で非表示にすることが可能です。

・記事の投稿、編集をおこなう
・マイブログの【設定】 > 【広告設定】 より、「60日間更新が無い場合」 の 「広告を表示しない」にチェックを入れて保存する。


×

この広告は1年以上新しい記事の投稿がないブログに表示されております。