读书人

这样的情况如何使用多线程来处理

发布时间: 2013-07-08 14:13:00 作者: rapoo

这样的情况怎么使用多线程来处理?
本帖最后由 matrix1984 于 2013-06-17 18:37:44 编辑 需求:表A里有很多数据,程序需要从表A里取数据,并对每一条数据进行逻辑处理。
如果是单线程,可能是这样的逻辑:

...
...
long start = System.currentTimeMillis();
while(true) {
DataBean[] beans = queryData(5000); //每次取5000条
if(beans.length > 0) {
for(DataBean b:beans) { //业务处理
processBean(b); //业务处理
} //业务处理
} else break;

if(beans.length < 5000) break; //没有更多的数据
}
long end = System.currentTimeMillis();
System.out.println("使用时间: " + (end - start) + "ms"); //打印当所有任务都完成时所用时间
...
...

public DataBean[] queryData(int size) {
//查询数据并返回
}

public void processBean(DataBean b) throws Exception {
// 处理数据对象b,处理完毕会更新状态(不会再次被处理)
}

由于数据量很大,就想采用多线程/线程池来处理,每次取5000条放到一个线程,最多5个线程,当5个线程都起来了,就等待,任务可以加入队列中,知道有空闲的线程出来。
并且有个要求,只要有线程或者队列有任务,主线程不能结束,直至所有任务都跑完,并打印最终使用的总时间。

上网有看到使用ThreadPoolExecutor来做,但具体到这个例子,该怎么使用呢?请有经验的大大指导一下。


[解决办法]
5个线程取数据,那么数据的处理呢?也使用5个线程吗???
[解决办法]
ThreadPool不是重点,重点是要保证异步线程取bean时,如何并发、互斥的获取数据。

我们可以把beans放到队列或堆栈中,任意线程来了,只取第一个bean(这个“取”操作要synchronized),然后run方法只做单一bean的处理即可。

最后,我们初始化的任意个(LZ这里要求5个就5个吧,其实ThreadPool更适合不定个数个异步线程的处理)线程,使用ThreadPool实例对象(单例)的excute(Runnable r)方法加入到线程池即可。
[解决办法]
楼主你好,看到你这个需求,我想我们是不是可以换个思路。
既然那么多数据都要做处理,不如,我们把这些数据用后台,比如shell程序去处理,将数据后的结果集添加到新的数据库(表)中。这样我们只需要在新的数据库(表)中去读就可以了。
楼主你觉得呢。
[解决办法]

引用:

Quote: 引用:


ThreadPool不是重点,重点是要保证异步线程取bean时,如何并发、互斥的获取数据。

我们可以把beans放到队列或堆栈中,任意线程来了,只取第一个bean(这个“取”操作要synchronized),然后run方法只做单一bean的处理即可。

最后,我们初始化的任意个(LZ这里要求5个就5个吧,其实ThreadPool更适合不定个数个异步线程的处理)线程,使用ThreadPool实例对象(单例)的excute(Runnable r)方法加入到线程池即可。


1. 关于获取bean的互斥,我是那么做的,一个线程获取5000条数据后,将其状态更新为“processing”表示正在被处理,这样下一个thread就不会再取这部分数据;
2. 你的建议是一个线程只去处理一个bean,这样跟我一个线程处理5000个,效率方面能解释下为什么吗?

另外,我现在的处理方法类似如下:



ThreadPoolExecutor executor = new ThreadPoolExecutor(PROCESS_THREADS_COUNT, // core size
PROCESS_THREADS_COUNT_MAX, // max size
10 * 60, // idle timeout
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(PROCESS_THREADS_COUNT_MAX),
new ThreadPoolExecutor.CallerRunsPolicy());

...
...
long start = System.currentTimeMillis();
while(true) {
DataBean[] beans = queryData(5000); //每次取5000条
if(beans.length > 0) {

updateDataStatusToProcessing(beans); //状态更新为正在处理
ProcessTask subtask = new ProcessTask(result); //具体业务处理Thread
executor.execute(subtask);

} else break;

if(beans.length < 5000) break; //没有更多的数据
}

executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
long end = System.currentTimeMillis();
System.out.println("使用时间: " + (end - start) + "ms"); //打印当所有任务都完成时所用时间
...
...


第一点,如果数据量很大,可以这样用数据库的标志位(这样要注意一点:在你查询出来,再更新的时候,有没有并发的一个查询操作。建议对查询结果的主键做个排序,查询的时候直接first 5000这样查),如果表不大(字段不多)或数据量不大,建议一次性全部查出来,放到beansQueue中(要保证内存足够),再起线程处理。

第二点,一个线程处理5000条,个人感觉这是个串行的。一个线程只处理一个,从queue里取出来首节点,新启一个线程放到ThreadPool中处理,单个处理完毕线程自动结束,这样效率会高。

你的单个处理5000条的方法没问题,建议更新状态的操作,在每个bean处理成功后更新,失败的bean不更新,下次查询还可以查出来再次处理。
[解决办法]
m个线程从表中读取,n个线程进行业务处理

m和n之间比例值的确定,我觉得应该由单线程读取一条数据和单线程处理一条数据业务的时间比例决定



比如一个线程读取所有数据如果快过n个线程处理所有数据业务,那根本无需多线程读取,因为即使多线程读取快过单线程读取,还是会受制于n个线程业务处理时间的瓶颈。

n的确定可以单独对数据处理做多线程测试来确定。

读书人网 >J2EE开发

热点推荐