Modifying XXL-Job for Immediate Stop and Enhanced Initialization
XXL-Job Source Code Modifications
This article discusses modifications to the XXL-Job distributed task scheduling framework's source code to enable immediate job termination and improved initialization handling.
Core Components
- xxl-job-admin: Administration module
- xxl-job-core: Core scheduling engine
Immediate Job Termination Implementation
The administration module's XxlJobTrigger.java contains the execution scheduling logic. Its runExecutor method uses the HTTP protocol for task dispatch. To enable immediate job termination, we need to propagate the scheduling status to the worker nodes.
Administration Module Modification
/**
 * Stops scheduling of a job and fires one more trigger so the worker side can react at once.
 *
 * <p>The job's trigger status and its last/next trigger times are reset to 0 and persisted,
 * then a manual trigger is submitted. According to the accompanying article, that trigger is
 * what carries the stopped status to the worker thread.
 *
 * @param jobIdentifier id of the job to stop
 * @return {@code ReturnT.SUCCESS} after the job is updated and the trigger submitted, or a
 *         failure result when no job with the given id exists
 */
public ReturnT<String> terminateJob(int jobIdentifier) {
    XxlJobInfo jobDetails = xxlJobInfoDao.loadById(jobIdentifier);
    if (jobDetails == null) {
        // Unknown id: report a failure instead of dereferencing null below.
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Job not found: " + jobIdentifier);
    }

    // Mark the job as stopped and clear its schedule bookkeeping.
    jobDetails.setTriggerStatus(0);
    jobDetails.setTriggerLastTime(0);
    jobDetails.setTriggerNextTime(0);
    jobDetails.setUpdateTime(new Date());
    xxlJobInfoDao.update(jobDetails);

    // Immediate worker thread termination: must run after the update so the
    // trigger is built from the already-stopped job record.
    JobTriggerPoolHelper.trigger(jobIdentifier, TriggerTypeEnum.MANUAL, -1, null, null, null);
    return ReturnT.SUCCESS;
}
Core Module Modification
The JobThread.java class handles task execution. We add initialization state tracking and immediate termination capability:
/**
 * Main loop of the job thread.
 *
 * <p>Until termination is requested, the loop polls the trigger queue and, for each trigger:
 * <ol>
 *   <li>builds and installs an {@code XxlJobContext} for it;</li>
 *   <li>runs {@code handler.initialize()} if initialization is still pending
 *       ({@code initializationState == 1}) and the trigger does not carry status 0;</li>
 *   <li>removes this job thread when the trigger carries status 0 (job stopped by the
 *       administrator);</li>
 *   <li>otherwise executes the handler, with an optional timeout, but only when
 *       initialization has succeeded ({@code initializationState == 2});</li>
 *   <li>pushes a callback with the outcome in the {@code finally} block.</li>
 * </ol>
 *
 * <p>After the loop ends, every trigger still queued is reported as failed and
 * {@code handler.cleanup()} is invoked.
 */
