异步任务AsyncIAE
# 1 项目简介
提示
目前 AsyncIAE
项目暂时仅实现了异步导出功能,异步导入功能待开发。
AsyncIAE
项目的出现主要是为了解决过去导出服务实现较繁琐,开发人员额外花在导出功能的工时较多的问题。
项目包括服务端 async-iae-server
和客户端 async-iae-client
两部分。
AsyncIAE
项目包括以下特性:
- 支持在查询接口上添加简单注解即可实现通用导出功能。
- 支持自定义动态列头;支持多行列头。
- 支持自定义导出逻辑,需在服务端实现指定接口。
- 支持超过指定条数导出压缩包。
- 支持通过指定阈值控制各业务类型的导出速度。
- 支持小于指定条数时直接同步导出到浏览器。
# 2 设计要点
# 2.1 库表设计
CREATE TABLE "TAB_ASYNC_JOB"
("ID" NUMBER(20,0) NOT NULL,
"JOB_NAME" NVARCHAR2(100) DEFAULT NULL,
"JOB_DESC" NVARCHAR2(255) DEFAULT NULL,
"JOB_STATUS" NUMBER(1,0) DEFAULT 0,
"JOB_META_INFO" NVARCHAR2(2000) DEFAULT NULL,
"JOB_FAIL_MSG" NVARCHAR2(255) DEFAULT NULL,
"JOB_CONDITION" NVARCHAR2(500) DEFAULT NULL,
"JOB_TYPE" NUMBER(1,0) DEFAULT NULL,
"BUSINESS_TYPE" NVARCHAR2(60) DEFAULT NULL,
"BUSINESS_TYPE_NAME" NVARCHAR2(100) DEFAULT NULL,
"BUSINESS_REQUEST_URL" NVARCHAR2(100) DEFAULT NULL,
"FILE_PATH" NVARCHAR2(255) DEFAULT NULL,
"CREATE_ID" NUMBER(20,0) DEFAULT NULL,
"CREATE_CODE" NVARCHAR2(30) DEFAULT NULL,
"CREATE_NAME" NVARCHAR2(60) DEFAULT NULL,
"CREATE_TIME" DATE DEFAULT NULL,
"UPDATE_TIME" DATE DEFAULT NULL,
CONSTRAINT "TAB_ASYNC_JOB" PRIMARY KEY ("ID")
);
COMMENT ON TABLE TAB_ASYNC_JOB IS '异步任务表(导入/导出)';
COMMENT ON COLUMN TAB_ASYNC_JOB.ID IS 'ID';
COMMENT ON COLUMN TAB_ASYNC_JOB.JOB_NAME IS '任务名';
COMMENT ON COLUMN TAB_ASYNC_JOB.JOB_DESC IS '任务描述';
COMMENT ON COLUMN TAB_ASYNC_JOB.JOB_STATUS IS '任务状态(0.初始/1.进行中/2.成功/3.失败/4.暂停)';
COMMENT ON COLUMN TAB_ASYNC_JOB.JOB_FAIL_MSG IS '任务失败信息';
COMMENT ON COLUMN TAB_ASYNC_JOB.JOB_CONDITION IS '任务查询条件';
COMMENT ON COLUMN TAB_ASYNC_JOB.JOB_META_INFO IS '数据查询返回元信息';
COMMENT ON COLUMN TAB_ASYNC_JOB.JOB_TYPE IS '任务类型(1.导出/2.导入/3.批量更新)';
COMMENT ON COLUMN TAB_ASYNC_JOB.BUSINESS_TYPE IS '业务类型';
COMMENT ON COLUMN TAB_ASYNC_JOB.BUSINESS_TYPE_NAME IS '业务类型名称';
COMMENT ON COLUMN TAB_ASYNC_JOB.BUSINESS_REQUEST_URL IS '数据接口请求地址';
COMMENT ON COLUMN TAB_ASYNC_JOB.FILE_PATH IS '文件路径(导入结果/导出文件/批量更新结果)';
COMMENT ON COLUMN TAB_ASYNC_JOB.CREATE_ID IS '任务提交人ID';
COMMENT ON COLUMN TAB_ASYNC_JOB.CREATE_CODE IS '任务提交人CODE';
COMMENT ON COLUMN TAB_ASYNC_JOB.CREATE_NAME IS '任务提交人NAME';
COMMENT ON COLUMN TAB_ASYNC_JOB.CREATE_TIME IS '任务创建时间';
COMMENT ON COLUMN TAB_ASYNC_JOB.UPDATE_TIME IS '任务更新时间';
CREATE INDEX TYPE_CODE_IDX ON TAB_ASYNC_JOB (JOB_TYPE,BUSINESS_TYPE,CREATE_TIME);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 2.2 核心SQL
通过该SQL获取待执行的导出任务,导出任务按业务类型分组,限制不同组只能同时进行指定数量的导出。
<select id="selectNotStartedJobs" resultType="com.xxx.iae.entity.AsyncJobEnhance">
SELECT C.*
FROM (
SELECT B.*
<if test="exportThresholdList != null and exportThresholdList.size() > 0">
<foreach collection="exportThresholdList" separator=" " open=", CASE " close="ELSE 0 END AS DISABLE" item="item">
WHEN B.BUSINESS_TYPE = #{item.businessType} AND B.ROW_NUM > #{item.threshold} THEN 1
</foreach> --定制指定业务类型可执行的任务数
</if>
FROM (SELECT A.ID,
A.JOB_NAME,
A.JOB_DESC,
A.JOB_STATUS,
A.JOB_CONDITION,
A.JOB_TYPE,
A.BUSINESS_TYPE,
A.FILE_PATH,
A.JOB_META_INFO,
A.BUSINESS_REQUEST_URL,
A.CREATE_TIME,
--根据业务类型分组,并按任务状态倒序、创建时间倒序来排序
ROW_NUMBER() OVER(PARTITION BY A.BUSINESS_TYPE ORDER BY A.JOB_STATUS DESC,A.CREATE_TIME DESC) ROW_NUM
FROM TAB_ASYNC_JOB A
WHERE A.JOB_TYPE = 1 --类型为导出
AND A.JOB_STATUS IN (0, 1) --查出未开始和进行中的任务
) B
WHERE B.ROW_NUM <![CDATA[ <= ]]> 10 --每个不同业务类型的导出默认可同时进行10个任务,通过exportThresholdList可调小
) C
WHERE C.JOB_STATUS = 0 --只查询未开始的任务
<if test="exportThresholdList != null and exportThresholdList.size() > 0">
AND C.DISABLE = 0 --定制指定业务类型可执行的任务数
</if>
order by C.CREATE_TIME
</select>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 2.3 服务端
# 2.3.1 线程池使用
线程使用数达到poolSize后,有新任务则拒绝。等待定时任务下一次执行时,又会去数据库查询待执行的任务。
@Autowired
private XxlJobConstant xxlJobConstant;
private static ExecutorService executorService;
@PostConstruct
public void initialize() {
executorService = new ThreadPoolExecutor(xxlJobConstant.poolSize,
xxlJobConstant.poolSize,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
}
2
3
4
5
6
7
8
9
10
11
12
13
# 2.3.2 ApplicationRunner使用
容器启动后缓存所有导出接口实现类,以InterfaceImpl自定义注解的value值作为key。定时任务触发时,根据消费端指定的interfaceImpl值,与缓存key匹配,获取到具体导出实现类并执行相关逻辑。
@Slf4j
@Component
public class ExportServiceImplFactory implements ApplicationRunner, ApplicationContextAware {
//运行上下文
private ApplicationContext applicationContext;
private Map<String, ExportService> exportServiceImplMap = new HashMap<>();
@Override
public void run(ApplicationArguments args) throws Exception {
Map<String, ExportService> beansOfType = applicationContext.getBeansOfType(ExportService.class);
for(ExportService exportService : beansOfType.values()) {
String interfaceImpl = exportService.getClass().getAnnotation(InterfaceImpl.class).value();
exportServiceImplMap.put(interfaceImpl, exportService);
if ("default".equals(interfaceImpl)) {
exportServiceImplMap.put("", exportService);
}
}
}
public ExportService getExportServiceImpl(String interfaceImpl) {
return exportServiceImplMap.get(interfaceImpl == null ? "" : interfaceImpl);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 2.3.3 压缩文件
导出数据超过指定条数,输出流改为zip输出流。
public ByteArrayOutputStream genZipOutputStream(AsyncJobEnhance asyncJobEnhance, ByteArrayOutputStream out) {
if (asyncJobEnhance.isNeedCompress()) {
ByteArrayOutputStream zipOut = new ByteArrayOutputStream();
try (ZipOutputStream zos = new ZipOutputStream(zipOut)) {
try (BufferedInputStream bis = new BufferedInputStream(new ByteArrayInputStream(out.toByteArray()))) {
//指定zip文件夹
ZipEntry zipEntry = new ZipEntry(asyncJobEnhance.getJobName() + asyncJobEnhance.getExcelFileSuffix());
zos.putNextEntry(zipEntry);
int len;
byte[] buffer = new byte[1024 * 10];
while ((len = bis.read(buffer, 0, buffer.length)) != -1) {
zos.write(buffer, 0, len);
zos.flush();
}
} catch (Exception e) {
log.error("genZipOutputStream:BufferedInputStream", e);
throw new AsyncJobException(ResultCodeEnum.DATA_ZIP_CHANGE);
}
} catch (Exception e) {
log.error("genZipOutputStream:ZipOutputStream", e);
throw new AsyncJobException(ResultCodeEnum.DATA_ZIP_CHANGE);
}
out = zipOut;
}
return out;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 2.4 客户端
# 2.4.1 自定义注解+切面
拦截有声明了@AsyncExport的方法,当前端发起的请求,消息体或者请求头上带有async=1的标识时,该方法会调整为去构建导出任务,然后返回,不再走之前查询数据的逻辑。
private static final String HEADER_ASYNC = "async";
// 申明切点
@Pointcut("@annotation(com.xxx.iae.annotation.AsyncExport)")
private void serviceAspect() {
}
@Around(value = "serviceAspect()")
public Object methodAround(ProceedingJoinPoint joinPoint) throws Throwable {
AsyncJobEnhance asyncJobEnhance = ExportUtil.exportInfoInit(joinPoint);
if (ExportUtil.isJson(asyncJobEnhance.getRequest()) && ExportUtil.hasAsyncFlag(asyncJobEnhance.getRequest())) {
//调用原有方法获取数据总条数,current设置为Integer.MAX_VALUE是为了不让mybatis再去查多一次明细
Page page = null;
if (syncMaxSize > 0) {
page = invokeMethod(asyncJobEnhance, new Page().setCurrent(Integer.MAX_VALUE));
}
try {
//构建导出的必要信息
buildAsyncJobEnhance(asyncJobEnhance);
if (page == null || page.getTotal() > syncMaxSize) {
//异步导出
asyncHandle(asyncJobEnhance);
} else {
//同步导出
syncHandle(asyncJobEnhance);
}
return JSONObject.parseObject(JSONObject.toJSONString(Result.success(true)), asyncJobEnhance.getReturnOuterTypeClass());
} catch (AsyncJobException asyncJobException) {
asyncJobException.printStackTrace();
Result errorResult = Result.error(asyncJobException.getCode(), asyncJobException.getMessage());
errorResult.setData(false);
return JSONObject.parseObject(JSONObject.toJSONString(errorResult), asyncJobEnhance.getReturnOuterTypeClass());
}
}
// 执行目标方法
Object result = joinPoint.proceed();
return result;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 2.4.2 后置处理器
通过后置处理器的方式,实现自定义列头的设置,及任务创建人信息设置。
public interface ExportColumnsPostProcessor {
/**
* 自定义列头处理,@CustomColumns设置在字段属性上时的处理
* @param customColumns 自定义注解对应的字段对象,需开发者自己强制类型转换
* @param columnHeadList 方法返回vo对应的列头
* @return 方法处理后返回标准列头格式
*/
List<ExportColumnHead> columnsPropHandle(Object customColumns, List<ExportColumnHead> columnHeadList);
/**
* 自定义列头处理,@CustomColumns设置在方法上时的处理
* @param valueArg 注解@CustomColumns中的value值
* @return 方法处理后返回标准列头格式
*/
List<ExportColumnHead> columnsMethodHandle(String valueArg);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface ExportUserInfoPostProcessor {
/**
* @param request
* @return 返回任务创建人的信息(将更新到数据库的任务记录中)
*/
ExportUserInfo userInfoHandle(HttpServletRequest request);
}
2
3
4
5
6
7
8
9
private ExportUserInfo getExportUserInfo(HttpServletRequest request) {
//获取用户信息设置后置处理器
Map<String, ExportUserInfoPostProcessor> userInfoPostProcessorImplMap = SpringBeanUtils.getApplicationContext().getBeansOfType(ExportUserInfoPostProcessor.class);
if (userInfoPostProcessorImplMap.size() == 0) {
throw new AsyncJobException(ResultCodeEnum.ERROR_GET_USERINFO);
}
ExportUserInfoPostProcessor exportUserInfoPostProcessor = userInfoPostProcessorImplMap.values().iterator().next();
//获取用户信息
return exportUserInfoPostProcessor.userInfoHandle(request);
}
2
3
4
5
6
7
8
9
10
# 2.4.3 构建列头
public class ExportUtil {
//解析类对象,将带有ExportProperty自定义注解的字段属性,转换为列头元信息
private static List<ExportColumnHead> getExportColumnHeadList(Class clazz) {
List<ExportColumnHead> exportColumnHeadList = new ArrayList<>();
for (Field field : clazz.getDeclaredFields()) {
ExportProperty annotation = field.getAnnotation(ExportProperty.class);
if (annotation == null) {
continue;
}
exportColumnHeadList.add(new ExportColumnHead(field.getName(), annotation.value(), annotation.hide(), annotation.index()));
}
return exportColumnHeadList;
}
//解析列头信息,进行排序
private static void buildHeaderList(List<ExportColumnHead> exportColumnHeadList, List<String> headPropList, List<List<String>> headLabelList) {
if (CollectionUtils.isEmpty(exportColumnHeadList)) {
throw new AsyncJobException(ResultCodeEnum.ERROR_COLUMNHEAD_EMPTY);
}
exportColumnHeadList.stream()
.filter(dto -> (dto.getHide() == null || !dto.getHide()))
.sorted(Comparator.comparing(ExportColumnHead::getSortNum, Comparator.nullsLast(Integer::compareTo)))
.forEach(sortDTO -> {
headPropList.add(sortDTO.getProp());
headLabelList.add(Arrays.asList(sortDTO.getLabel()));
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 3 使用说明
# 3.1 对象属性添加注解
# 3.1.1 设置导出列头信息
提示
在列表查询的返回参数类中添加 @ExportProperty
注解
也支持使用swagger的 @ApiModelProperty
注解。
优先使用 @ExportProperty
,没有 @ExportProperty
则再去找 @ApiModelProperty
注解。
@ExportProperty
示例
- value:列头名称,支持多行列头,所以value为数组类型。
- index:序号,按升序排序。
- hide:是否不导出,默认是导出
public class User {
@ExportProperty(value = "id", hide = true)
private String id;
@ExportProperty(value = "用户id", index = 2)
private String userId;
@ExportProperty(value = "用户名", index = 1)
private String username;
@ExportProperty(value = {"用户姓名", "第二行列头"}, index = 3)
private String nickname;
@ExportProperty(value = "性别", index = 4)
private String gender;
@ExportProperty(value = {"手机", "第二行列头", "第三行列头"}, index = 5)
private String mobile;
@ExportProperty(value = "状态", index = 6)
private String status;
@ExportProperty(value = "邮件", index = 7)
private String email;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 3.1.2 标识自定义列
提示
在列表查询的请求参数类中添加 @CustomColumns
注解。
添加该注解后需实现
ExportColumnsPostProcessor#columnsPropHandle(Object customColumns, List
返回参数为 List<ExportColumnHead>
。
其中 customColumns
对应以下示例中的 formatList
。开发者需自己实现接口方法,将 customColumns
转换为标准的 List<ExportColumnHead>
。
@CustomColumns
示例
@Data
public class TestVO {
private String name;
private Integer age;
private Date birthday;
@CustomColumns
private List<FormatColumn> formatList;
}
2
3
4
5
6
7
8
9
# 3.2 方法添加注解
# 3.2.1 列表查询方法转换为导出任务方法
@AsyncExport
@AsyncExport(value = "导出任务名",
jobDesc = "任务描述",
businessType = "queryUserList",
businessTypeName = "查询用户列表",
interfaceImpl = "指定服务端自定义接口实现")
- value:当前异步导出任务的任务名,提交到数据库(默认为方法名+时间戳)。
- jobDesc:当前异步导出任务的任务描述,提交到数据库(默认为方法名+时间戳)。
- businessType:业务类型(默认为方法名)。
- businessTypeName:业务类型名称(默认为方法名)。
- interfaceImpl:自定义导出逻辑,需在服务端实现具体逻辑(服务端实现类用@InterfaceImpl声明)。
@RequestMapping("/queryUserList")
@AsyncExport(value = "导出任务名", interfaceImpl = "指定服务端自定义接口实现")
public Result<Page<User>> queryUserList(@RequestParam(value = "current", required = false, defaultValue = "1") Long current,
@RequestParam(value = "size", required = false, defaultValue = "20") Long size,
@RequestBody TestVO testVO){
Page<User> page = new Page<>();
page.setCurrent(current);
page.setSize(size);
// page.setCurrent(testVO.getCurrent());
// page.setSize(testVO.getSize());
return Result.success(userService.page(page));
}
2
3
4
5
6
7
8
9
10
11
12
@Data
public class TestVO {
private String name;
private Integer age;
private Date birthday;
private Long current;
private Long size;
private List<FormatColumn> formatList;
}
2
3
4
5
6
7
8
9
10
自定义导出逻辑
服务端实现
@Component
@InterfaceImpl("default")
public class StandardExportImpl extends AbstractExportServiceImpl {
// ... ...
}
2
3
4
5
@Component
@InterfaceImpl("demo")
public class DemoExportImpl extends AbstractExportServiceImpl {
@Override
public void writeExcel(AsyncJobEnhance asyncJobEnhance, ByteArrayOutputStream out) {
log.info("DemoExportImpl : writeExcel.asyncJobEnhance : {}", JSONObject.toJSON(asyncJobEnhance));
}
@Override
public String uploadExcel(AsyncJobEnhance asyncJobEnhance, ByteArrayOutputStream out) {
log.info("DemoExportImpl : uploadExcel.asyncJobEnhance : {}", JSONObject.toJSON(asyncJobEnhance));
return "Demo URL";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 3.2.2 标识自定义列
提示
在方法上添加 @CustomColumns
注解。
添加该注解后需实现ExportColumnsPostProcessor#columnsMethodHandle(String valueArg)。返回参数为 List<ExportColumnHead>
。
开发者需自己实现接口方法,其中valueArg为@CustomColumns中的value值。例如:接口方法中可查询redis中的自定义数据,最终返回标准的 List<ExportColumnHead>
。
@CustomColumns("abc")
示例
@RequestMapping("/queryUserList")
@AsyncExport
@CustomColumns("userCustomColumn")
public Result<Page<User>>queryUserList(@RequestBody TestVO testVO){
Page<User> page=new Page<>();
page.setCurrent(testVO.getCurrent());
page.setSize(testVO.getSize());
return Result.success(userService.page(page));
}
2
3
4
5
6
7
8
9
# 3.3 实现后置处理器
通过实现ExportPropPostProcessor后置处理器来设置用户信息及自定义列信息等。
示例
@Component
public class ExportPostProcessor implements ExportPropPostProcessor {
private static final String HEADER_LOGIN_USER = "X-UPS-USER";
@Override
public ExportUserInfo userInfoHandle(ProceedingJoinPoint joinPoint, HttpServletRequest request) {
String loginUserJson = request.getHeader(HEADER_LOGIN_USER);
if (StringUtils.isNotEmpty(loginUserJson)) {
try {
loginUserJson = URLDecoder.decode(loginUserJson, "UTF-8");
} catch (UnsupportedEncodingException var4) {
throw new RuntimeException(var4);
}
return JSONObject.parseObject(loginUserJson, ExportUserInfo.class);
}
return new ExportUserInfo();
}
// columnsPropHandle 和 columnsMethodHandle 一般只需实现其中一种即可
@Override
public List<ExportColumnHead> columnsPropHandle(Object customColumns, Class clazz) {
List<FormatColumn> formatList = (List<FormatColumn>) customColumns;
List<ExportColumnHead> columnHeadList = new ArrayList<>();
if (formatList == null) {
return null;
}
formatList.forEach(formatColumn -> {
ExportColumnHead exportColumnHead = new ExportColumnHead();
exportColumnHead.setProp(formatColumn.getProp());
exportColumnHead.setSortNum(formatColumn.getSortNum());
exportColumnHead.setLabel(ExportUtil.getColumnHeadLabel(clazz, formatColumn.getProp()));
columnHeadList.add(exportColumnHead);
});
return columnHeadList;
}
@Override
public List<ExportColumnHead> columnsMethodHandle(String valueArg) {
//redis get(valueArg)
ExportColumnHead exportColumnHead = new ExportColumnHead();
exportColumnHead.setProp("testprop").setLabel(new String[]{"aaaaa"}).setSortNum(1);
List<ExportColumnHead> testList = new ArrayList<>();
testList.add(exportColumnHead);
return testList;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 3.4 配置XXL-JOB定时任务
新增执行器
新增任务