任务调度的简单实现
做了个简单的任务调度实例,主要思想功能如下:
模拟master和slave两台机器:master机器有一个任务队列,并启动一个线程定时轮询该任务队列,如果其中有任务,则调用slave的接口将任务分配给slave机器处理。同时,slave机器配置了心跳检测,即slave启动一个线程,每隔一段时间调用master的接口,将master的全局变量lastalivetime的值修改为当前时间;master也有一个线程定时计算当前时间与lastalivetime的时间差,如果超过2秒则认为slave机器死亡。
实现代码如下:
Master包下为master机器运行的类,Slave包下为slave机器运行的类。
模拟2个服务的方法如下:
1.模拟master服务:
在pom.xml里使用jetty插件,并设定端口为8081,如下:
?
<plugins>
<!-- jetty插件 -->
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<version>6.1.22</version>
<configuration>
<contextPath>/</contextPath>
<connectors>
<connector implementation="org.mortbay.jetty.nio.SelectChannelConnector">
<port>8081</port>
<maxIdleTime>60000</maxIdleTime>
</connector>
</connectors>
<requestLog implementation="org.mortbay.jetty.NCSARequestLog">
<filename>target/access.log</filename>
<retainDays>90</retainDays>
<append>false</append>
<extended>false</extended>
<logTimeZone>GMT+8:00</logTimeZone>
</requestLog>
<systemProperties>
<systemProperty>
<name>productionMode</name>
<value>false</value>
</systemProperty>
</systemProperties>
</configuration>
</plugin>
</plugins>
?
本实例通过实现ServletContextListener接口实现服务启动后自动启动相关类功能,所以配置web.xml如下:
?
<?xml version="1.0" encoding="GB2312"?>
<web-app id="WebApp_ID" version="2.4"
xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">
?
<servlet>
<servlet-name>test</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
?
<servlet-mapping>
<servlet-name>test</servlet-name>
<url-pattern>/service/*</url-pattern>
</servlet-mapping>
?
?
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:/spring/*.xml</param-value>
</context-param>
?
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
?
<!--随着服务启动自动运行-->
<listener>
<listener-class>com.TaskScheduling.Master.AutoRun</listener-class>
</listener>
?
</web-app>
运行mvn jetty:run命令启动master服务。
2.模拟slave机器:
除了web.xml文件外其他代码无需改动,web.xml需修改的配置如下:
<listener>
<listener-class>com.TaskScheduling.Slave.SlaveAutoRun</listener-class>
</listener>
?
运行mvn clean install ,将生成的war包放到jetty的%jetty_home%/webapps路径下,jetty默认的端口为8080,启动jetty,在%jetty_home%下执行命令:java -jar start.jar。
?
?
相关代码如下:
web.xml:
?
<?xml version="1.0" encoding="GB2312"?>
<web-app id="WebApp_ID" version="2.4"
xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">
?
<servlet>
<servlet-name>test</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
?
<servlet-mapping>
<servlet-name>test</servlet-name>
<url-pattern>/service/*</url-pattern>
</servlet-mapping>
?
?
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:/spring/*.xml</param-value>
</context-param>
?
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
?
<welcome-file-list>
<welcome-file>/jsp/login.jsp</welcome-file>
</welcome-file-list>
<listener>
<listener-class>com.TaskScheduling.Master.AutoRun</listener-class>
<!-- <listener-class>com.TaskScheduling.Slave.SlaveAutoRun</listener-class> -->
</listener>
?
?
?
</web-app>
?
在%classpath%/spring/autorun.xml:
?
?
<?xml version="1.0" encoding="utf-8"?>
?
<beans default-autowire="byName"
	xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:util="http://www.springframework.org/schema/util"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
		http://www.springframework.org/schema/aop
		http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
		http://www.springframework.org/schema/util
		http://www.springframework.org/schema/util/spring-util-2.5.xsd">

	<!-- Master side: job dispatcher, slave-liveness watchdog and heartbeat endpoint.
	     AutoRun looks up "jobService" and "heartBeatCheck" by these ids. -->
	<bean id="jobService" class="com.TaskScheduling.Master.JobService" />
	<bean id="heartBeatCheck" class="com.TaskScheduling.Master.HeartBeatCheck" />
	<bean id="slaveHeartBeat" class="com.TaskScheduling.Master.impl.SlaveHeartBeatImpl" />

	<!-- Slave side: heartbeat sender (looked up by SlaveAutoRun as "heartBeat"),
	     the job executor, and the sample job bean that JobService refers to by name. -->
	<bean id="heartBeat" class="com.TaskScheduling.Slave.HeartBeat" />
	<bean id="jobProcess" class="com.TaskScheduling.Slave.impl.JobProcessImpl" />
	<bean id="testService" class="com.TaskScheduling.Slave.service.TestService" />

	<!-- Hessian endpoint called by the slave at /service/HeatBeat. -->
	<bean name="/HeatBeat" class="org.springframework.remoting.caucho.HessianServiceExporter">
		<property name="service" ref="slaveHeartBeat" />
		<property name="serviceInterface" value="com.TaskScheduling.Master.SlaveHeartBeat" />
	</bean>

	<!-- Hessian endpoint called by the master at /service/JobProcess. -->
	<bean name="/JobProcess" class="org.springframework.remoting.caucho.HessianServiceExporter">
		<property name="service" ref="jobProcess" />
		<property name="serviceInterface" value="com.TaskScheduling.Slave.JobProcess" />
	</bean>

</beans>
?
master相关类:
com.TaskScheduling.Master.SlaveHeartBeat:
?
package com.TaskScheduling.Master;
?
/**
 * Heartbeat endpoint that the master offers to its slave.
 */