public void run() {
    while (!terminationRequested) {
        executionActive = false;
        idleCounter++;
        TriggerParam triggerData = null;
        try {
            // Bounded wait so the loop condition is re-checked at least every 3 seconds.
            triggerData = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerData != null) {
                executionActive = true;
                idleCounter = 0;
                triggerLogIdSet.remove(triggerData.getLogId());

                String logFile = XxlJobFileAppender.makeLogFileName(
                        new Date(triggerData.getLogDateTime()), triggerData.getLogId());
                final XxlJobContext jobContext = new XxlJobContext(
                        triggerData.getJobId(),
                        triggerData.getExecutorParams(),
                        logFile,
                        triggerData.getBroadcastIndex(),
                        triggerData.getBroadcastTotal());
                XxlJobContext.setXxlJobContext(jobContext);

                int currentStatus = triggerData.getTriggerStatus();

                // Enhanced initialization logic: initialize lazily on the first trigger of a
                // job that is not being stopped.
                if (initializationState == 1 && currentStatus != 0) {
                    try {
                        if (handler.initialize()) {
                            initializationState = 2;
                            logger.info("Job initialization successful");
                        } else {
                            logger.error("Job initialization failed");
                        }
                    } catch (Throwable e) {
                        logger.error(e.getMessage(), e);
                    }
                }

                // Immediate termination handling: status 0 means the administrator stopped
                // the job. The finally block below still reports this trigger.
                if (currentStatus == 0) {
                    XxlJobExecutor.removeJobThread(jobId, "Job terminated by administrator");
                    continue;
                }

                // Execute only if properly initialized
                if (initializationState == 2) {
                    XxlJobHelper.log("Job execution started with parameters: " + jobContext.getJobParam());

                    if (triggerData.getExecutorTimeout() > 0) {
                        // Run the handler on a separate thread so the wait can be bounded.
                        Thread executionThread = null;
                        try {
                            FutureTask<Boolean> timeoutTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                @Override
                                public Boolean call() throws Exception {
                                    // Install the context on the worker thread as well.
                                    XxlJobContext.setXxlJobContext(jobContext);
                                    handler.execute();
                                    return true;
                                }
                            });
                            executionThread = new Thread(timeoutTask);
                            executionThread.start();
                            timeoutTask.get(triggerData.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {
                            XxlJobHelper.log("Job execution timeout occurred");
                            XxlJobHelper.handleTimeout("Execution timeout exceeded");
                        } finally {
                            // The thread may never have been created if setup failed; guard so
                            // a NullPointerException does not mask the original error.
                            if (executionThread != null) {
                                executionThread.interrupt();
                            }
                        }
                    } else {
                        handler.execute();
                    }

                    // Handle execution results
                    if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                        XxlJobHelper.handleFail("Execution result not available");
                    } else {
                        // Cap the handle message at 50000 characters.
                        String resultMessage = XxlJobContext.getXxlJobContext().getHandleMsg();
                        if (resultMessage != null && resultMessage.length() > 50000) {
                            resultMessage = resultMessage.substring(0, 50000).concat("...");
                        }
                        XxlJobContext.getXxlJobContext().setHandleMsg(resultMessage);
                    }
                    XxlJobHelper.log("Job execution completed with result code: "
                            + XxlJobContext.getXxlJobContext().getHandleCode());
                } else {
                    // The handler never ran for this trigger. Record an explicit failure so
                    // the callback below does not report the untouched handle code of the
                    // freshly created context.
                    XxlJobHelper.handleFail("Job not initialized, execution skipped");
                }
            } else {
                // More than 30 consecutive empty polls with nothing queued: retire the thread.
                if (idleCounter > 30 && triggerQueue.size() == 0) {
                    XxlJobExecutor.removeJobThread(jobId, "Excessive idle time detected");
                }
            }
        } catch (Throwable e) {
            if (terminationRequested) {
                XxlJobHelper.log("Thread termination requested: " + stopReason);
            }
            // Report the full stack trace as the failure message.
            StringWriter errorOutput = new StringWriter();
            e.printStackTrace(new PrintWriter(errorOutput));
            String errorDetails = errorOutput.toString();
            XxlJobHelper.handleFail(errorDetails);
        } finally {
            if (triggerData != null) {
                if (!terminationRequested) {
                    // Normal path: report the code and message recorded in the context.
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerData.getLogId(),
                            triggerData.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg()));
                } else {
                    // Termination was requested while this trigger was in flight.
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerData.getLogId(),
                            triggerData.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [Job terminated during execution]"));
                }
            }
        }
    }

    // Cleanup remaining trigger requests: each one is reported as failed.
    while (triggerQueue != null && triggerQueue.size() > 0) {
        TriggerParam remainingTrigger = triggerQueue.poll();
        if (remainingTrigger != null) {
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    remainingTrigger.getLogId(),
                    remainingTrigger.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [Job queued but not executed]"));
        }
    }

    // A failing cleanup is logged and must not prevent the thread from stopping.
    try {
        handler.cleanup();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    logger.info("Job thread stopped: {}", Thread.currentThread());
}