多线程和同步队列
最近要割接个项目,要把另外一个公司的数据库里的一张表倒到我们库里,数据有一亿三千多万吧。正号也符合生产者和消费者的状况。以前用过点线程池和同步队列,,写个例子,让大家拍砖。不多说了,直接上代码,
1. 线程池
?
?
public class ThreadPool {
private ExecutorService exe = null;// 线程池
?
private int pool_size;
private?Service?service?;
public ThreadPool(int pool_size,Service??service???{
this.pool_size = pool_size;
this.service??=service??;
exe = Executors.newFixedThreadPool(pool_size);// 创建线程池
System.out.println("the server is ready");
}
?
/**
*?
* 运行循环实例线程,根据要实例的线程个数,传入条件ID
*?
* @param worknum
*/
public void server() {
int i = 0;
while (i < pool_size) {
// 实例指定个线程
InsertData t = new InsertData(service??);
exe.execute(t);// 放入线程池
i++;
}
}
}
2.线程public class InsertData extends Thread {private?Service??service?;public InsertData(Service??service??){this.service??=service??;}@Overridepublic void run() {SyncDataQueue syncDataQueue=SyncDataQueue.getInstance();while (true) {List<Mode?> list=new ArrayList<Mode>();//移除队列里的所有实体,并批量添加syncDataQueue.getBlockingQueue().drainTo(list);if (list!=null) {service.batchSvae(list);}}}}3.实现同步队列public class SyncDataQueue {private static SyncDataQueue instance = null;private BlockingQueue<Mode?> blockingQueue = null;
public SyncDataQueue() {?//队列容量blockingQueue = new ArrayBlockingQueue<Mode?>(40000);}
public static SyncDataQueue getInstance() {if (instance == null) {instance = new SyncDataQueue();}return instance;}
public BlockingQueue<Mode?> getBlockingQueue() {return blockingQueue;}}4.导入文件,并启动线程池public class FileThread extends Thread {
private String path;public FileThread(String path) {this.path = path;}
@Overridepublic void run() {if (StringUtils.isNotBlank(path)) {try {getSmsListByFileThread(new File(path));} catch (FileNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}public void getSmsListByFileThread(File file)throws FileNotFoundException, IOException, InterruptedException {//初始化同步队列SyncDataQueue syncDataQueue = SyncDataQueue.getInstance();BlockingQueue<Mode?> blockingQueue = syncDataQueue.getBlockingQueue();// 缓冲大小为10Mfinal int BUFFER_SIZE = 10 * 1024 * 1024;BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));// 用10M的缓冲读取文本文件BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"), BUFFER_SIZE);String line = "";int i=0;while ((line = reader.readLine()) != null) {String[] data = line.split(",");Mode mode=new Mode();//填充实体//放进队列 让 线程去抢队列的里实体blockingQueue.put(mode);}}
public String getPath() {return path;}
public void setPath(String path) {this.path = path;}/** * ?不加索引每分钟百万上下, 加了索引有点惨不忍睹。 */public static void main(String[] args) {Service service=(Service)Syscontext.getBean("service?");//初始化线程池,并启动ThreadPool threadPool = new ThreadPool(30,smsHouseSmsService);threadPool.server();//每个文件都是1G的new FileThread("C:/Users/bjz/Desktop/mcexport/Msg_20120322104001_1_1.txt").start();........new FileThread("C:/Users/bjz/Desktop/mcexport/Msg_20120322104001_1_15.txt").start();}}
?
1 楼 netkiller.github.com 2012-03-30 :-) 我也刚刚做过类似的东西,我是造数据,python 实现比java 省事就是开几个进程,在哪里守候,每个进程可以开启N个线程。
剩下就是往Queue里面赛东西,那些守护进程从queue里面拿任务。。
有兴趣看看
http://netkiller-github-com.iteye.com/blog/1453700
2 楼 jiangzhoubai 2012-03-30 netkiller.github.com 写道:-) 我也刚刚做过类似的东西,我是造数据,python 实现比java 省事
就是开几个进程,在哪里守候,每个进程可以开启N个线程。
剩下就是往Queue里面赛东西,那些守护进程从queue里面拿任务。。
有兴趣看看
http://netkiller-github-com.iteye.com/blog/1453700
好的
3 楼 gamehiboy 2012-04-09 Service类是啥, 4 楼 jiangzhoubai 2012-04-09 gamehiboy 写道Service类是啥,
插入数据的service 5 楼 gamehiboy 2012-04-10 求Service类的代码,