多线程测试对比infobright取数
#! /usr/bin/env python
# coding=utf-8
"""Multi-threaded paging benchmark against ``fact_user_msg`` via MySQLdb.

A single Producer thread counts the rows matching ``con_sql`` and queues one
page offset per ``pageSize`` rows. Consumer threads each open their own
connection and run one ``LIMIT offset, pageSize`` query per queued offset.
``main`` prints timestamps and the elapsed whole seconds of both phases.
"""
import datetime
import random
import sys
import threading
import time

try:
    from Queue import Queue  # Python 2 module name
except ImportError:
    from queue import Queue  # Python 3 module name

import MySQLdb

# Module-level flag, set to True by the Producer once all offsets and
# sentinels have been queued. Nothing in this file reads it; it is kept for
# backward compatibility.
isFinish = False

count_num_sql = '''select count(1) t from fact_user_msg '''
# Alternative filters kept from earlier runs:
# con_sql='''where ( d_o_lastordertime>='2012-06-19' and s_usr_level>60 and i_u_verified=1 ) '''
con_sql = '''where ( i_u_sex=1 and s_o_usermob_type=2 and i_o_ordesnum_3m=4 and i_u_verified=1 ) '''
# con_sq3='''where ( i_u_sex=1 and s_o_usermob_type=2 and i_o_ordesnum_3m=4 and i_u_verified=1 and f_o_total_spend_3m<200 and f_o_kdj_3m>300 and i_o_sendNumber_3m<10) '''
fetch_data_sql = '''select s_o_usermob from fact_user_msg '''
pageSize = 50000

# Still notified by the Producer for any external waiter. main() no longer
# waits on it, because a notify issued before the wait would be lost.
condition = threading.Condition()

# Upper bound on the number of rows that are paged through.
_MAX_ROWS = 500000
# Number of consumer threads, and therefore of end-of-work sentinels.
_CONSUMER_COUNT = 4
# NOTE(review): the literal "+0000" is appended to naive datetime.now()
# values, so it is only accurate when the host clock runs on UTC.
_TIME_FORMAT = "%a, %d %b %Y %H:%M:%S +0000"
# NOTE(review): credentials are hard-coded in source; consider moving them
# to configuration. The original also kept a commented-out variant of both
# connect calls using port=3306.
_DB_SETTINGS = dict(host='10.28.178.11', user='root', passwd='123456',
                    db='jdsms', port=3307)


def _connect(**extra):
    """Open a MySQLdb connection from _DB_SETTINGS, overridden by *extra*."""
    settings = dict(_DB_SETTINGS)
    settings.update(extra)
    return MySQLdb.connect(**settings)


def _stamp(moment):
    """Format a datetime the way every log line in this script prints it."""
    return moment.strftime(_TIME_FORMAT)


def _elapsed_seconds(start, end):
    """Return whole seconds between two datetimes (days included)."""
    return int((end - start).total_seconds())


class Producer(threading.Thread):
    """Counts the matching rows and queues one page offset per page.

    After the offsets, one ``None`` sentinel per consumer is queued so that
    every consumer can leave its loop.
    """

    def __init__(self, threadname, queue, consumer_count=_CONSUMER_COUNT):
        """Store the shared queue and the number of sentinels to emit.

        threadname     -- name given to the thread.
        queue          -- queue shared with the consumers.
        consumer_count -- how many ``None`` sentinels to queue at the end.
        """
        threading.Thread.__init__(self, name=threadname)
        self.sharedata = queue
        self.consumer_count = consumer_count

    def run(self):
        """Run the count query, queue the offsets, then queue the sentinels.

        Any exception from the database propagates after the cleanup in
        ``finally`` has run.
        """
        global isFinish  # without this the assignment below binds a local
        conn_src = None
        try:
            # Establish the connection and obtain a cursor.
            conn_src = _connect()
            cursor_src = conn_src.cursor()

            startTime = datetime.datetime.now()
            print("----start time:" + _stamp(startTime))
            cursor_src.execute(count_num_sql + con_sql)
            result = cursor_src.fetchone()
            endTime = datetime.datetime.now()
            print("----end time:" + _stamp(endTime))
            print("count cost time(s):"
                  + str(_elapsed_seconds(startTime, endTime)))

            total = result[0]
            print("data total lines:" + str(total))
            total = min(total, _MAX_ROWS)
            for offset in range(0, total, pageSize):
                self.sharedata.put(offset)
        finally:
            # Sentinels must be queued even on failure, otherwise the
            # consumers block forever in get().
            for _ in range(self.consumer_count):
                self.sharedata.put(None)
            isFinish = True
            if conn_src is not None:
                conn_src.close()
            with condition:
                condition.notify_all()
        print(self.name + 'Finished')


class Consumer(threading.Thread):
    """Runs one paged fetch query for every offset taken from the queue."""

    def __init__(self, threadname, queue):
        """Store the shared queue.

        threadname -- name given to the thread.
        queue      -- queue the Producer fills with offsets and sentinels.
        """
        threading.Thread.__init__(self, name=threadname)
        self.sharedata = queue

    def run(self):
        """Process offsets until a ``None`` sentinel or a query error.

        Every item taken from the queue is marked done, including the
        sentinel and an item whose query failed, so the queue's task
        accounting stays balanced.
        """
        conn_src = _connect(charset='utf8')
        try:
            cursor_src = conn_src.cursor()
            while True:
                item = self.sharedata.get()
                try:
                    if item is None:
                        break
                    print("exec fetcch start" + str(item)
                          + _stamp(datetime.datetime.now()))
                    cursor_src.execute(fetch_data_sql + con_sql + " limit "
                                       + str(item) + "," + str(pageSize))
                    # Only the first row is read; the timing is taken right
                    # after this call.
                    cursor_src.fetchone()
                    endTime1 = datetime.datetime.now()
                    print("fetcch----end time:" + _stamp(endTime1))
                except Exception:
                    # Report the failure instead of swallowing it, then stop
                    # this consumer as the original did.
                    print(sys.exc_info()[:2])
                    print("over")
                    break
                finally:
                    self.sharedata.task_done()
        finally:
            print(self.name + 'Finished')
            conn_src.close()


def main():
    """Start one Producer and the Consumers, wait for them, print timings."""
    queue = Queue()
    producer = Producer('Producer', queue, _CONSUMER_COUNT)
    consumers = [Consumer('Consumer%d' % number, queue)
                 for number in range(1, _CONSUMER_COUNT + 1)]

    print('Starting threads ...')
    producer.start()
    startTime1 = datetime.datetime.now()
    print("----start1 time:" + _stamp(startTime1))
    for consumer in consumers:
        consumer.start()

    # Joining the threads cannot miss a wakeup and cannot hang when a
    # consumer stops early, unlike condition.wait() + queue.join().
    producer.join()
    for consumer in consumers:
        consumer.join()

    endTime1 = datetime.datetime.now()
    print("----end1 time:" + _stamp(endTime1))
    print("fetcch data cost time(s):"
          + str(_elapsed_seconds(startTime1, endTime1)))
    print('All threads have terminated.')


if __name__ == '__main__':
    main()