Java线程池遇到的问题,实在搞不定了,求助?
下面的代码,是我写的简单统计宿舍局域网内,可以ping通的ip地址:
- Java code
package cn.zhou.ip;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.LineNumberReader;import java.net.InetAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class IpAddressGet { //private final ThreadPoolExecutor threadPool; public static final int THREAD_NUM=30; private final ExecutorService threadPool;//final成员变量在构造函数时,初始化 private final ConcurrentHashMap<String,Boolean> ping; // ping 后的结果集 public ConcurrentHashMap<String,Boolean> getPing() { // 用来得到ping后的结果集 return ping; } public IpAddressGet() { ping = new ConcurrentHashMap<String,Boolean>(); threadPool=Executors.newFixedThreadPool(THREAD_NUM); /*threadPool = new ThreadPoolExecutor(10, 30, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.DiscardOldestPolicy());*/ } //private int threadCount=0; public void Ping(String ip) throws Exception { /*while(threadCount>THREAD_NUM){ Thread.sleep(50); } threadCount++;*/ PingIp p=new PingIp(ip); //p.start(); threadPool.execute(p); } public void PingAll() throws Exception { // 首先得到本机的IP,得到网段 InetAddress host = InetAddress.getLocalHost(); String hostAddress = host.getHostAddress(); System.out.println(hostAddress); int k = 0; k = hostAddress.lastIndexOf("."); String ss = hostAddress.substring(0, k + 1); for (int i = 100; i <= 120; i++) { // 对所有局域网Ip String iip = ss + i; Ping(iip); } System.out.println(ping.size());//0 for(Map.Entry<String, Boolean> kv : getPing().entrySet()){ if(kv.getValue()==true){ System.out.println(kv.getKey()); } } threadPool.shutdown(); //等着所有Ping结束 /*while (threadCount > 0) Thread.sleep(50); */ } public static void main(String[] args) throws Exception{ IpAddressGet ipGet=new IpAddressGet(); ipGet.PingAll(); for(Map.Entry<String, Boolean> kv : ipGet.getPing().entrySet()){ if(kv.getValue()==true){ System.out.println(kv.getKey()); } } System.out.println(ipGet.getPing().size());//0 } class PingIp extends Thread { public String ip; // IP public PingIp(String ip) { this.ip = ip; } public synchronized void run() { Process p=null; try { p = Runtime.getRuntime() .exec("ping " + ip + " -w 300 -n 1"); } catch (IOException e) { System.err.println("ping exception!"); e.printStackTrace(); } // 读取结果行 if (isSuccess(p.getInputStream())) { ping.put(ip, true); } else { ping.put(ip, false); } //执行完毕 //threadCount--; } private boolean isSuccess(InputStream ips) { if(ips==null){ return false; } InputStreamReader ir = new InputStreamReader(ips); LineNumberReader input = new LineNumberReader(ir); // 读取结果行 try { for (int i = 1; i <= 2; i++) { input.readLine(); } String line = input.readLine(); if (line != null && !line.equals("timed out.") && !line.equals("请求超时。") && line.length()>=30) { return true; } } catch (IOException e) { return false; } finally { try { input.close(); ir.close(); ips.close(); } catch (IOException ex) { throw new RuntimeException("input stream close error!"); } } return false; } }}
为什么最后,哈希表中什么都没有,输出size=0?如果不用Java提供的线程池,如注释的代码,自己开约30个线程,得到结果是正确的。但是使用线程池之后出现类上述问题?实在不解。
[解决办法]
java个synchronized 让main 线程wait在一个静态的或者单实例情况的实例对象上,
然后对这个Integer进行处理,每做一次for循环线程处理完毕时对这个Integer做+1动作
当最后一个线程处理完时,Integer==某个值时,对睡在Integer上的所有线程进行notifyAll
比如:
- Java code
package com.ssj.test;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.LineNumberReader;import java.net.InetAddress;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class IpAddressGet { //private final ThreadPoolExecutor threadPool; public int count = 0 ; public Object lock = new Object(); public int pinSize = 20; public static final int THREAD_NUM=30; private final ExecutorService threadPool;//final成员变量在构造函数时,初始化 private final ConcurrentHashMap<String,Boolean> ping; // ping 后的结果集 public ConcurrentHashMap<String,Boolean> getPing() { // 用来得到ping后的结果集 return ping; } public IpAddressGet() { ping = new ConcurrentHashMap<String,Boolean>(); threadPool=Executors.newFixedThreadPool(THREAD_NUM); /*threadPool = new ThreadPoolExecutor(10, 30, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.DiscardOldestPolicy());*/ } //private int threadCount=0; public void Ping(String ip) throws Exception { /*while(threadCount>THREAD_NUM){ Thread.sleep(50); } threadCount++;*/ PingIp p=new PingIp(ip); //p.start(); threadPool.execute(p); } public void PingAll() throws Exception { // 首先得到本机的IP,得到网段 InetAddress host = InetAddress.getLocalHost(); String hostAddress = host.getHostAddress(); System.out.println(hostAddress); int k = 0; k = hostAddress.lastIndexOf("."); String ss = hostAddress.substring(0, k + 1); for (int i = 100; i <= 100+pinSize; i++) { // 对所有局域网Ip String iip = ss + i; Ping(iip); } System.out.println(ping.size());//0 for(Map.Entry<String, Boolean> kv : getPing().entrySet()){ if(kv.getValue()==true){ System.out.println(kv.getKey()); } } threadPool.shutdown(); //等着所有Ping结束 /*while (threadCount > 0) Thread.sleep(50); */ } public static void main(String[] args) throws Exception{ IpAddressGet ipGet=new IpAddressGet(); ipGet.PingAll(); for(Map.Entry<String, Boolean> kv : ipGet.getPing().entrySet()){ if(kv.getValue()==true){ System.out.println(kv.getKey()); } } synchronized(ipGet.lock) { ipGet.lock.wait(); } System.out.println(ipGet.getPing().size());//0 } class PingIp extends Thread { public String ip; // IP public PingIp(String ip) { this.ip = ip; } public synchronized void run() { Process p=null; try { p = Runtime.getRuntime() .exec("ping " + ip + " -w 300 -n 1"); } catch (IOException e) { System.err.println("ping exception!"); e.printStackTrace(); } // 读取结果行 if (isSuccess(p.getInputStream())) { ping.put(ip, true); } else { ping.put(ip, false); } synchronized(lock){ count++; if(count==pinSize) lock.notifyAll(); } //执行完毕 //threadCount--; } private boolean isSuccess(InputStream ips) { if(ips==null){ return false; } InputStreamReader ir = new InputStreamReader(ips); LineNumberReader input = new LineNumberReader(ir); // 读取结果行 try { for (int i = 1; i <= 2; i++) { input.readLine(); } String line = input.readLine(); if (line != null && !line.equals("timed out.") && !line.equals("请求超时。") && line.length()>=30) { return true; } } catch (IOException e) { return false; } finally { try { input.close(); ir.close(); ips.close(); } catch (IOException ex) { throw new RuntimeException("input stream close error!"); } } return false; } }}
[解决办法]
还有,顺便说下
class PingIp extends Thread
class PingIp implements Runnable
自己想清楚应该用哪个