读书人

多线程跟同步队列

发布时间: 2012-09-16 17:33:17 作者: rapoo

多线程和同步队列

最近要割接个项目,要把另外一个公司的数据库里的一张表倒到我们库里,数据有一亿三千多万吧。正号也符合生产者和消费者的状况。以前用过点线程池和同步队列,,写个例子,让大家拍砖。不多说了,直接上代码,

1. 线程池

?

?

public class ThreadPool {

private ExecutorService exe = null;// 线程池

?

private int pool_size;

private?Service?service?;

public ThreadPool(int pool_size,Service??service???{

this.pool_size = pool_size;

this.service??=service??;

exe = Executors.newFixedThreadPool(pool_size);// 创建线程池

System.out.println("the server is ready");

}

?

/**

*?

* 运行循环实例线程,根据要实例的线程个数,传入条件ID

*?

* @param worknum

*/

public void server() {

int i = 0;

while (i < pool_size) {

// 实例指定个线程

InsertData t = new InsertData(service??);

exe.execute(t);// 放入线程池

i++;

}

}

}

2.线程public class InsertData extends Thread {private?Service??service?;public InsertData(Service??service??){this.service??=service??;}@Overridepublic void run() {SyncDataQueue syncDataQueue=SyncDataQueue.getInstance();while (true) {List<Mode?> list=new ArrayList<Mode>();//移除队列里的所有实体,并批量添加syncDataQueue.getBlockingQueue().drainTo(list);if (list!=null) {service.batchSvae(list);}}}}3.实现同步队列public class SyncDataQueue {
private static SyncDataQueue instance = null;private BlockingQueue<Mode?> blockingQueue = null;
public SyncDataQueue() {?//队列容量blockingQueue = new ArrayBlockingQueue<Mode?>(40000);}
public static SyncDataQueue getInstance() {if (instance == null) {instance = new SyncDataQueue();}return instance;}
public BlockingQueue<Mode?> getBlockingQueue() {return blockingQueue;}}4.导入文件,并启动线程池public class FileThread extends Thread {
private String path;public FileThread(String path) {this.path = path;}
@Overridepublic void run() {if (StringUtils.isNotBlank(path)) {try {getSmsListByFileThread(new File(path));} catch (FileNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}public void getSmsListByFileThread(File file)throws FileNotFoundException, IOException, InterruptedException {//初始化同步队列SyncDataQueue syncDataQueue = SyncDataQueue.getInstance();BlockingQueue<Mode?> blockingQueue = syncDataQueue.getBlockingQueue();// 缓冲大小为10Mfinal int BUFFER_SIZE = 10 * 1024 * 1024;BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));// 用10M的缓冲读取文本文件BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"), BUFFER_SIZE);String line = "";int i=0;while ((line = reader.readLine()) != null) {String[] data = line.split(",");Mode mode=new Mode();//填充实体//放进队列 让 线程去抢队列的里实体blockingQueue.put(mode);}}
public String getPath() {return path;}
public void setPath(String path) {this.path = path;}/** * ?不加索引每分钟百万上下, 加了索引有点惨不忍睹。 */public static void main(String[] args) {Service service=(Service)Syscontext.getBean("service?");//初始化线程池,并启动ThreadPool threadPool = new ThreadPool(30,smsHouseSmsService);threadPool.server();//每个文件都是1G的new FileThread("C:/Users/bjz/Desktop/mcexport/Msg_20120322104001_1_1.txt").start();........new FileThread("C:/Users/bjz/Desktop/mcexport/Msg_20120322104001_1_15.txt").start();}}

?

1 楼 netkiller.github.com 2012-03-30 :-) 我也刚刚做过类似的东西,我是造数据,python 实现比java 省事

就是开几个进程,在哪里守候,每个进程可以开启N个线程。

剩下就是往Queue里面赛东西,那些守护进程从queue里面拿任务。。

有兴趣看看
http://netkiller-github-com.iteye.com/blog/1453700

2 楼 jiangzhoubai 2012-03-30 netkiller.github.com 写道:-) 我也刚刚做过类似的东西,我是造数据,python 实现比java 省事

就是开几个进程,在哪里守候,每个进程可以开启N个线程。

剩下就是往Queue里面赛东西,那些守护进程从queue里面拿任务。。

有兴趣看看
http://netkiller-github-com.iteye.com/blog/1453700


好的
3 楼 gamehiboy 2012-04-09 Service类是啥, 4 楼 jiangzhoubai 2012-04-09 gamehiboy 写道Service类是啥,
插入数据的service 5 楼 gamehiboy 2012-04-10 求Service类的代码,

读书人网 >编程

热点推荐