public interface SlaveHeartBeat {

    /**
     * Reports that the caller is still alive.
     *
     * @return a status string chosen by the implementation
     */
    String check();
}
com.TaskScheduling.Master.impl.SlaveHeartBeatImpl:package com.TaskScheduling.Master.impl;
import java.util.Date;
import com.TaskScheduling.Master.MasterGlobalResource;import com.TaskScheduling.Master.SlaveHeartBeat;
/**
 * Master-side implementation of the heartbeat endpoint.
 */
public class SlaveHeartBeatImpl implements SlaveHeartBeat {

    /**
     * Called periodically by the slave: stores the current time as the last-alive
     * timestamp in {@link MasterGlobalResource}, prints the stored value to standard
     * output and acknowledges the call.
     *
     * @return the literal {@code "OK"}
     */
    public String check() {
        final Date heartbeatTime = new Date();
        MasterGlobalResource.setLastalivetime(heartbeatTime);

        final Date stored = MasterGlobalResource.getLastalivetime();
        System.out.println(stored);

        return "OK";
    }
}
com.TaskScheduling.Master.AutoRun:package com.TaskScheduling.Master;
import java.io.File;import java.io.FileWriter;import java.io.IOException;
import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;import javax.servlet.http.HttpServlet;
import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;
@SuppressWarnings("serial")public class AutoRun extends HttpServlet implements ServletContextListener {/**?* 服务器停止前自动执行?*/@Overridepublic void contextDestroyed(ServletContextEvent arg0) { ?DeleteFile();}/** * 服务器启动后自动运行 */@Overridepublic void contextInitialized(ServletContextEvent arg0) { WriteFile();/*也可以通过?ApplicationContext?app = WebApplicationContextUtils.getWebApplicationContext(arg0.getServletContext()); 获取spring上下文,但不可以通过让AutoRun 类实现ApplicationContextAware接口来获取上下文,应为autorun类实现了ServletContextListener,所以在spring启动前启动,所以得到的上下文将为null*/? ? ? ? ? ? ? ? ?ApplicationContext app = new ClassPathXmlApplicationContext("classpath*:spring/*.xml"); //任务队列轮询 MasterGlobalResource.getExecutor().execute((JobService)app.getBean("jobService")); //检测slave机器是否存活 MasterGlobalResource.getExecutor().execute((HeartBeatCheck)app.getBean("heartBeatCheck"));} ?public void WriteFile() { ? ? ? ?try { ? ? ? ?System.out.println("write"); ? ? ? ? ? ?FileWriter fw = new FileWriter("c:/WriteData.txt"); ? ? ? ? ? ?// 将字符串写入文件 ? ? ? ? ? ?fw.write("Hello World!"); ? ? ? ? ? ?fw.write("Hello Everyone!"); ? ? ? ? ? ?fw.close(); ? ? ? ?} catch (IOException e) { ? ? ? ?} ? ?}
? ?public void DeleteFile() { ? ? ? ?File f = new File("c:/WriteData.txt"); ? ? ? ?// 检查文件是否存在,如果存在,直接删除文件 ? ? ? ?if (f.exists()) { ? ? ? ? ? ?f.delete(); ? ? ? ?} ? ?}
}
com.TaskScheduling.Master.HeartBeatCheck:package com.TaskScheduling.Master;
import java.util.Date;
public class HeartBeatCheck implements Runnable{@Overridepublic void run() {while(true){Date lastAliveTime = MasterGlobalResource.getLastalivetime();Date nowTime = new Date();System.out.println("nowTime="+nowTime.getTime()+" ? lastAliveTime="+lastAliveTime.getTime());//如果超过2秒slave机器没有调用master机器的接口修改Lastalivetime值则认为slave死亡if(nowTime.getTime()-lastAliveTime.getTime()>2000)System.out.println("slave has no feedback in 2 second...");try {//每1秒钟监测一次Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
}
com.TaskScheduling.Master.JobService:package com.TaskScheduling.Master;
import com.TaskScheduling.Slave.JobProcess;import com.caucho.hessian.client.HessianProxyFactory;/**?* 任务调度?*?*/public class JobService implements Runnable {private HessianProxyFactory factory = new HessianProxyFactory();@Overridepublic void run() {try {//添加10个任务到任务队列Thread.sleep(5000);for(int i=0;i<10;i++){ZephyrJobWrapper job=new ZephyrJobWrapper();job.setJobId(String.valueOf(i));job.setJobName(String.valueOf(i));job.setJobBeanName("testService");//任务bean名job.setJobMethodName("getJobInfo");//任务方法名job.setJobArgs(new Object[]{"hello","xj"+String.valueOf(i)});//方法的参数MasterGlobalResource.setWaitingJobQueue(job);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}//slave处理任务的接口JobProcess jobProcess=(JobProcess)factory.create(JobProcess.class, "http://localhost:8080/rmi.test/service/JobProcess");//轮询任务队列while(true){System.out.println("size="+MasterGlobalResource.getWaitingJobQueue().size());for(ZephyrJobWrapper job:MasterGlobalResource.getWaitingJobQueue()){//如果任务队列有任务则通过hessian接口交给slave机器执行任务jobProcess.process(job);} ? ?//轮询间隔为一秒钟Thread.sleep(1000);}} catch (Exception e) {System.out.println(e);System.out.println("job process failed");}}}
com.TaskScheduling.Master.MasterGlobalResource:package com.TaskScheduling.Master;
import java.util.Date;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;
/**?* master机器的全局变量?*?*/public ?class MasterGlobalResource {/** * slave最后心跳时间 */private ?static Date lastalivetime=new Date();/** * 线程池 */private static ThreadPoolExecutor ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?executor ? ? ? ? ? ? ? ? = new ThreadPoolExecutor(? ? ? ? ? ? ?10,? ? ? ? ? ? ?10,? ? ? ? ? ? ?0L,? ? ? ? ? ? ?TimeUnit.MILLISECONDS,? ? ? ? ? ? ?new LinkedBlockingQueue<Runnable>(),? ? ? ? ? ? ?new ThreadPoolExecutor.DiscardPolicy()); /** ?* 任务队列 ?*/ private static LinkedBlockingQueue<ZephyrJobWrapper> ? ? ? ? ? ? waitingJobQueue ? ? ? ? ?= new LinkedBlockingQueue<ZephyrJobWrapper>();?? public static ThreadPoolExecutor getExecutor() { ? ? ? ?return executor; ? ?}? public static LinkedBlockingQueue<ZephyrJobWrapper> getWaitingJobQueue() { return waitingJobQueue; }? public static void setWaitingJobQueue(ZephyrJobWrapper job) { waitingJobQueue.add(job); }
public static Date getLastalivetime() {return lastalivetime;}
public static void setLastalivetime(Date lastalivetime) {MasterGlobalResource.lastalivetime = lastalivetime;}
}
com.TaskScheduling.Master.SlaveHeartBeat:package com.TaskScheduling.Master;
/**
 * Contract of the heartbeat service hosted on the master.
 */
public interface SlaveHeartBeat {

    /**
     * Tells the master that the calling slave is alive.
     *
     * @return a status string decided by the implementation
     */
    String check();
}
com.TaskScheduling.Master.ZephyrJobWrapper:package com.TaskScheduling.Master;
import java.io.Serializable;

/**
 * Serializable description of a job that the master sends to the slave.
 *
 * <p>Besides an id and a name, a job names the bean and the method the slave should
 * invoke and carries the arguments for that call.
 */
public class ZephyrJobWrapper implements Serializable {
    private static final long serialVersionUID = 6453296403466572073L;

    // Left public because earlier versions of this class exposed them that way.
    public String jobId;
    public String jobName;

    /** Name of the bean on the slave that executes the job. */
    private String jobBeanName;

    /** Name of the method to invoke on that bean. */
    private String jobMethodName;

    /** Arguments for the call: an {@code Object[]} for several arguments, one value, or {@code null}. */
    private Object jobArgs;

    /** @return the job id */
    public String getJobId() {
        return jobId;
    }

    /** @param jobId the job id */
    public void setJobId(String jobId) {
        this.jobId = jobId;
    }

    /** @return the job name */
    public String getJobName() {
        return jobName;
    }

    /** @param jobName the job name */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the name of the bean that executes the job */
    public String getJobBeanName() {
        return jobBeanName;
    }

    /** @param jobBeanName the name of the bean that executes the job */
    public void setJobBeanName(String jobBeanName) {
        this.jobBeanName = jobBeanName;
    }

    /** @return the name of the method to invoke */
    public String getJobMethodName() {
        return jobMethodName;
    }

    /** @param jobMethodName the name of the method to invoke */
    public void setJobMethodName(String jobMethodName) {
        this.jobMethodName = jobMethodName;
    }

    /** @return the call arguments, or {@code null} when none were set */
    public Object getJobArgs() {
        return jobArgs;
    }

    /** @param jobArgs the call arguments; may be {@code null} */
    public void setJobArgs(Object jobArgs) {
        this.jobArgs = jobArgs;
    }
}
slave相关类:com.TaskScheduling.Slave.JobProcess:package com.TaskScheduling.Slave;
import com.TaskScheduling.Master.ZephyrJobWrapper;
/**
 * Contract through which the master hands a job to the slave.
 */
public interface JobProcess {

    /**
     * Processes one job.
     *
     * @param job description of the job to run
     * @return a status string decided by the implementation
     */
    String process(ZephyrJobWrapper job);
}
com.TaskScheduling.Slave.impl.JobProcessImpl:package com.TaskScheduling.Slave.impl;
import java.lang.reflect.Method;
import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;
import com.TaskScheduling.Master.ZephyrJobWrapper;import com.TaskScheduling.Slave.JobProcess;/**?* 处理master分配给slave的任务?*?*/public class JobProcessImpl implements JobProcess,ApplicationContextAware{private ApplicationContext ?applicationContext;
@Overridepublic String process(ZephyrJobWrapper job) {String result="success";Object jobBean = applicationContext.getBean(job.getJobBeanName());? ? ? ? Class<?> clazz = jobBean.getClass();? ? ? ? Method jobMethod = null;? ? ? ? Object[] args = null;? ? ? ? for (Method method : clazz.getMethods()) {? ? ? ? ? ? Class<?>[] paramTypes = method.getParameterTypes();
? ? ? ? ? ? if (method.getName().equals(job.getJobMethodName())) {? ? ? ? ? ? ? ? Object jobArgs = job.getJobArgs();
? ? ? ? ? ? ? ? /** 传入Object[]参数时,检查方法名和参数个数 **/? ? ? ? ? ? ? ? if (jobArgs != null && jobArgs.getClass().getSimpleName().equals("Object[]")) {? ? ? ? ? ? ? ? ? ? args = (Object[]) jobArgs;
? ? ? ? ? ? ? ? ? ? if (args.length == paramTypes.length) {? ? ? ? ? ? ? ? ? ? ? ? jobMethod = method;? ? ? ? ? ? ? ? ? ? ? ? break;? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? /** 不传参或者传入其它类型参数时,只检查方法名 **/? ? ? ? ? ? ? ? else {? ? ? ? ? ? ? ? ? ? jobMethod = method;? ? ? ? ? ? ? ? ? ? break;? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? ? ? try {if (jobMethod != null) { ? ?if (args == null) { ? ? ? ?jobMethod.invoke(jobBean, job.getJobArgs()); ? ?} else { ? ? ? ?jobMethod.invoke(jobBean, args); ? ?}} else {result="failed";}} catch (Exception e) {result="failed";}?System.out.println("complete process job "+job.getJobName()+"successful!");return result;}
@Overridepublic void setApplicationContext(ApplicationContext applicationContext)throws BeansException {this.applicationContext=applicationContext;}
}
com.TaskScheduling.Slave.HeartBeat:package com.TaskScheduling.Slave;
import com.TaskScheduling.Master.SlaveHeartBeat;import com.caucho.hessian.client.HessianProxyFactory;/**?* 每1秒调用master接口修改最后更新时间,用来心跳监测?*?*/public class HeartBeat implements Runnable {private HessianProxyFactory factory ? ? ? ? ? ? ? ?= new HessianProxyFactory();
@Overridepublic void run() {while(true){try {SlaveHeartBeat heatBeat=(SlaveHeartBeat)factory.create(SlaveHeartBeat.class, "http://localhost:8081/service/HeatBeat");String result=heatBeat.check();System.out.println(result);Thread.sleep(1000);} catch (Exception e) {System.out.println("slave cannot connect to master...check master run well");}}}}
com.TaskScheduling.Slave.SlaveAutoRun:package com.TaskScheduling.Slave;
import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;import javax.servlet.http.HttpServlet;
import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;
@SuppressWarnings("serial")public class SlaveAutoRun extends HttpServlet implements ServletContextListener {public SlaveGlobalResource slaveGlobalResource;
@Overridepublic void contextDestroyed(ServletContextEvent arg0) {
}
@SuppressWarnings("static-access")@Overridepublic void contextInitialized(ServletContextEvent arg0) {ApplicationContext app=new ClassPathXmlApplicationContext("classpath*:spring/*.xml");//心跳检测slaveGlobalResource.getExecutor().execute((HeartBeat)app.getBean("heartBeat"));}
}
com.TaskScheduling.Slave.SlaveGlobalResource:package com.TaskScheduling.Slave;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Global, statically shared state of the slave.
 */
public class SlaveGlobalResource {

    /** Thread pool: 10 core / 10 max threads, unbounded queue, rejected tasks are discarded. */
    public static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            10,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * @return the shared thread pool
     */
    public static ThreadPoolExecutor getExecutor() {
        return executor;
    }
}
com.TaskScheduling.Slave.service.TestService:package com.TaskScheduling.Slave.service;/** Job class that the slave is meant to process; getJobInfo prints both arguments separated by "---------". */public class TestService {public void getJobInfo(String str1,String str2){System.out.println(str1+"---------"+str2);return ;}
}