执行器初始化流程
前面详细分析了xxl-job-admin的流程,从这章开始,我们分析xxl-job的执行器流程。我们之前运行的是xxl-job-executor-sample-springboot这个项目,所以我们就从这个项目开始分析。

这个项目的结构还是很简单的,主要看XxlJobConfig和SampleXxlJob。SampleXxlJob是使用XxlJobConfig配置扫描的,直接看XxlJobConfig文件代码吧。
@Configuration
public class XxlJobConfig {
/**
 * Declares the executor bean: creates an {@code XxlJobSpringExecutor} and copies the
 * configuration values (admin addresses, app name, address, ip, port, access token,
 * log path, log retention days) into it through its setters.
 * The source fields (e.g. {@code adminAddresses}) are declared outside this excerpt.
 *
 * @return the configured {@code XxlJobSpringExecutor}
 */
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
// Note: all of these properties are injected from application.properties
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
xxlJobSpringExecutor
可以看到声明了一个类型为XxlJobSpringExecutor的Bean(Bean的名字默认取方法名xxlJobExecutor),接下来我们看这个类的代码。
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
// start
/**
 * Start-up callback of the executor: scans for {@code @XxlJob} methods, switches the
 * glue factory to its Spring variant and then starts the underlying executor.
 *
 * @throws RuntimeException wrapping any exception thrown by {@code super.start()}
 */
@Override
public void afterSingletonsInstantiated() {
    // Extract the methods annotated with @XxlJob and register them as job handlers.
    initJobHandlerMethodRepository(applicationContext);
    // Refresh the GlueFactory (type 1 selects the Spring-aware factory).
    GlueFactory.refreshInstance(1);
    // start() declares a checked Exception, but this callback cannot declare one,
    // so rethrow it unchecked with the original cause preserved.
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
// ---------------------- applicationContext ----------------------
private static ApplicationContext applicationContext;
/**
 * Stores the given application context in the static {@code applicationContext} field
 * of {@code XxlJobSpringExecutor}, where {@code afterSingletonsInstantiated} reads it.
 *
 * @param applicationContext the context to remember
 * @throws BeansException declared by the overridden method; not thrown by this body
 */
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
XxlJobSpringExecutor.applicationContext = applicationContext;
}
}
由于XxlJobSpringExecutor实现了SmartInitializingSingleton接口,所以在所有单例Bean实例化完成之后,Spring会回调afterSingletonsInstantiated方法。
JobHandler初始化
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
/**
 * Scans the beans of the given context for methods annotated with {@code @XxlJob}
 * and registers each of them through {@code registJobHandler}.
 * Beans annotated with {@code @Lazy} are skipped, and a bean whose method scan fails
 * is logged and skipped as well.
 *
 * @param applicationContext the context to scan; nothing happens when it is null
 */
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// Fetch the bean names from the applicationContext
// (arguments: type Object, includeNonSingletons=false, allowEagerInit=true)
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
// get bean
Object bean = null;
Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
// Beans annotated with @Lazy are skipped
if (onBean!=null){
logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
continue;
}else {
bean = applicationContext.getBean(beanDefinitionName);
}
// filter method
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
// Collect every method of the bean's class that carries the XxlJob annotation
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
// Best effort: log and fall through; annotatedMethods stays null so the bean is skipped below
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
// generate and regist method job handler
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
// Register the method as a JobHandler
registJobHandler(xxlJob, bean, executeMethod);
}
}
}
}
可以看到这个方法就是扫描所有具有XxlJob注解的方法,然后注册这些方法。接下来看registJobHandler这个方法的实现。
public class XxlJobExecutor {
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
/**
 * Wraps one {@code @XxlJob}-annotated method in a {@code MethodJobHandler} and stores it
 * in the handler repository under the name given by {@code xxlJob.value()}.
 *
 * @param xxlJob        the annotation found on {@code executeMethod}
 * @param bean          the bean instance that owns the method
 * @param executeMethod the job method itself
 * @throws RuntimeException if a configured init/destroy method name does not exist on the bean class
 */
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod) {
    Class<?> clazz = bean.getClass();
    String name = xxlJob.value();
    // init/destroy are optional hooks: only resolve them when a name was configured.
    // NOTE(review): assumes MethodJobHandler accepts null for an absent hook - confirm.
    Method initMethod = resolveLifecycleMethod(clazz, xxlJob.init(), "initMethod");
    Method destroyMethod = resolveLifecycleMethod(clazz, xxlJob.destroy(), "destroyMethod");
    // registry jobhandler
    registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}

