Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Modifying XXL-Job for Immediate Stop and Enhanced Initialization

Tech May 14 1

XXL-Job Source Code Modifications

This article discusses modifications to the XXL-Job distributed task scheduling framework's source code to enable immediate job termination and improved initialization handling.

Core Components

  • xxl-job-admin: Administration module
  • xxl-job-core: Core schedulnig engine

Immediate Job Termination Implementation

The administration module's XxlJobTrigger.java contains the execution scheduling logic. The runExecutor method uses HTTP protocol for task dispatch. To enable immediate job termination, we need to propagate the scheduling status to worker nodes.

Administration Module Modification

public ReturnT<String> terminateJob(int jobIdentifier) {
    XxlJobInfo jobDetails = xxlJobInfoDao.loadById(jobIdentifier);
    jobDetails.setTriggerStatus(0);
    jobDetails.setTriggerLastTime(0);
    jobDetails.setTriggerNextTime(0);
    jobDetails.setUpdateTime(new Date());
    xxlJobInfoDao.update(jobDetails);
    
    // Immediate worker thread termination
    JobTriggerPoolHelper.trigger(jobIdentifier, TriggerTypeEnum.MANUAL, -1, null, null, null);
    return ReturnT.SUCCESS;
}

Core Module Modification

The JobThread.java class handles task execution. We add initialization state tracking and immediate termination capability:

public void run() {
    while(!terminationRequested){
        executionActive = false;
        idleCounter++;

        TriggerParam triggerData = null;
        try {
            triggerData = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerData != null) {
                executionActive = true;
                idleCounter = 0;
                triggerLogIdSet.remove(triggerData.getLogId());

                String logFile = XxlJobFileAppender.makeLogFileName(
                    new Date(triggerData.getLogDateTime()), triggerData.getLogId());
                XxlJobContext jobContext = new XxlJobContext(
                    triggerData.getJobId(),
                    triggerData.getExecutorParams(),
                    logFile,
                    triggerData.getBroadcastIndex(),
                    triggerData.getBroadcastTotal());

                XxlJobContext.setXxlJobContext(jobContext);
                int currentStatus = triggerData.getTriggerStatus();
                
                // Enhanced initialization logic
                if(initializationState == 1 && currentStatus != 0){
                    try {
                        if(handler.initialize()){
                            initializationState = 2;
                            logger.info("Job initialization successful");
                        } else {
                            logger.error("Job initialization failed");
                        }
                    } catch (Throwable e) {
                        logger.error(e.getMessage(), e);
                    }
                }
                
                // Immediate termination handling
                if(currentStatus == 0){
                    XxlJobExecutor.removeJobThread(jobId, "Job terminated by administrator");
                    continue;
                }
                
                // Execute only if properly initialized
                if(initializationState == 2){
                    XxlJobHelper.log("Job execution started with parameters: " + jobContext.getJobParam());

                    if (triggerData.getExecutorTimeout() > 0) {
                        Thread executionThread = null;
                        try {
                            FutureTask<Boolean> timeoutTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                                @Override
                                public Boolean call() throws Exception {
                                    XxlJobContext.setXxlJobContext(jobContext);
                                    handler.execute();
                                    return true;
                                }
                            });
                            executionThread = new Thread(timeoutTask);
                            executionThread.start();

                            Boolean executionResult = timeoutTask.get(triggerData.getExecutorTimeout(), TimeUnit.SECONDS);
                        } catch (TimeoutException e) {
                            XxlJobHelper.log("Job execution timeout occurred");
                            XxlJobHelper.handleTimeout("Execution timeout exceeded");
                        } finally {
                            executionThread.interrupt();
                        }
                    } else {
                        handler.execute();
                    }

                    // Handle execution results
                    if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                        XxlJobHelper.handleFail("Execution result not available");
                    } else {
                        String resultMessage = XxlJobContext.getXxlJobContext().getHandleMsg();
                        resultMessage = (resultMessage != null && resultMessage.length() > 50000)
                            ? resultMessage.substring(0, 50000).concat("...")
                            : resultMessage;
                        XxlJobContext.getXxlJobContext().setHandleMsg(resultMessage);
                    }
                    XxlJobHelper.log("Job execution completed with result code: " + 
                        XxlJobContext.getXxlJobContext().getHandleCode());
                }
            } else {
                if (idleCounter > 30) {
                    if(triggerQueue.size() == 0) {
                        XxlJobExecutor.removeJobThread(jobId, "Excessive idle time detected");
                    }
                }
            }
        } catch (Throwable e) {
            if (terminationRequested) {
                XxlJobHelper.log("Thread termination requested: " + stopReason);
            }
            StringWriter errorOutput = new StringWriter();
            e.printStackTrace(new PrintWriter(errorOutput));
            String errorDetails = errorOutput.toString();
            XxlJobHelper.handleFail(errorDetails);
        } finally {
            if(triggerData != null) {
                if (!terminationRequested) {
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerData.getLogId(),
                        triggerData.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg())
                    );
                } else {
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerData.getLogId(),
                        triggerData.getLogDateTime(),
                        XxlJobContext.HANDLE_CODE_FAIL,
                        stopReason + " [Job terminated during execution]")
                    );
                }
            }
        }
    }

    // Cleanup remaining trigger requests
    while(triggerQueue != null && triggerQueue.size() > 0){
        TriggerParam remainingTrigger = triggerQueue.poll();
        if (remainingTrigger != null) {
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                remainingTrigger.getLogId(),
                remainingTrigger.getLogDateTime(),
                XxlJobContext.HANDLE_CODE_FAIL,
                stopReason + " [Job queued but not executed]")
            );
        }
    }

    try {
        handler.cleanup();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info("Job thread stopped: {}", Thread.currentThread());
}

Tags: xxl-jobJava

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.