時間が掛かる複数の処理をマルチスレッドで同時並行で動かすバッチを検討していて、エラー処理をどうするのかという点が気になったので調査を実施。

前提

以下のようなバッチにおいて、レコード管理に不整合を起こさせない制御が可能かを調査する。

  • DBから取得したレコードに対し同時並行で処理を行い、元のレコードを更新する
  • Springが提供するExcecutorで同時実行数を設定し、同時実行数いっぱいまで実行させる
  • どのレコードを処理しているかを管理し、同じレコードの同時処理を避けると共に、エラーが発生した場合は管理対象から外す

考えられる例外

  • タイムアウト
  • 実行時例外
  • 起動数超過

各例外についての調査結果

タイムアウト

こちらはWebMvCではタイムアウト時間が設定されており、その制御方法も提供されているようだが、Webアプリではない場合はそもそもタイムアウトしないように思われる(非同期処理を1300秒動かしてタイムアウトせず)。

実際のバッチでは外部サービスのAPIを呼ぶため、通信でタイムアウトするかも知れないが、今回は除外。

実行時例外

実行時例外が発生した場合にどのレコードがエラーになったか、非同期処理で把握できるかが心配だったが、AsyncUncaughtExceptionHandlerの実装クラスを作ることで対応可能。

起動数超過

Excecutorの実行可能数を超過した場合、同期処理で実行する非同期処理の起動処理にてRejectedExecutionExceptionが発生するため、対象レコードの特定は容易。

実装

テストプロジェクトはCommandLineRunnerの実装クラスとして作成。

springboot-study/async-exception-test at master · orimajp/springboot-study

MainRunner

テストシナリオ的には、例外が発生する非同期処理を最初に実行し、2秒後に多重起動例外を発生させる。2秒空けているのはログの混在を避けるため。

package com.example.asyncexceptiontest;

import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Logger;

@Component
@RequiredArgsConstructor
public class MainRunner implements CommandLineRunner {

	private final Logger logger = Logger.getLogger(getClass().getName());

	private final AsyncTask asyncTask;

	@Override
	public void run(String... args) throws Exception {
		final Integer id = 10;
		final Dummy dummy = new Dummy(20, "test");

		logger.info("不正値エラー確認起動 開始");

		asyncTask.exceptionProcess(id, dummy);

		logger.info("不正値エラー確認起動 終了");

		Thread.sleep(2000);

		try {
			logger.info("多重起動エラー確認起動 開始");
			int i = 0;
			while (i < 10) {
				i++;
				logger.info("起動 " + i + " 回目");
				asyncTask.process(i, dummy);
			}
		} catch (RejectedExecutionException e) {
			logger.warning("多重起動エラーです。");
		} finally {
			logger.info("多重起動エラー確認起動 終了");
		}

	}

}

AsyncTask

非同期処理実装クラス。

package com.example.asyncexceptiontest;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.logging.Logger;

@Component
public class AsyncTask {

	final Logger logger = Logger.getLogger(getClass().getName());

	@Async
	public void process(Integer id, Dummy dummy) {
		try {
			logger.info("非同期起動 id=" + id);
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	@Async
	public void exceptionProcess(Integer id, Dummy dummy) {
		if (id != 1) {
			throw new IllegalArgumentException("不正値エラー");
		}
	}

}

Dummy

package com.example.asyncexceptiontest;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Dummy {

	private Integer id;

	private String name;

}

ExecutorConfig

プールサイズ2、キューサイズは1となっている。

package com.example.asyncexceptiontest;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ExecutorConfig {

	@Bean
	public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
		final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

		executor.setCorePoolSize(2);
		executor.setMaxPoolSize(2);
		executor.setQueueCapacity(1);
		executor.setThreadNamePrefix("DummyProcess-");
		executor.initialize();

		return executor;
	}

}

CustomAsyncUncaughtExceptionHandler

ここで非同期処理実行時のメソッドとパラメータを取得できるので、特例レコードを管理対象外とする処理が組めると思われる。

package com.example.asyncexceptiontest;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

import java.lang.reflect.Method;
import java.util.logging.Logger;

public class CustomAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

	private final Logger logger = Logger.getLogger(getClass().getName());

	@Override
	public void handleUncaughtException(Throwable ex, Method method, Object... params) {
		logger.info("ex:" + ex);
		logger.info("method:" + method);

		for (Object obj: params) {
			logger.info("param:" + obj);
		}
	}

}

AsyncUncaughtExceptionHandlerConfig

package com.example.asyncexceptiontest;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AsyncUncaughtExceptionHandlerConfig {

	@Bean
	public CustomAsyncUncaughtExceptionHandler customAsyncUncaughtExceptionHandler() {
		return new CustomAsyncUncaughtExceptionHandler();
	}

}

AsyncConfig

正直言って、ThreadPoolTaskExecutorCustomAsyncUncaughtExceptionHandlerをわざわざBean定義する必要があるのかどうか分からない。

package com.example.asyncexceptiontest;

import lombok.RequiredArgsConstructor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@RequiredArgsConstructor
public class AsyncConfig extends AsyncConfigurerSupport {

	private final ThreadPoolTaskExecutor threadPoolTaskExecutor;

	private final CustomAsyncUncaughtExceptionHandler customAsyncUncaughtExceptionHandler;

	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return customAsyncUncaughtExceptionHandler;
	}

	@Override
	public Executor getAsyncExecutor() {
		return threadPoolTaskExecutor;
	}

}

動作結果

最初に不正値例外確認の非同期処理が実行され、例外発生を受けてCustomAsyncUncaughtExceptionHandlerによる処理が行われる。

2秒後に多重起動エラー確認用非同期処理をループで起動し、2スレッド+1キュー分たまったところで例外発生。非同期処理は順次実行されている。

2017-11-26 23:16:29.335  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 不正値エラー確認起動 開始
2017-11-26 23:16:29.337  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 不正値エラー確認起動 終了
2017-11-26 23:16:29.340  INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : ex:java.lang.IllegalArgumentException: 不正値エラー
2017-11-26 23:16:29.341  INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : method:public void com.example.asyncexceptiontest.AsyncTask.exceptionProcess(java.lang.Integer,com.example.asyncexceptiontest.Dummy)
2017-11-26 23:16:29.346  INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : param:10
2017-11-26 23:16:29.346  INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : param:Dummy(id=20, name=test)
2017-11-26 23:16:31.341  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 多重起動エラー確認起動 開始
2017-11-26 23:16:31.343  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 起動 1 回目
2017-11-26 23:16:31.343  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 起動 2 回目
2017-11-26 23:16:31.343  INFO 74400 --- [ DummyProcess-2] c.example.asyncexceptiontest.AsyncTask   : 非同期起動 id=1
2017-11-26 23:16:31.348  INFO 74400 --- [ DummyProcess-1] c.example.asyncexceptiontest.AsyncTask   : 非同期起動 id=2
2017-11-26 23:16:31.348  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 起動 3 回目
2017-11-26 23:16:31.349  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 起動 4 回目
2017-11-26 23:16:31.350  WARN 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 多重起動エラーです。
2017-11-26 23:16:31.351  INFO 74400 --- [           main] c.example.asyncexceptiontest.MainRunner  : 多重起動エラー確認起動 終了
2017-11-26 23:16:31.352  INFO 74400 --- [           main] c.e.a.AsyncExceptionTestApplication      : Started AsyncExceptionTestApplication in 12.61 seconds (JVM running for 22.977)
2017-11-26 23:16:32.352  INFO 74400 --- [ DummyProcess-1] c.example.asyncexceptiontest.AsyncTask   : 非同期起動 id=3

参考