使用阻塞队列(BlockingQueue)实现生产者-消费者模式
生产者线程:
public class ReadThread implements Runnable
{
??? BlockingQueue<String> blocking;
??? ReadThread(BlockingQueue<String> blocking)
??? {
??????? this.blocking = blocking;
??? }
??? @Override
??? public void run()
??? {
??????? read();
??? }
??? public void read()
??? {
??????? try
??????? {
??????????? BufferedReader br = new BufferedReader(new InputStreamReader(
??????????????????? new FileInputStream(new File("D://1.txt"))));
??????????? String line = "";
??????????? while ((line = br.readLine()) != null)
??????????? {
??????????????? System.out.println("向队列中添加:"+line);
??????????????? while (!blocking.offer(line))
??????????????? {
??????????????????? System.out.println("队列已满 进行等待");
??????????????????? Thread.sleep(100);
??????????????? }
??????????? }
??????????? ParseFileThread.idDone = true;
??????? }
??????? catch (Exception e)
??????? {
??????????? e.printStackTrace();
??????? }
??? }
}
消费者线程:
public class ParseThread implements Runnable
{
??? BlockingQueue<String> blocking;
???
??? ParseThread(BlockingQueue<String> blocking)
??? {
??????? this.blocking = blocking;
??? }
???
??? @Override
??? public void run()
??? {
??????? Parse();
??? }
???
??? public void Parse()
??? {
??????? ExecutorService pool = Executors.newFixedThreadPool(3);
??????? try
??????? {
??????????? for (int i = 0; i < 3; i++)
??????????? {
??????????????? pool.execute(new Runnable()
??????????????? {
??????????????????? @Override
??????????????????? public void run()
??????????????????? {
??????????????????????? while (true)
??????????????????????? {
??????????????????????????? String line = blocking.poll();
??????????????????????????? if(ParseFileThread.idDone && line==null)
??????????????????????????? {
??????????????????????????????? //已经处理完
??????????????????????????????? System.out.println("任务已经出来完~ 进行退出");
??????????????????????????????? return;
??????????????????????????? }
??????????????????????????? else if(!ParseFileThread.idDone && line==null)
??????????????????????????? {
??????????????????????????????? System.out.println("队列已空!进行等待");
??????????????????????????????? try
??????????????????????????????? {
??????????????????????????????????? Thread.sleep(100);
??????????????????????????????? }
??????????????????????????????? catch (InterruptedException e)
??????????????????????????????? {
??????????????????????????????????? e.printStackTrace();
??????????????????????????????? }
??????????????????????????? }else if(!ParseFileThread.idDone && line!=null)
??????????????????????????? {
?????????????????????????????? System.out.println("从队列中获取:"+line);
??????????????????????????? }
??????????????????????? }
???????????????????????
??????????????????? }
??????????????? });
??????????? }
???????????
??????? }
??????? catch (Exception e)
??????? {
??????????? e.printStackTrace();
??????? }
??????? pool.shutdown();
??? }
???
}
主函数:
public class ParseFileThread
{
??? private static BlockingQueue<String> blocking = new LinkedBlockingQueue<String>(3);
???
??? public static Boolean idDone = false;
???
??? public static void main(String[] args)
??? {
??????? ExecutorService executor = Executors.newFixedThreadPool(2);
???????
??????? executor.execute(new ReadThread(blocking));
??????? executor.execute(new ParseThread(blocking));
???????
??????? executor.shutdown();
??? }
}