時間が掛かる複数の処理をマルチスレッドで同時並行で動かすバッチを検討していて、エラー処理をどうするのかという点が気になったので調査を実施。
前提
以下のようなバッチにおいて、レコード管理に不整合を起こさせない制御が可能かを調査する。
- DBから取得したレコードに対し同時並行で処理を行い、元のレコードを更新する
- Springが提供するExcecutorで同時実行数を設定し、同時実行数いっぱいまで実行させる
- どのレコードを処理しているかを管理し、同じレコードの同時処理を避けると共に、エラーが発生した場合は管理対象から外す
考えられる例外
- タイムアウト
- 実行時例外
- 起動数超過
各例外についての調査結果
タイムアウト
こちらはWebMvCではタイムアウト時間が設定されており、その制御方法も提供されているようだが、Webアプリではない場合はそもそもタイムアウトしないように思われる(非同期処理を1300秒動かしてタイムアウトせず)。
実際のバッチでは外部サービスのAPIを呼ぶため、通信でタイムアウトするかも知れないが、今回は除外。
実行時例外
実行時例外が発生した場合にどのレコードがエラーになったか、非同期処理で把握できるかが心配だったが、AsyncUncaughtExceptionHandler
の実装クラスを作ることで対応可能。
起動数超過
Excecutorの実行可能数を超過した場合、同期処理で実行する非同期処理の起動処理にてRejectedExecutionException
が発生するため、対象レコードの特定は容易。
実装
テストプロジェクトはCommandLineRunner
の実装クラスとして作成。
springboot-study/async-exception-test at master · orimajp/springboot-study
MainRunner
テストシナリオ的には、例外が発生する非同期処理を最初に実行し、2秒後に多重起動例外を発生させる。2秒空けているのはログの混在を避けるため。
package com.example.asyncexceptiontest;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Logger;
@Component
@RequiredArgsConstructor
public class MainRunner implements CommandLineRunner {
private final Logger logger = Logger.getLogger(getClass().getName());
private final AsyncTask asyncTask;
@Override
public void run(String... args) throws Exception {
final Integer id = 10;
final Dummy dummy = new Dummy(20, "test");
logger.info("不正値エラー確認起動 開始");
asyncTask.exceptionProcess(id, dummy);
logger.info("不正値エラー確認起動 終了");
Thread.sleep(2000);
try {
logger.info("多重起動エラー確認起動 開始");
int i = 0;
while (i < 10) {
i++;
logger.info("起動 " + i + " 回目");
asyncTask.process(i, dummy);
}
} catch (RejectedExecutionException e) {
logger.warning("多重起動エラーです。");
} finally {
logger.info("多重起動エラー確認起動 終了");
}
}
}
AsyncTask
非同期処理実装クラス。
package com.example.asyncexceptiontest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.logging.Logger;
@Component
public class AsyncTask {
final Logger logger = Logger.getLogger(getClass().getName());
@Async
public void process(Integer id, Dummy dummy) {
try {
logger.info("非同期起動 id=" + id);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Async
public void exceptionProcess(Integer id, Dummy dummy) {
if (id != 1) {
throw new IllegalArgumentException("不正値エラー");
}
}
}
Dummy
package com.example.asyncexceptiontest;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Dummy {
private Integer id;
private String name;
}
ExecutorConfig
プールサイズ2、キューサイズは1となっている。
package com.example.asyncexceptiontest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ExecutorConfig {
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(1);
executor.setThreadNamePrefix("DummyProcess-");
executor.initialize();
return executor;
}
}
CustomAsyncUncaughtExceptionHandler
ここで非同期処理実行時のメソッドとパラメータを取得できるので、特例レコードを管理対象外とする処理が組めると思われる。
package com.example.asyncexceptiontest;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import java.lang.reflect.Method;
import java.util.logging.Logger;
public class CustomAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
private final Logger logger = Logger.getLogger(getClass().getName());
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
logger.info("ex:" + ex);
logger.info("method:" + method);
for (Object obj: params) {
logger.info("param:" + obj);
}
}
}
AsyncUncaughtExceptionHandlerConfig
package com.example.asyncexceptiontest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AsyncUncaughtExceptionHandlerConfig {
@Bean
public CustomAsyncUncaughtExceptionHandler customAsyncUncaughtExceptionHandler() {
return new CustomAsyncUncaughtExceptionHandler();
}
}
AsyncConfig
正直言って、ThreadPoolTaskExecutor
とCustomAsyncUncaughtExceptionHandler
をわざわざBean定義する必要があるのかどうか分からない。
package com.example.asyncexceptiontest;
import lombok.RequiredArgsConstructor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@RequiredArgsConstructor
public class AsyncConfig extends AsyncConfigurerSupport {
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private final CustomAsyncUncaughtExceptionHandler customAsyncUncaughtExceptionHandler;
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return customAsyncUncaughtExceptionHandler;
}
@Override
public Executor getAsyncExecutor() {
return threadPoolTaskExecutor;
}
}
動作結果
最初に不正値例外確認の非同期処理が実行され、例外発生を受けてCustomAsyncUncaughtExceptionHandler
による処理が行われる。
2秒後に多重起動エラー確認用非同期処理をループで起動し、2スレッド+1キュー分たまったところで例外発生。非同期処理は順次実行されている。
2017-11-26 23:16:29.335 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 不正値エラー確認起動 開始
2017-11-26 23:16:29.337 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 不正値エラー確認起動 終了
2017-11-26 23:16:29.340 INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : ex:java.lang.IllegalArgumentException: 不正値エラー
2017-11-26 23:16:29.341 INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : method:public void com.example.asyncexceptiontest.AsyncTask.exceptionProcess(java.lang.Integer,com.example.asyncexceptiontest.Dummy)
2017-11-26 23:16:29.346 INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : param:10
2017-11-26 23:16:29.346 INFO 74400 --- [ DummyProcess-1] .e.a.CustomAsyncUncaughtExceptionHandler : param:Dummy(id=20, name=test)
2017-11-26 23:16:31.341 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 多重起動エラー確認起動 開始
2017-11-26 23:16:31.343 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 起動 1 回目
2017-11-26 23:16:31.343 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 起動 2 回目
2017-11-26 23:16:31.343 INFO 74400 --- [ DummyProcess-2] c.example.asyncexceptiontest.AsyncTask : 非同期起動 id=1
2017-11-26 23:16:31.348 INFO 74400 --- [ DummyProcess-1] c.example.asyncexceptiontest.AsyncTask : 非同期起動 id=2
2017-11-26 23:16:31.348 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 起動 3 回目
2017-11-26 23:16:31.349 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 起動 4 回目
2017-11-26 23:16:31.350 WARN 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 多重起動エラーです。
2017-11-26 23:16:31.351 INFO 74400 --- [ main] c.example.asyncexceptiontest.MainRunner : 多重起動エラー確認起動 終了
2017-11-26 23:16:31.352 INFO 74400 --- [ main] c.e.a.AsyncExceptionTestApplication : Started AsyncExceptionTestApplication in 12.61 seconds (JVM running for 22.977)
2017-11-26 23:16:32.352 INFO 74400 --- [ DummyProcess-1] c.example.asyncexceptiontest.AsyncTask : 非同期起動 id=3