/**
 * Looks up a no-arg lifecycle method declared on {@code clazz} and makes it accessible.
 *
 * @return the method, or {@code null} when {@code methodName} is null or blank
 */
private static Method resolveLifecycleMethod(Class<?> clazz, String methodName, String role) {
    if (methodName == null || methodName.trim().length() == 0) {
        return null;
    }
    try {
        Method method = clazz.getDeclaredMethod(methodName);
        method.setAccessible(true);
        return method;
    } catch (NoSuchMethodException e) {
        throw new RuntimeException(
                "xxl-job method-jobhandler " + role + " invalid, for[" + clazz + "#" + methodName + "] .", e);
    }
}
/**
 * Puts the handler into {@code jobHandlerRepository} under the given name.
 *
 * @param name       the key to register the handler under
 * @param jobHandler the handler to store
 * @return the value returned by the map's {@code put}, i.e. the handler previously
 *         stored under {@code name}, or {@code null} if there was none
 */
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
return jobHandlerRepository.put(name, jobHandler);
}
}
可以看到registJobHandler就是把JobHandler写到jobHandlerRepository里面,key就是xxlJob.value,value就是MethodJobHandler,这个类是把bean及其任务方法封装起来的一个JobHandler。
GlueFactory初始化
接下来看一下GlueFactory#refreshInstance这个方法,主要是用来处理GLUE类型的定时任务的,GLUE任务就是一段可以执行的代码。
public class GlueFactory {
/**
 * Replaces the shared {@code glueFactory} instance according to {@code type}.
 *
 * @param type 0 installs a plain {@code GlueFactory}, 1 installs a {@code SpringGlueFactory};
 *             any other value leaves the current instance untouched
 */
public static void refreshInstance(int type){
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
glueFactory = new SpringGlueFactory();
}
}
}
由于是Spring环境,默认使用SpringGlueFactory。简单看一下GlueFactory这个工厂类的方法吧,这里就不细讲了。
public class GlueFactory {
// Signatures only; the bodies are omitted in this excerpt.
// Obtains a class instance from codeSource and returns it as an IJobHandler
// (per the original note, the handler is then executed by the caller).
public IJobHandler loadNewInstance(String codeSource) throws Exception;
// NOTE(review): presumably resolves the Class for the given code source - confirm against the full source.
private Class<?> getCodeSourceClass(String codeSource);
// NOTE(review): presumably injects dependencies into the freshly created instance - confirm against the full source.
public void injectService(Object instance);
}
执行器初始化
public class XxlJobExecutor {
/**
 * Starts the executor: sets up the log path, the admin client list, the log-cleaning
 * thread, the trigger-callback thread and finally the embedded server, in that order.
 *
 * @throws Exception propagated from the initialisation steps
 */
public void start() throws Exception {
// Initialise the log file path
XxlJobFileAppender.initLogPath(logPath);
// Initialise the addresses of the admin console
initAdminBizList(adminAddresses, accessToken);
// Start the log-cleaning thread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// Start the trigger-callback thread
TriggerCallbackThread.getInstance().start();
// Initialise the embedded executor server (netty based)
initEmbedServer(address, ip, port, appname, accessToken);
}
}
初始化执行器服务
我们看一下initEmbedServer的执行方法,代码如下:
public class XxlJobExecutor {
/**
 * Creates the {@code EmbedServer} and starts it on the resolved port.
 *
 * @param address     executor address passed through to the server
 * @param ip          not used by the code shown in this excerpt
 * @param port        port to listen on; when not positive, a port is picked via
 *                    {@code NetUtil.findAvailablePort(9999)}
 * @param appname     executor app name passed through to the server
 * @param accessToken access token passed through to the server
 * @throws Exception declared by the signature
 */
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
port = port>0?port: NetUtil.findAvailablePort(9999);
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
}
可以看到初始化了一个EmbedServer类,并且调用了EmbedServer的start方法,我们具体看一下对应的方法代码。
public class EmbedServer {
/**
 * Starts the embedded netty HTTP server on a daemon thread.
 * The thread binds to {@code port}, starts executor registration and then blocks until
 * the server channel is closed; the event loop groups are released when it exits.
 *
 * @param address     executor address handed to {@code startRegistry}
 * @param port        port to bind the server to
 * @param appname     executor app name handed to {@code startRegistry}
 * @param accessToken token handed to {@code EmbedHttpServerHandler} for request validation
 */
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            // Business pool: 0 core / 200 max threads, 60s keep-alive, bounded queue of 2000;
            // a rejected task surfaces as a RuntimeException.
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                // bind
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                // Register this executor with the admin side
                startRegistry(appname, address);
                // wait util stop
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // Always release the netty event loops, whether the server stopped
                // normally, was interrupted, or failed to start.
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
}
可以看到初始化了一个线程池,并且使用netty建立了一个服务器。核心地方我们查看EmbedHttpServerHandler这个处理类
EmbedHttpServerHandler
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/**
 * Reads the request body, URI, HTTP method, keep-alive flag and access-token header
 * from the incoming request, then hands the actual processing to {@code bizThreadPool}.
 * The pooled task calls {@code process}, serialises the result to JSON and writes it
 * back through {@code writeResponse}.
 *
 * @param ctx the channel context used to write the response
 * @param msg the aggregated HTTP request
 * @throws Exception declared by the overridden method
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
String uri = msg.uri();
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke
// Everything needed from msg was copied into locals above; the pooled task uses only those copies.
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
/**
 * Validates a request and dispatches it to the matching {@code executorBiz} operation.
 * Rejects non-POST requests, empty URIs, and (when a non-blank {@code accessToken} is
 * configured) requests whose token differs. Supported URIs are {@code /beat},
 * {@code /idleBeat}, {@code /run}, {@code /kill} and {@code /log}.
 *
 * @param httpMethod     HTTP method of the request
 * @param uri            request URI used to select the operation
 * @param requestData    request body, parsed from JSON into the operation's parameter type
 * @param accessTokenReq access token sent by the caller
 * @return the operation's result, or a {@code ReturnT} with {@code FAIL_CODE} when
 *         validation fails, the URI is unknown, or the operation throws
 */
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri == null || uri.trim().length() == 0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
// The token is only enforced when one is configured (non-null and non-blank)
if (accessToken != null
&& accessToken.trim().length() > 0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
// Any failure while parsing or executing is logged and reported back as a failure result
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
}
可以看到用刚才创建的线程池来处理admin下发的请求。
我们主要看/run分支的代码,可以看到调用了executorBiz#run方法,这个对应的实现是ExecutorBizImpl,我们看一下实现:
public class ExecutorBizImpl implements ExecutorBiz {
/**
 * Handles a trigger request from the admin: resolves the job handler for the requested
 * glue type, applies the block strategy when a job thread already exists for the job id,
 * creates a job thread if needed, and queues the trigger on it.
 *
 * @param triggerParam the trigger request (job id, glue type/source, handler name, block strategy, ...)
 * @return the result of queuing the trigger, or a {@code FAIL_CODE} result when the glue
 *         type is invalid, the handler cannot be found or loaded, or the trigger is discarded
 */
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // Look up the registered handler named by the trigger's executorHandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // The running thread is bound to a different handler: it has to be replaced
        if (jobThread != null && jobHandler != newJobHandler) {
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        // Without this assignment a first trigger would register a thread with a null handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE,
                        "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // loadNewInstance declares a checked Exception; report a load failure as a failed trigger
        try {
            IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
            jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
        } catch (Exception e) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, String.valueOf(e));
        }
    } else if (glueTypeEnum != null && glueTypeEnum.isScript()) {
        jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(),
                triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // A non-null jobThread means a worker thread already exists for this job id
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy =
                ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE,
                        "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread: dropping the reference forces a replacement below
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
}
可以看到是从XxlJobExecutor拿到jobId对应的jobThread,然后将triggerParam参数放入到jobThread里面。看一下jobThread#pushTriggerQueue的方法
public class JobThread extends Thread {
/**
 * Queues a trigger on this job thread unless a trigger with the same log id is already known.
 *
 * @param triggerParam the trigger to queue
 * @return {@code ReturnT.SUCCESS} when queued, or a {@code FAIL_CODE} result when the
 *         log id is already contained in {@code triggerLogIdSet}
 */
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// Reject the trigger if its logId has already been recorded
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
// Record the logId and enqueue the triggerParam
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
}
最后让我们看一下jobThread的核心方法run(线程通过start启动之后,执行的就是这个方法)。
public class JobThread extends Thread {
/**
 * Main loop of the job thread (abridged excerpt): until {@code toStop} is set, polls the
 * trigger queue and executes the handler for each trigger, with or without a timeout.
 * NOTE(review): this excerpt never updates {@code idleTimes} and does not handle the
 * checked exceptions of {@code poll}/{@code get}; that code is omitted here.
 */
public void run() {
while(!toStop){
// 1. Take the next trigger parameter from the queue, waiting up to 3 seconds
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam != null) {
// 2. If an execution timeout greater than 0 is configured, run the handler on a separate
//    thread and wait for it at most that long; otherwise execute it right here
if (triggerParam.getExecutorTimeout() > 0) {
FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
// init job context
XxlJobContext.setXxlJobContext(xxlJobContext);
// Invoke the handler
handler.execute();
return true;
}
});
futureThread = new Thread(futureTask);
futureThread.start();
// Wait for the result (timeout is given in seconds)
Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
}else{
// just execute immediately
handler.execute();
}
} else {
// Once the idle count exceeds 30, remove this jobThread
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
}
}
}
}
上述代码省略了很多任务执行的细节,只保留了核心逻辑。jobThread会循环运行,只有当任务停止(toStop被设置为true)时才跳出while循环。每次循环首先从triggerQueue队列中取出触发参数,如果存在执行超时时间并大于0,则新建线程异步调用handler的execute方法,并在超时时间内等待结果,否则直接调用handler的execute方法执行任务。
handler#execute的代码比较简单,这里就不详细讲了。
执行器注册
最后讲一下执行器的注册流程,在netty服务器初始化的代码中,可以看到一行注册执行器的代码。
public class EmbedServer {
// Abridged: only the registration call of EmbedServer#start is shown here.
public void start(final String address, final int port, final String appname, final String accessToken) {
// Kick off executor registration with the given app name and address
startRegistry(appname, address);
}
/**
 * Delegates executor registration to the {@code ExecutorRegistryThread} singleton.
 *
 * @param appname executor app name to register
 * @param address executor address to register
 */
public void startRegistry(final String appname, final String address) {
// start registry
ExecutorRegistryThread.getInstance().start(appname, address);
}
}
我们看一下ExecutorRegistryThread的源码,如下所示
public class ExecutorRegistryThread {
/**
 * Creates the registry thread, marks it as a daemon, names it and starts it.
 * In this abridged excerpt the thread is created without its Runnable.
 *
 * @param appname executor app name (not referenced by the lines shown here)
 * @param address executor address (not referenced by the lines shown here)
 */
public void start(final String appname, final String address) {
registryThread = new Thread();
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
}
可以看到创建并启动了一个守护线程registryThread,我们看看这个线程做了什么事情
registryThread = new Thread(new Runnable() {
/**
 * Body of the registry thread (abridged excerpt): while {@code toStop} is false it
 * registers this executor with every admin in the list and then sleeps; once the loop
 * ends it asks every admin to remove the registration.
 * NOTE(review): the results of the calls are not inspected here and the checked
 * exception of {@code sleep} is not handled; that code is omitted in this excerpt.
 */
@Override
public void run() {
// registry
while (!toStop) {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
}
// Sleep for RegistryConfig.BEAT_TIMEOUT seconds (30 seconds according to the original note)
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
// registry remove
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
}
}
});
可以看到线程在未停止时每隔30秒遍历一次adminBizList,调用adminBiz.registry进行注册;当线程停止(toStop为true)跳出循环之后,再遍历adminBizList,调用adminBiz.registryRemove移除注册。
对应的实现类是AdminBizClient,查看实现代码, 前面讲过postBody的实现,这里就不细说了。
public class AdminBizClient implements AdminBiz {
/**
 * Posts the registry parameter to {@code addressUrl + "api/registry"}.
 *
 * @param registryParam the registration data sent as the request body
 * @return the result returned by {@code XxlJobRemotingUtil.postBody}
 */
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
/**
 * Posts the registry parameter to {@code addressUrl + "api/registryRemove"}.
 *
 * @param registryParam the registration data sent as the request body
 * @return the result returned by {@code XxlJobRemotingUtil.postBody}
 */
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
}