博客信息

修改spring quartz源码支持集群的MethodInvokingJobDetailFactoryBean

发布时间:『 2016-12-16 17:43』  博客类别:Java  阅读(1677) 评论(0)

我们知道配置quartz集群的时候,可以用org.springframework.scheduling.quartz.JobDetailBean配置JobDetail,但是这样不能配置concurrent。不能阻止任务并发调度。但是Spring实现的MethodInvokingJobDetailFactoryBean不能序列化,无法进行集群。

重写MethodInvokingJobDetailFactoryBean类:

import java.lang.reflect.InvocationTargetException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.StatefulJob;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.MethodInvoker;
/**
 * 解决quartz集群问题:使用可持久化的JOB执行类
 *
 *
 */
@SuppressWarnings("rawtypes")
public class ClusterMethodInvokingJobDetailFactoryBean implements FactoryBean, BeanNameAware, InitializingBean, ApplicationContextAware{
/**
 * Set by <code>setApplicationContext</code> when a
 * BeanInvokingJobDetailFactoryBean is defined within a Spring
 * ApplicationContext as a bean. <p> Used by the <code>execute</code> method
 * of the BeanInvokingJob and StatefulBeanInvokingJob classes.
 * 
 * @see #setApplicationContext(ApplicationContext)
 * @see BeanInvokingJob#execute(JobExecutionContext)
 */
protected static ApplicationContext applicationContext;
private Log logger = LogFactory.getLog(getClass());
/**
 * The JobDetail produced by the <code>afterPropertiesSet</code> method of
 * this Class will be assigned to the Group specified by this property.
 * Default: Scheduler.DEFAULT_GROUP
 * 
 * @see #afterPropertiesSet()
 * @see Scheduler#DEFAULT_GROUP
 */
private String group = Scheduler.DEFAULT_GROUP;
/**
 * Indicates whether or not the Bean Method should be invoked by more than
 * one Scheduler at the specified time (like when deployed to a cluster,
 * and/or when there are multiple Spring ApplicationContexts in a single
 * JVM<i> - Tomcat 5.5 creates 2 or more instances of the DispatcherServlet
 * (a pool), which in turn creates a separate Spring ApplicationContext for
 * each instance of the servlet</i>) <p> Used by
 * <code>afterPropertiesSet</code> to set the JobDetail.jobClass to
 * BeanInvokingJob.class or StatefulBeanInvokingJob.class when true or
 * false, respectively. Default: true
 * 
 * @see #afterPropertiesSet()
 */
private boolean concurrent = true;
/**
 * Used to set the JobDetail.durable property. Default: false <p>Durability
 * - if a job is non-durable, it is automatically deleted from the scheduler
 * once there are no longer any active triggers associated with it.
 * 
 * @see <a
 *      href="http://www.opensymphony.com/quartz/wikidocs/TutorialLesson3.html">http://www.opensymphony.com/quartz/wikidocs/TutorialLesson3.html</a>
 * @see #afterPropertiesSet()
 */
private boolean durable = false;
/**
 * Used by <code>afterPropertiesSet</code> to set the JobDetail.volatile
 * property. Default: false <p>Volatility - if a job is volatile, it is not
 * persisted between re-starts of the Quartz scheduler. <p>I set the default
 * to false to be the same as the default for a Quartz Trigger. An exception
 * is thrown when the Trigger is non-volatile and the Job is volatile. If
 * you want volatility, then you must set this property, and the Trigger's
 * volatility property, to true.
 * 
 * @see <a
 *      href="http://www.opensymphony.com/quartz/wikidocs/TutorialLesson3.html">http://www.opensymphony.com/quartz/wikidocs/TutorialLesson3.html</a>
 * @see #afterPropertiesSet()
 */
private boolean volatility = false;
/**
 * Used by <code>afterPropertiesSet</code> to set the
 * JobDetail.requestsRecovery property. Default: false<BR>
 * <p>RequestsRecovery - if a job "requests recovery", and it is executing
 * during the time of a 'hard shutdown' of the scheduler (i.e. the process
 * it is running within crashes, or the machine is shut off), then it is
 * re-executed when the scheduler is started again. In this case, the
 * JobExecutionContext.isRecovering() method will return true.
 * 
 * @see <a
 *      href="http://www.opensymphony.com/quartz/wikidocs/TutorialLesson3.html">http://www.opensymphony.com/quartz/wikidocs/TutorialLesson3.html</a>
 * @see #afterPropertiesSet()
 */
private boolean shouldRecover = false;
/**
 * A list of names of JobListeners to associate with the JobDetail object
 * created by this FactoryBean.
 * 
 * @see #afterPropertiesSet()
 **/
private String[] jobListenerNames;
/**
 * The name assigned to this bean in the Spring ApplicationContext. Used by
 * <code>afterPropertiesSet</code> to set the JobDetail.name property.
 * 
 * @see afterPropertiesSet()
 * @see JobDetail#setName(String)
 **/
private String beanName;
/**
 * The JobDetail produced by the <code>afterPropertiesSet</code> method, and
 * returned by the <code>getObject</code> method of the Spring FactoryBean
 * interface.
 * 
 * @see #afterPropertiesSet()
 * @see #getObject()
 * @see FactoryBean
 **/
private JobDetail jobDetail;
/**
 * The name or id of the bean to invoke, as it is declared in the Spring
 * ApplicationContext.
 **/
private String targetBean;
/**
 * The method to invoke on the bean identified by the targetBean property.
 **/
private String targetMethod;
/**
 * The arguments to provide to the method identified by the targetMethod
 * property. These Objects must be Serializable when concurrent=="true".
 */
private Object[] arguments;
/**
 * Get the targetBean property.
 * 
 * @see #targetBean
 * @return targetBean
 */
public String getTargetBean() {
return targetBean;
}
/**
 * Set the targetBean property.
 * 
 * @see #targetBean
 */
public void setTargetBean(String targetBean) {
this.targetBean = targetBean;
}
/**
 * Get the targetMethod property.
 * 
 * @see #targetMethod
 * @return targetMethod
 */
public String getTargetMethod() {
return targetMethod;
}
/**
 * Set the targetMethod property.
 * 
 * @see #targetMethod
 */
public void setTargetMethod(String targetMethod) {
this.targetMethod = targetMethod;
}
/**
 * @return jobDetail - The JobDetail that is created by the
 *         afterPropertiesSet method of this FactoryBean
 * @see #jobDetail
 * @see #afterPropertiesSet()
 * @see FactoryBean#getObject()
 */
public Object getObject() throws Exception {
return jobDetail;
}
/**
 * @return JobDetail.class
 * @see FactoryBean#getObjectType()
 */
public Class getObjectType() {
return JobDetail.class;
}
/**
 * @return true
 * @see FactoryBean#isSingleton()
 */
public boolean isSingleton() {
return true;
}
/**
 * Set the beanName property.
 * 
 * @see #beanName
 * @see BeanNameAware#setBeanName(String)
 */
public void setBeanName(String beanName) {
this.beanName = beanName;
}
/**
 * Invoked by the Spring container after all properties have been set. <p>
 * Sets the <code>jobDetail</code> property to a new instance of JobDetail
 * <ul> <li>jobDetail.name is set to <code>beanName</code><br>
 * <li>jobDetail.group is set to <code>group</code><br>
 * <li>jobDetail.jobClass is set to BeanInvokingJob.class or
 * StatefulBeanInvokingJob.class depending on whether the
 * <code>concurrent</code> property is set to true or false,
 * respectively.<br> <li>jobDetail.durability is set to <code>durable</code>
 * <li>jobDetail.volatility is set to <code>volatility</code>
 * <li>jobDetail.requestsRecovery is set to <code>shouldRecover</code>
 * <li>jobDetail.jobDataMap["targetBean"] is set to <code>targetBean</code>
 * <li>jobDetail.jobDataMap["targetMethod"] is set to
 * <code>targetMethod</code> <li>jobDetail.jobDataMap["arguments"] is set to
 * <code>arguments</code> <li>Each JobListener name in
 * <code>jobListenerNames</code> is added to the <code>jobDetail</code>
 * object. </ul> <p> Logging occurs at the DEBUG and INFO levels; 4 lines at
 * the DEBUG level, and 1 line at the INFO level. <ul> <li>DEBUG: start
 * <li>DEBUG: Creating JobDetail <code>{beanName}</code> <li>DEBUG:
 * Registering JobListener names with JobDetail object
 * <code>{beanName}</code> <li>INFO: Created JobDetail:
 * <code>{jobDetail}</code>; targetBean: <code>{targetBean}</code>;
 * targetMethod: <code>{targetMethod}</code>; arguments:
 * <code>{arguments}</code>; <li>DEBUG: end </ul>
 * 
 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
 * @see JobDetail
 * @see #jobDetail
 * @see #beanName
 * @see #group
 * @see BeanInvokingJob
 * @see StatefulBeanInvokingJob
 * @see #durable
 * @see #volatility
 * @see #shouldRecover
 * @see #targetBean
 * @see #targetMethod
 * @see #arguments
 * @see #jobListenerNames
 */
public void afterPropertiesSet() throws Exception {
try {
logger.debug("start");
logger.debug("Creating JobDetail " + beanName);
jobDetail = new JobDetail();
jobDetail.setName(beanName);
jobDetail.setGroup(group);
jobDetail.setJobClass(concurrent ? BeanInvokingJob.class : StatefulBeanInvokingJob.class);
jobDetail.setDurability(durable);
jobDetail.setVolatility(volatility);
jobDetail.setRequestsRecovery(shouldRecover);
jobDetail.getJobDataMap().put("targetBean", targetBean);
jobDetail.getJobDataMap().put("targetMethod", targetMethod);
if(arguments != null){
jobDetail.getJobDataMap().put("arguments", arguments);
}
logger.debug("Registering JobListener names with JobDetail object " + beanName);
if(this.jobListenerNames != null) {
for(int i = 0; i < this.jobListenerNames.length; i++) {
this.jobDetail.addJobListener(this.jobListenerNames[i]);
}
}
logger.info("Created JobDetail: " + jobDetail + "; targetBean: " + targetBean + "; targetMethod: "
+ targetMethod + "; arguments: " + arguments + ";");
}finally {
logger.debug("end");
}
}
/**
 * Setter for the concurrent property.
 * 
 * @param concurrent
 * @see #concurrent
 */
public void setConcurrent(boolean concurrent) {
this.concurrent = concurrent;
}
/**
 * setter for the durable property.
 * 
 * @param durable
 * 
 * @see #durable
 */
public void setDurable(boolean durable) {
this.durable = durable;
}
/**
 * setter for the group property.
 * 
 * @param group
 * 
 * @see #group
 */
public void setGroup(String group) {
this.group = group;
}
/**
 * setter for the {@link #jobListenerNames} property.
 * 
 * @param jobListenerNames
 * @see #jobListenerNames
 */
public void setJobListenerNames(String[] jobListenerNames) {
this.jobListenerNames = jobListenerNames;
}
/**
 * setter for the {@link #shouldRecover} property.
 * 
 * @param shouldRecover
 * @see #shouldRecover
 */
public void setShouldRecover(boolean shouldRecover) {
this.shouldRecover = shouldRecover;
}
/**
 * setter for the {@link #volatility} property.
 * 
 * @param volatility
 * @see #volatility
 */
public void setVolatility(boolean volatility) {
this.volatility = volatility;
}
/**
 * Set the Spring ApplicationContext in which this class has been deployed.
 * <p> Invoked by Spring as a result of the ApplicationContextAware
 * interface implemented by this Class.
 * 
 * @see ApplicationContextAware#setApplicationContext(ApplicationContext)
 */
public void setApplicationContext(ApplicationContext context) throws BeansException {
applicationContext = context;
}
public void setArguments(Object[] arguments) {
this.arguments = arguments;
}
/**
 * This is a cluster safe Job designed to invoke a method on any bean
 * defined within the same Spring ApplicationContext. <p> The only entries
 * this Job expects in the JobDataMap are "targetBean" and
 * "targetMethod".<br> - It uses the value of the <code>targetBean</code>
 * entry to get the desired bean from the Spring ApplicationContext.<br> -
 * It uses the value of the <code>targetMethod</code> entry to determine
 * which method of the Bean (identified by targetBean) to invoke. <p> It
 * uses the static ApplicationContext in the
 * BeanInvokingJobDetailFactoryBean, which is ApplicationContextAware, to
 * get the Bean with which to invoke the method. <p> All Exceptions thrown
 * from the execute method are caught and wrapped in a
 * JobExecutionException.
 * 
 * @see BeanInvokingJobDetailFactoryBean#applicationContext
 * @see #execute(JobExecutionContext)
 * 
 * @author Stephen M. Wick
 */
public static class BeanInvokingJob implements Job {
protected Log logger = LogFactory.getLog(getClass());
/**
 * When invoked by a Quartz scheduler, <code>execute</code> invokes a
 * method on a bean deployed within the scheduler's Spring
 * ApplicationContext. <p> <b>Implementation</b><br> The bean is
 * identified by the "targetBean" entry in the JobDataMap of the
 * JobExecutionContext provided.<br> The method is identified by the
 * "targetMethod" entry in the JobDataMap of the JobExecutionContext
 * provided.<br> <p> The Quartz scheduler shouldn't start up correctly
 * if the bean identified by "targetBean" cannot be found in the
 * scheduler's Spring ApplicationContext. BeanFactory.getBean() throws
 * an exception if the targetBean doesn't exist, so I'm not going to
 * waste any code testing for the bean's existance in the
 * ApplicationContext. <p> Logging is provided at the DEBUG and INFO
 * levels; 5 lines at the DEBUG level, and 1 line at the INFO level.
 * 
 * @see Job#execute(JobExecutionContext)
 */
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
logger.debug("start");
String targetBean = context.getMergedJobDataMap().getString("targetBean");
logger.debug("targetBean is " + targetBean);
if(targetBean == null)
throw new JobExecutionException("targetBean cannot be null.", false);
String targetMethod = context.getMergedJobDataMap().getString("targetMethod");
logger.debug("targetMethod is " + targetMethod);
if(targetMethod == null)
throw new JobExecutionException("targetMethod cannot be null.", false);
// when org.quartz.jobStore.useProperties=="true" the arguments
// entry (which should be an Object[]) in the JobDataMap gets
// converted into a String.
Object argumentsObject = context.getMergedJobDataMap().get("arguments");
Object[] arguments = (argumentsObject instanceof String) ? null : (Object[])argumentsObject;
logger.debug("arguments array is " + arguments);
Object bean = applicationContext.getBean(targetBean);
logger.debug("applicationContext resolved bean name/id '" + targetBean + "' to " + bean);
MethodInvoker beanMethod = new MethodInvoker();
beanMethod.setTargetObject(bean);
beanMethod.setTargetMethod(targetMethod);
beanMethod.setArguments(arguments);
beanMethod.prepare();
logger.info("Invoking Bean: " + targetBean + "; Method: " + targetMethod + "; arguments: " + arguments
+ ";");
beanMethod.invoke();
}catch (JobExecutionException e) {
e.printStackTrace();
throw e;
}catch (Exception e) {
e.printStackTrace();
throw new JobExecutionException(e);
}finally {
logger.debug("end");
}
}
}
public static class StatefulBeanInvokingJob extends BeanInvokingJob implements StatefulJob {
// No additional functionality; just needs to implement StatefulJob.
}
public static void main(String[] args) {
Test test = new Test();
MethodInvoker beanMethod = new MethodInvoker();
beanMethod.setTargetObject(test);
beanMethod.setTargetMethod("sayHello");
beanMethod.setArguments(new Object[]{"dddd"});
try {
beanMethod.prepare();
beanMethod.invoke();
}catch (ClassNotFoundException e) {
e.printStackTrace();
}catch (NoSuchMethodException e) {
e.printStackTrace();
}catch (InvocationTargetException e) {
e.printStackTrace();
}catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}

Spring配置如下:

    <bean id="xxxxxDailyJob" class="com.x.x.x.x.ClusterMethodInvokingJobDetailFactoryBean">
        <property name="targetBean" value="xxxDailyJobBean"/>
        <property name="targetMethod" value="execute"/>
        <property name="concurrent" value="false"/>
        <!-- <property name="arguments">
           <list>
              <value>arg1</value>
              <value>arg2</value>
           </list>
        </property>  -->
        <property name="group">
            <value>${org.quartz.group.name}</value>
        </property>
    </bean>


关键字:   quartz     集群  
评论信息
暂无评论
发表评论
验证码: 
Powered by IMZHANGJIE.CN Copyright © 2015-2025 粤ICP备14056181号