保证异步线程能够保证事务的使用工具
- 此工具是为了解决在使用
spring
框架下,解决使用异步线程执行任务出现的事务问题。 - 针对不同的业务使用提供相应
api
,即可达到目的 - 通过获取
spring
中bean
的方式引入即可
- 支持
批量异步任务
执行 - 支持接收异步任务执行结果的功能,调用api时,传入
List
类型集合参数即可 - 支持
批量异步任务
执行,且支持事务
,当某个异步任务出现异常
,事务
可回滚 - 支持
主任务
和批量异步任务
的执行,且支持事务
,当主任务
和批量异步任务
中只要有一个任务发生异常事务
都会回滚
- 目前异步任务最大数量为
100
- 底层采用的线程池为核心线程数为
0
,最大线程数为101
,因为如果核心数不为0
的话,当任务数大于了线程核心数会发生死锁情况。 - 底层采用的事务级别为
READ_COMMITTED
,如果采用REPEATABLE_READ
,由于mysql在REPEATABLE_READ
级别会产生间隙锁,所以可能发生死锁情况。
/**
* 异步任务运行
* @param taskList 需要异步执行的任务
*/
public void execute(List<Runnable> taskList)
/**
* 异步任务运行
* @param callTaskList 需要异步执行的任务(有返回值)
* @param resultList 承载异步执行任务结果的集合
* @param sequence 是否要求执行的任务 和 承载异步执行任务结果的集合保证顺序
*/
public <V> void call(List<Callable<V>> callTaskList,List<V> resultList,boolean sequence)
/**
* 异步任务独立事务运行(出现异常回滚本异步任务,其他异步任务不会回滚)
* @param taskList 需要异步执行的任务
*/
public void independenceTransactionExecute(List<Runnable> taskList)
/**
* 异步任务独立事务运行(出现异常回滚本异步任务,其他异步任务不会回滚)
* @param callTaskList 需要异步执行的任务(有返回值)
* @param resultList 承载异步执行任务结果的集合
*/
public <V> void independenceTransactionCall(List<Callable<V>> callTaskList,List<V> resultList)
/**
* @param taskList 需要异步执行的任务
*/
public void transactionExecute(List<Runnable> taskList)
/**
* @param callTaskList 需要异步执行的任务(有返回值)
* @param resultList 承载异步执行任务结果的集合
*/
public <T> void transactionCall(List<Callable<T>> callTaskList, List<T> resultList)
/**
* @param majorTaskRunnable 需要执行的主任务
* @param taskList 需要异步执行的任务
* @throws Throwable 抛出的异常
*/
public void transactionExecute(MajorTaskRunnable majorTaskRunnable, List<Runnable> taskList)
/**
* @param majorTaskRunnable 需要执行的主任务
* @param callTaskList 需要异步执行的任务(有返回值)
* @param resultList 承载异步执行任务结果的集合
* @throws Throwable 抛出的异常
*/
public <T> void transactionCall(MajorTaskRunnable majorTaskRunnable,
List<Callable<T>> callTaskList, List<T> resultList)
/**
* @param majorTaskCallable 需要执行的主任务(有返回值)
* @param taskList 需要异步执行的任务
* @return T 主任务的返回值
* @throws Throwable 抛出的异常
*/
public <T> T transactionExecute(MajorTaskCallable<T> majorTaskCallable, List<Runnable> taskList)
/**
* @param majorTaskCallable 需要执行的主任务(有返回值)
* @param callTaskList 需要异步执行的任务(有返回值)
* @param resultList 承载异步执行任务结果的集合
* @return T 主任务的返回值
* @throws Throwable 抛出的异常
*/
public <T,V> T transactionCall(MajorTaskCallable<T> majorTaskCallable,
List<Callable<V>> callTaskList, List<V> resultList)
/**
* @param majorTaskCallable 需要执行的主任务(有返回值)
* @param inputParamTaskList 需要异步执行的任务(需要有主任务中的返回值来做输入参数)
* @return T 主任务的返回值
* @throws Throwable 抛出的异常
*/
public <T> T transactionExecuteInputParamTask(MajorTaskCallable<T> majorTaskCallable,
List<InputParamRunnable<T>> inputParamTaskList)
/**
* @param majorTaskCallable 需要执行的主任务(有返回值)
* @param inputParamCallTaskList 需要异步执行的任务(有返回值)(需要有主任务中的返回值来做输入参数)
* @param resultList 承载异步执行任务结果的集合
* @return T 主任务的返回值
* @throws Throwable 抛出的异常
*/
public <T,V> T transactionCallInputParamCallTask(MajorTaskCallable<T> majorTaskCallable,
List<InputParamCallable<V,T>> inputParamCallTaskList, List<V> resultList)
异步任务运行
public Integer addAccount(int id){
List<Runnable> runnableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Runnable r = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
this.add(account);
};
runnableList.add(r);
}
threadTransactionTool.execute(runnableList);
return null;
}
异步任务运行,通过List类型入参
拿到异步任务结果。
public Object addAccount(int id){
List<Callable<Integer>> callableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Callable c = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
this.add(account);
return temp;
};
callableList.add(c);
}
List<Integer> resultList = new ArrayList<>();
threadTransactionTool.call(callableList,resultList,true);
return resultList;
}
异步任务独立事务
运行(出现异常回滚
本异步任务,其他异步任务不会回滚
)
public Integer addAccount(int id){
List<Runnable> runnableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Runnable r = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
this.add(account);
};
runnableList.add(r);
}
threadTransactionTool.independenceTransactionExecute(runnableList);
return null;
}
异步任务独立事务
运行(出现异常回滚
本异步任务,其他异步任务不会回滚
),通过List类型入参
拿到异步任务结果。
public Object addAccount(int id){
List<Callable<Integer>> callableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Callable c = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
this.add(account);
return temp;
};
callableList.add(c);
}
List<Integer> resultList = new ArrayList<>();
threadTransactionTool.independenceTransactionCall(callableList,resultList);
return resultList;
}
处理异步执行的任务
public Integer addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
Integer result = userMapper.add(user);
List<Runnable> runnableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Runnable r = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
accountService.add(account);
};
runnableList.add(r);
}
try {
threadTransactionTool.transactionExecute(runnableList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return result;
}
批量异步任务
执行时,当其中一个出现异常
后,其余的异步任务
都会回滚
。但是当主任务
添加user的操作出现异常,异步任务不会回滚
,需要使用threadTransactionTool
提供的另外方法api。
处理异步执行的任务,并拿到异步任务结果
public Object addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
Integer result = 0;
List<Callable<Integer>> callableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Callable call = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
accountService.add(account);
return temp;
};
callableList.add(call);
}
List<Integer> resultList = new ArrayList<>();
try {
threadTransactionTool.transactionCall(callableList,resultList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return resultList;
}
对介绍5
的api增强了接收异步结果
的功能,通过传入resultList
参数实现。
处理主任务和异步执行的任务
public Integer addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
Integer result = 0;
List<Runnable> runnableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Runnable r = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
accountService.add(account);
};
runnableList.add(r);
}
try {
threadTransactionTool.transactionExecute(() -> {
userMapper.add(user);
}, runnableList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return result;
}
批量异步任务
执行时,当其中一个出现异常
后,其余的异步任务
都会回滚
。并且主任务
添加user的操作出现异常
,异步任务
也能够回滚
。
8. public void transactionCall(MajorTaskRunnable majorTaskRunnable, List<Callable> callTaskList, List resultList)
处理主任务和异步执行的任务并拿到异步任务结果
public Object addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
Integer result = 0;
List<Callable<Integer>> callableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Callable call = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
accountService.add(account);
return temp;
};
callableList.add(call);
}
List<Integer> resultList = new ArrayList<>();
try {
threadTransactionTool.transactionCall(() -> {
userMapper.add(user);
}, callableList,resultList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return resultList;
}
对介绍7
的api增强了接收异步结果
的功能,通过传入resultList
参数实现。
处理主任务和异步执行的任务,并返回主任务结果
public Integer addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
Integer result = 0;
List<Runnable> runnableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Runnable r = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
accountService.add(account);
};
runnableList.add(r);
}
try {
result = threadTransactionTool.transactionExecute(() -> {
int insertResult = userMapper.add(user);
return insertResult;
}, runnableList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return result;
}
对介绍7
的api增强了返回主任务结果
的功能。
10. public <T,V> T transactionCall(MajorTaskCallable majorTaskCallable, List<Callable> callTaskList, List resultList)
处理主任务和异步执行的任务,并返回主任务结果
,并且拿到异步任务结果
public Object addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
Integer result = 0;
List<Callable<Integer>> callableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
Callable call = () -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
accountService.add(account);
return temp;
};
callableList.add(call);
}
List<Integer> resultList = new ArrayList<>();
try {
result = threadTransactionTool.transactionCall(() -> {
int insertResult = userMapper.add(user);
return insertResult;
}, callableList,resultList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return resultList;
}
对介绍7
的api增强了接收主任务结果
的功能。也增强了接收异步结果
的功能,通过传入resultList
参数实现。
11. public T transactionExecuteInputParamTask(MajorTaskCallable majorTaskCallable, List<InputParamRunnable> inputParamTaskList)
处理主任务和异步执行的任务(异步任务需要主任务的返回值
),并返回主任务结果
public Object addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
String userNameResult = null;
List<InputParamRunnable<String>> inputParamRunnableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
InputParamRunnable c = (InputParamRunnable<String>) userName -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
account.setUserName(userName);
accountService.add(account);
};
inputParamRunnableList.add(c);
}
try {
userNameResult = threadTransactionTool.transactionExecuteInputParamTask(() -> {
userMapper.add(user);
User userResult = userMapper.get(user.getId());
return userResult.getName();
},inputParamRunnableList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!"+e+"===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return userNameResult;
}
对介绍9
的api增加了异步任务需要主任务返回值
的功能。
12. public <T,V> T transactionCallInputParamCallTask(MajorTaskCallable majorTaskCallable, List<InputParamCallable<V,T>> inputParamCallTaskList, List resultList)
处理主任务和异步执行的任务(异步任务需要主任务的返回值
),并返回主任务结果
,以及拿到异步任务结果
public Object addUserAndAccount(User user){
long startTime = System.currentTimeMillis();
String userNameResult = null;
List<InputParamCallable<Integer,String>> inputParamCallableList = new ArrayList<>();
for(int i = 1; i<= 20; i++){
int temp = i;
InputParamCallable c = (InputParamCallable<Integer,String>) userName -> {
Account account = new Account();
account.setId(temp);
account.setAccountId(IdGeneratorUtil.getId());
account.setName("sss");
account.setPassword("www");
account.setAge(temp);
account.setUserName(userName);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
accountService.add(account);
return temp;
};
inputParamCallableList.add(c);
}
List<Integer> resultList = new ArrayList<>();
try {
userNameResult = threadTransactionTool.transactionCallInputParamCallTask(() -> {
userMapper.add(user);
User userResult = userMapper.get(user.getId());
return userResult.getName();
},inputParamCallableList,resultList);
} catch (Throwable e) {
System.out.println("===检测到异常回滚!!!"+e+"===");
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
long endTime = System.currentTimeMillis();
System.out.println("===耗时:"+(endTime - startTime)+"===");
return resultList;
}