读书人

HttpCore组件案例程序(Java描述) (Htt

发布时间: 2012-09-08 10:48:07 作者: rapoo

HttpCore组件案例程序(Java描述) (Http Components-- HttpCore Examples)

一、基本概念 HttpCore是一套实现了HTTP协议最基础方面的组件,尽管HTTP协议在使用最小占用来开发全功能的客户端和服务器的HTTP服务是足够的。
HttpCore有如下的范围和目标(@Author:南磊):1. HttpCore范围
 ■构建客户端/代理/服务器端HTTP服务一致的API
 ■构建同步和异步HTTP服务一致的API
 ■基于阻塞(经典的)和非阻塞(NIO)I/O模型的一套低等级组件2. HttpCore目标
 ■实现最基本的HTTP传输方面
 ■良好性能和清晰度&表现力之间的平衡
 ■小的(预测)内存占用
 ■自我包含的类库(没有超越JRE的额外依赖)3. 什么是HttpCore不能做的
 ■HttpClient的替代
 ■Servlet容器或Servlet API竞争对手的替代

二、源代码详解1.基本的Http获取程序(基于同步或加锁的I/O模型)This example demonstrates how to execute a series of synchronous (blocking) HTTP GET requests.
/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation.  For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * */package org.apache.http.examples;import java.net.Socket;import org.apache.http.ConnectionReuseStrategy;import org.apache.http.HttpHost;import org.apache.http.HttpRequestInterceptor;import org.apache.http.HttpResponse;import org.apache.http.HttpVersion;import org.apache.http.impl.DefaultConnectionReuseStrategy;import org.apache.http.impl.DefaultHttpClientConnection;import org.apache.http.message.BasicHttpRequest;import org.apache.http.params.HttpParams;import org.apache.http.params.HttpProtocolParams;import org.apache.http.params.SyncBasicHttpParams;import org.apache.http.protocol.HttpContext;import org.apache.http.protocol.BasicHttpContext;import org.apache.http.protocol.ExecutionContext;import org.apache.http.protocol.HttpProcessor;import org.apache.http.protocol.HttpRequestExecutor;import org.apache.http.protocol.ImmutableHttpProcessor;import org.apache.http.protocol.RequestConnControl;import org.apache.http.protocol.RequestContent;import org.apache.http.protocol.RequestExpectContinue;import org.apache.http.protocol.RequestTargetHost;import org.apache.http.protocol.RequestUserAgent;import org.apache.http.util.EntityUtils;/** * Elemental example for executing multiple GET requests sequentially. * <p> * Please note the purpose of this application is demonstrate the usage of HttpCore APIs. * It is NOT intended to demonstrate the most efficient way of building an HTTP client. */public class ElementalHttpGet {    public static void main(String[] args) throws Exception {        HttpParams params = new SyncBasicHttpParams();        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);        HttpProtocolParams.setContentCharset(params, "UTF-8");        HttpProtocolParams.setUserAgent(params, "HttpComponents/1.1");        HttpProtocolParams.setUseExpectContinue(params, true);        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {                // Required protocol interceptors                new RequestContent(),                new RequestTargetHost(),                // Recommended protocol interceptors                new RequestConnControl(),                new RequestUserAgent(),                new RequestExpectContinue()});        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();        HttpContext context = new BasicHttpContext(null);        HttpHost host = new HttpHost("localhost", 8080);        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);        try {            String[] targets = {                    "/",                    "/servlets-examples/servlet/RequestInfoExample",                    "/somewhere%20in%20pampa"};            for (int i = 0; i < targets.length; i++) {                if (!conn.isOpen()) {                    Socket socket = new Socket(host.getHostName(), host.getPort());                    conn.bind(socket, params);                }                BasicHttpRequest request = new BasicHttpRequest("GET", targets[i]);                System.out.println(">> Request URI: " + request.getRequestLine().getUri());                request.setParams(params);                httpexecutor.preProcess(request, httpproc, context);                HttpResponse response = httpexecutor.execute(request, conn, context);                response.setParams(params);                httpexecutor.postProcess(response, httpproc, context);                System.out.println("<< Response: " + response.getStatusLine());                System.out.println(EntityUtils.toString(response.getEntity()));                System.out.println("==============");                if (!connStrategy.keepAlive(response, context)) {                    conn.close();                } else {                    System.out.println("Connection kept alive...");                }            }        } finally {            conn.close();        }    }}

2.基本的Http发送程序(基于同步或加锁的I/O模型)This example demonstrates how to execute a series of synchronous (blocking) HTTP POST requests that enclose entity content of various types: a string, a byte array, an arbitrary input stream.
/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation.  For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * */package org.apache.http.examples;import java.io.ByteArrayInputStream;import java.net.Socket;import org.apache.http.ConnectionReuseStrategy;import org.apache.http.HttpEntity;import org.apache.http.HttpHost;import org.apache.http.HttpRequestInterceptor;import org.apache.http.HttpResponse;import org.apache.http.HttpVersion;import org.apache.http.entity.ByteArrayEntity;import org.apache.http.entity.InputStreamEntity;import org.apache.http.entity.StringEntity;import org.apache.http.impl.DefaultConnectionReuseStrategy;import org.apache.http.impl.DefaultHttpClientConnection;import org.apache.http.message.BasicHttpEntityEnclosingRequest;import org.apache.http.params.HttpParams;import org.apache.http.params.HttpProtocolParams;import org.apache.http.params.SyncBasicHttpParams;import org.apache.http.protocol.HttpContext;import org.apache.http.protocol.BasicHttpContext;import org.apache.http.protocol.ExecutionContext;import org.apache.http.protocol.HttpProcessor;import org.apache.http.protocol.HttpRequestExecutor;import org.apache.http.protocol.ImmutableHttpProcessor;import org.apache.http.protocol.RequestConnControl;import org.apache.http.protocol.RequestContent;import org.apache.http.protocol.RequestExpectContinue;import org.apache.http.protocol.RequestTargetHost;import org.apache.http.protocol.RequestUserAgent;import org.apache.http.util.EntityUtils;/** * Elemental example for executing multiple POST requests sequentially. * <p> * Please note the purpose of this application is demonstrate the usage of HttpCore APIs. * It is NOT intended to demonstrate the most efficient way of building an HTTP client. */public class ElementalHttpPost {    public static void main(String[] args) throws Exception {        HttpParams params = new SyncBasicHttpParams();        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);        HttpProtocolParams.setContentCharset(params, "UTF-8");        HttpProtocolParams.setUserAgent(params, "Test/1.1");        HttpProtocolParams.setUseExpectContinue(params, true);        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {                // Required protocol interceptors                new RequestContent(),                new RequestTargetHost(),                // Recommended protocol interceptors                new RequestConnControl(),                new RequestUserAgent(),                new RequestExpectContinue()});        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();        HttpContext context = new BasicHttpContext(null);        HttpHost host = new HttpHost("localhost", 8080);        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);        try {            HttpEntity[] requestBodies = {                    new StringEntity(                            "This is the first test request", "UTF-8"),                    new ByteArrayEntity(                            "This is the second test request".getBytes("UTF-8")),                    new InputStreamEntity(                            new ByteArrayInputStream(                                    "This is the third test request (will be chunked)"                                    .getBytes("UTF-8")), -1)            };            for (int i = 0; i < requestBodies.length; i++) {                if (!conn.isOpen()) {                    Socket socket = new Socket(host.getHostName(), host.getPort());                    conn.bind(socket, params);                }                BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST",                        "/servlets-examples/servlet/RequestInfoExample");                request.setEntity(requestBodies[i]);                System.out.println(">> Request URI: " + request.getRequestLine().getUri());                request.setParams(params);                httpexecutor.preProcess(request, httpproc, context);                HttpResponse response = httpexecutor.execute(request, conn, context);                response.setParams(params);                httpexecutor.postProcess(response, httpproc, context);                System.out.println("<< Response: " + response.getStatusLine());                System.out.println(EntityUtils.toString(response.getEntity()));                System.out.println("==============");                if (!connStrategy.keepAlive(response, context)) {                    conn.close();                } else {                    System.out.println("Connection kept alive...");                }            }        } finally {            conn.close();        }    }}

3.基本的Http服务器程序(基于同步或加锁的I/O模型) This is an example of an HTTP/1.1 file server based on a synchronous (blocking) I/O model.
/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation.  For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * */package org.apache.http.examples;import java.io.File;import java.io.IOException;import java.io.InterruptedIOException;import java.net.ServerSocket;import java.net.Socket;import java.net.URLDecoder;import java.nio.charset.Charset;import java.util.Locale;import org.apache.http.ConnectionClosedException;import org.apache.http.HttpEntity;import org.apache.http.HttpEntityEnclosingRequest;import org.apache.http.HttpException;import org.apache.http.HttpRequest;import org.apache.http.HttpResponse;import org.apache.http.HttpResponseInterceptor;import org.apache.http.HttpServerConnection;import org.apache.http.HttpStatus;import org.apache.http.MethodNotSupportedException;import org.apache.http.entity.ContentType;import org.apache.http.entity.FileEntity;import org.apache.http.entity.StringEntity;import org.apache.http.impl.DefaultConnectionReuseStrategy;import org.apache.http.impl.DefaultHttpResponseFactory;import org.apache.http.impl.DefaultHttpServerConnection;import org.apache.http.params.CoreConnectionPNames;import org.apache.http.params.HttpParams;import org.apache.http.params.CoreProtocolPNames;import org.apache.http.params.SyncBasicHttpParams;import org.apache.http.protocol.HttpContext;import org.apache.http.protocol.BasicHttpContext;import org.apache.http.protocol.HttpProcessor;import org.apache.http.protocol.HttpRequestHandler;import org.apache.http.protocol.HttpRequestHandlerRegistry;import org.apache.http.protocol.HttpService;import org.apache.http.protocol.ImmutableHttpProcessor;import org.apache.http.protocol.ResponseConnControl;import org.apache.http.protocol.ResponseContent;import org.apache.http.protocol.ResponseDate;import org.apache.http.protocol.ResponseServer;import org.apache.http.util.EntityUtils;/** * Basic, yet fully functional and spec compliant, HTTP/1.1 file server. * <p> * Please note the purpose of this application is demonstrate the usage of HttpCore APIs. * It is NOT intended to demonstrate the most efficient way of building an HTTP file server. * * */public class ElementalHttpServer {    public static void main(String[] args) throws Exception {        if (args.length < 1) {            System.err.println("Please specify document root directory");            System.exit(1);        }        Thread t = new RequestListenerThread(8080, args[0]);        t.setDaemon(false);        t.start();    }    static class HttpFileHandler implements HttpRequestHandler  {        private final String docRoot;        public HttpFileHandler(final String docRoot) {            super();            this.docRoot = docRoot;        }        public void handle(                final HttpRequest request,                final HttpResponse response,                final HttpContext context) throws HttpException, IOException {            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {                throw new MethodNotSupportedException(method + " method not supported");            }            String target = request.getRequestLine().getUri();            if (request instanceof HttpEntityEnclosingRequest) {                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();                byte[] entityContent = EntityUtils.toByteArray(entity);                System.out.println("Incoming entity content (bytes): " + entityContent.length);            }            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));            if (!file.exists()) {                response.setStatusCode(HttpStatus.SC_NOT_FOUND);                StringEntity entity = new StringEntity(                        "<html><body><h1>File" + file.getPath() +                        " not found</h1></body></html>",                        ContentType.create("text/html", "UTF-8"));                response.setEntity(entity);                System.out.println("File " + file.getPath() + " not found");            } else if (!file.canRead() || file.isDirectory()) {                response.setStatusCode(HttpStatus.SC_FORBIDDEN);                StringEntity entity = new StringEntity(                        "<html><body><h1>Access denied</h1></body></html>",                        ContentType.create("text/html", "UTF-8"));                response.setEntity(entity);                System.out.println("Cannot read file " + file.getPath());            } else {                response.setStatusCode(HttpStatus.SC_OK);                FileEntity body = new FileEntity(file, ContentType.create("text/html", (Charset) null));                response.setEntity(body);                System.out.println("Serving file " + file.getPath());            }        }    }    static class RequestListenerThread extends Thread {        private final ServerSocket serversocket;        private final HttpParams params;        private final HttpService httpService;        public RequestListenerThread(int port, final String docroot) throws IOException {            this.serversocket = new ServerSocket(port);            this.params = new SyncBasicHttpParams();            this.params                .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)                .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)                .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)                .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)                .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpComponents/1.1");            // Set up the HTTP protocol processor            HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {                    new ResponseDate(),                    new ResponseServer(),                    new ResponseContent(),                    new ResponseConnControl()            });            // Set up request handlers            HttpRequestHandlerRegistry reqistry = new HttpRequestHandlerRegistry();            reqistry.register("*", new HttpFileHandler(docroot));            // Set up the HTTP service            this.httpService = new HttpService(                    httpproc,                    new DefaultConnectionReuseStrategy(),                    new DefaultHttpResponseFactory(),                    reqistry,                    this.params);        }        @Override        public void run() {            System.out.println("Listening on port " + this.serversocket.getLocalPort());            while (!Thread.interrupted()) {                try {                    // Set up HTTP connection                    Socket socket = this.serversocket.accept();                    DefaultHttpServerConnection conn = new DefaultHttpServerConnection();                    System.out.println("Incoming connection from " + socket.getInetAddress());                    conn.bind(socket, this.params);                    // Start worker thread                    Thread t = new WorkerThread(this.httpService, conn);                    t.setDaemon(true);                    t.start();                } catch (InterruptedIOException ex) {                    break;                } catch (IOException e) {                    System.err.println("I/O error initialising connection thread: "                            + e.getMessage());                    break;                }            }        }    }    static class WorkerThread extends Thread {        private final HttpService httpservice;        private final HttpServerConnection conn;        public WorkerThread(                final HttpService httpservice,                final HttpServerConnection conn) {            super();            this.httpservice = httpservice;            this.conn = conn;        }        @Override        public void run() {            System.out.println("New connection thread");            HttpContext context = new BasicHttpContext(null);            try {                while (!Thread.interrupted() && this.conn.isOpen()) {                    this.httpservice.handleRequest(this.conn, context);                }            } catch (ConnectionClosedException ex) {                System.err.println("Client closed connection");            } catch (IOException ex) {                System.err.println("I/O error: " + ex.getMessage());            } catch (HttpException ex) {                System.err.println("Unrecoverable HTTP protocol violation: " + ex.getMessage());            } finally {                try {                    this.conn.shutdown();                } catch (IOException ignore) {}            }        }    }}

4.异步的Http获取程序(基于非阻塞I/O模型:NIO框架) This example demonstrates how HttpCore NIO can be used to execute multiple HTTP requests asynchronously using only one I/O thread.
注释:I. I/O反应器

HttpCore NIO是基于由Doug Lea定义描述的反应器模式的。I/O反应器的目标是反应I/O事件然后分发事件通知到独立的I/O会话中。I/O反应器模式的主要思想是从每次连接模型摆脱一个由经典阻塞I/O模型强加的线程。IOReactor接口代表了反应器模式抽象的对象实现。在其内部,IOReactor实现封装了了NIO java.nio.channels.Selector的功能。

I/O反应器通常使用一小部分分发线程(通常是一个)来分发I/O事件通知到一个很大数量(通常是好几千)的I/O会话或连接。通常建议每个CPU核心只有一个分发线程。

II.I/O分发器

IOReactor实现使用了IOEventDispatch接口来通知事件客户端为特定的会话挂起。IOEventDispatch的所有方法在I/O反应器的分发线程上执行。因此,在事件方法上的处理不会阻塞分发线程很长时间,这一点是很重要的,而且I/O反应器也不可能对其它事件做出反应。

由IOEventDispatch接口定义的通用I/O事件:

connected:当一个新的会话创建时触发。inputReady:当新的会话等待输入时触发。outputReady:当会话准备输出时触发。timeout:当会话超时时触发。disconnected:当会话终止时触发。
/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation.  For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * */package org.apache.http.examples.nio;import java.io.IOException;import java.io.InterruptedIOException;import java.util.concurrent.CountDownLatch;import org.apache.http.HttpHost;import org.apache.http.HttpRequestInterceptor;import org.apache.http.HttpResponse;import org.apache.http.concurrent.FutureCallback;import org.apache.http.impl.DefaultConnectionReuseStrategy;import org.apache.http.impl.nio.DefaultHttpClientIODispatch;import org.apache.http.impl.nio.pool.BasicNIOConnPool;import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;import org.apache.http.impl.nio.reactor.IOReactorConfig;import org.apache.http.message.BasicHttpRequest;import org.apache.http.nio.protocol.BasicAsyncRequestProducer;import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;import org.apache.http.nio.protocol.HttpAsyncRequester;import org.apache.http.nio.reactor.ConnectingIOReactor;import org.apache.http.nio.reactor.IOEventDispatch;import org.apache.http.params.CoreConnectionPNames;import org.apache.http.params.CoreProtocolPNames;import org.apache.http.params.HttpParams;import org.apache.http.params.SyncBasicHttpParams;import org.apache.http.protocol.BasicHttpContext;import org.apache.http.protocol.HttpProcessor;import org.apache.http.protocol.ImmutableHttpProcessor;import org.apache.http.protocol.RequestConnControl;import org.apache.http.protocol.RequestContent;import org.apache.http.protocol.RequestExpectContinue;import org.apache.http.protocol.RequestTargetHost;import org.apache.http.protocol.RequestUserAgent;/** * Asynchronous HTTP/1.1 client. */public class NHttpClient {    public static void main(String[] args) throws Exception {        // HTTP parameters for the client        HttpParams params = new SyncBasicHttpParams();        params            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)            .setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");        // Create HTTP protocol processing chain        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {                // Use standard client-side protocol interceptors                new RequestContent(),                new RequestTargetHost(),                new RequestConnControl(),                new RequestUserAgent(),                new RequestExpectContinue()});        // Create client-side HTTP protocol handler        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();        // Create client-side I/O event dispatch        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);        // Create client-side I/O reactor        IOReactorConfig config = new IOReactorConfig();        config.setIoThreadCount(1);        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);        // Create HTTP connection pool        BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, params);        // Limit total number of connections to just two        pool.setDefaultMaxPerRoute(2);        pool.setMaxTotal(2);        // Run the I/O reactor in a separate thread        Thread t = new Thread(new Runnable() {            public void run() {                try {                    // Ready to go!                    ioReactor.execute(ioEventDispatch);                } catch (InterruptedIOException ex) {                    System.err.println("Interrupted");                } catch (IOException e) {                    System.err.println("I/O error: " + e.getMessage());                }                System.out.println("Shutdown");            }        });        // Start the client thread        t.start();        // Create HTTP requester        HttpAsyncRequester requester = new HttpAsyncRequester(                httpproc, new DefaultConnectionReuseStrategy(), params);        // Execute HTTP GETs to the following hosts and        HttpHost[] targets = new HttpHost[] {                new HttpHost("www.apache.org", 80, "http"),                new HttpHost("www.verisign.com", 443, "https"),                new HttpHost("www.google.com", 80, "http")        };        final CountDownLatch latch = new CountDownLatch(targets.length);        for (final HttpHost target: targets) {            BasicHttpRequest request = new BasicHttpRequest("GET", "/");            requester.execute(                    new BasicAsyncRequestProducer(target, request),                    new BasicAsyncResponseConsumer(),                    pool,                    new BasicHttpContext(),                    // Handle HTTP response from a callback                    new FutureCallback<HttpResponse>() {                public void completed(final HttpResponse response) {                    latch.countDown();                    System.out.println(target + "->" + response.getStatusLine());                }                public void failed(final Exception ex) {                    latch.countDown();                    System.out.println(target + "->" + ex);                }                public void cancelled() {                    latch.countDown();                    System.out.println(target + " cancelled");                }            });        }        latch.await();        System.out.println("Shutting down I/O reactor");        ioReactor.shutdown();        System.out.println("Done");    }}

5.异步的Http服务器程序(基于非阻塞I/O模型:NIO框架)

This example demonstrates the use of HttpCore NIO to build an asynchronous (non-blocking) HTTP server capable of direct channel (zero copy) data transfer.

/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation.  For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * */package org.apache.http.examples.nio;import java.io.File;import java.io.IOException;import java.io.InterruptedIOException;import java.net.InetSocketAddress;import java.net.URL;import java.net.URLDecoder;import java.security.KeyStore;import java.util.Locale;import javax.net.ssl.KeyManager;import javax.net.ssl.KeyManagerFactory;import javax.net.ssl.SSLContext;import org.apache.http.HttpException;import org.apache.http.HttpRequest;import org.apache.http.HttpResponse;import org.apache.http.HttpResponseInterceptor;import org.apache.http.HttpStatus;import org.apache.http.MethodNotSupportedException;import org.apache.http.entity.ContentType;import org.apache.http.impl.DefaultConnectionReuseStrategy;import org.apache.http.impl.nio.DefaultNHttpServerConnection;import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;import org.apache.http.impl.nio.DefaultHttpServerIODispatch;import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;import org.apache.http.nio.NHttpConnection;import org.apache.http.nio.NHttpConnectionFactory;import org.apache.http.nio.NHttpServerConnection;import org.apache.http.nio.entity.NFileEntity;import org.apache.http.nio.entity.NStringEntity;import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;import org.apache.http.nio.protocol.BasicAsyncResponseProducer;import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;import org.apache.http.nio.protocol.HttpAsyncRequestHandler;import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;import org.apache.http.nio.protocol.HttpAsyncExchange;import org.apache.http.nio.protocol.HttpAsyncService;import org.apache.http.nio.reactor.IOEventDispatch;import org.apache.http.nio.reactor.ListeningIOReactor;import org.apache.http.params.CoreConnectionPNames;import org.apache.http.params.CoreProtocolPNames;import org.apache.http.params.HttpParams;import org.apache.http.params.SyncBasicHttpParams;import org.apache.http.protocol.ExecutionContext;import org.apache.http.protocol.HttpContext;import org.apache.http.protocol.HttpProcessor;import org.apache.http.protocol.ImmutableHttpProcessor;import org.apache.http.protocol.ResponseConnControl;import org.apache.http.protocol.ResponseContent;import org.apache.http.protocol.ResponseDate;import org.apache.http.protocol.ResponseServer;/** * HTTP/1.1 file server based on the non-blocking I/O model and capable of direct channel * (zero copy) data transfer. */public class NHttpServer {    public static void main(String[] args) throws Exception {        if (args.length < 1) {            System.err.println("Please specify document root directory");            System.exit(1);        }        // Document root directory        File docRoot = new File(args[0]);        int port = 8080;        if (args.length >= 2) {            port = Integer.parseInt(args[1]);        }        // HTTP parameters for the server        HttpParams params = new SyncBasicHttpParams();        params            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");        // Create HTTP protocol processing chain        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {                // Use standard server-side protocol interceptors                new ResponseDate(),                new ResponseServer(),                new ResponseContent(),                new ResponseConnControl()        });        // Create request handler registry        HttpAsyncRequestHandlerRegistry reqistry = new HttpAsyncRequestHandlerRegistry();        // Register the default handler for all URIs        reqistry.register("*", new HttpFileHandler(docRoot));        // Create server-side HTTP protocol handler        HttpAsyncService protocolHandler = new HttpAsyncService(                httpproc, new DefaultConnectionReuseStrategy(), reqistry, params) {            @Override            public void connected(final NHttpServerConnection conn) {                System.out.println(conn + ": connection open");                super.connected(conn);            }            @Override            public void closed(final NHttpServerConnection conn) {                System.out.println(conn + ": connection closed");                super.closed(conn);            }        };        // Create HTTP connection factory        NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;        if (port == 8443) {            // Initialize SSL context            ClassLoader cl = NHttpServer.class.getClassLoader();            URL url = cl.getResource("my.keystore");            if (url == null) {                System.out.println("Keystore not found");                System.exit(1);            }            KeyStore keystore  = KeyStore.getInstance("jks");            keystore.load(url.openStream(), "secret".toCharArray());            KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(                    KeyManagerFactory.getDefaultAlgorithm());            kmfactory.init(keystore, "secret".toCharArray());            KeyManager[] keymanagers = kmfactory.getKeyManagers();            SSLContext sslcontext = SSLContext.getInstance("TLS");            sslcontext.init(keymanagers, null, null);            connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);        } else {            connFactory = new DefaultNHttpServerConnectionFactory(params);        }        // Create server-side I/O event dispatch        IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);        // Create server-side I/O reactor        ListeningIOReactor ioReactor = new DefaultListeningIOReactor();        try {            // Listen of the given port            ioReactor.listen(new InetSocketAddress(port));            // Ready to go!            ioReactor.execute(ioEventDispatch);        } catch (InterruptedIOException ex) {            System.err.println("Interrupted");        } catch (IOException e) {            System.err.println("I/O error: " + e.getMessage());        }        System.out.println("Shutdown");    }    static class HttpFileHandler implements HttpAsyncRequestHandler<HttpRequest> {        private final File docRoot;        public HttpFileHandler(final File docRoot) {            super();            this.docRoot = docRoot;        }        public HttpAsyncRequestConsumer<HttpRequest> processRequest(                final HttpRequest request,                final HttpContext context) {            // Buffer request content in memory for simplicity            return new BasicAsyncRequestConsumer();        }        public void handle(                final HttpRequest request,                final HttpAsyncExchange httpexchange,                final HttpContext context) throws HttpException, IOException {            HttpResponse response = httpexchange.getResponse();            handleInternal(request, response, context);            httpexchange.submitResponse(new BasicAsyncResponseProducer(response));        }        private void handleInternal(                final HttpRequest request,                final HttpResponse response,                final HttpContext context) throws HttpException, IOException {            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {                throw new MethodNotSupportedException(method + " method not supported");            }            String target = request.getRequestLine().getUri();            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));            if (!file.exists()) {                response.setStatusCode(HttpStatus.SC_NOT_FOUND);                NStringEntity entity = new NStringEntity(                        "<html><body><h1>File" + file.getPath() +                        " not found</h1></body></html>",                        ContentType.create("text/html", "UTF-8"));                response.setEntity(entity);                System.out.println("File " + file.getPath() + " not found");            } else if (!file.canRead() || file.isDirectory()) {                response.setStatusCode(HttpStatus.SC_FORBIDDEN);                NStringEntity entity = new NStringEntity(                        "<html><body><h1>Access denied</h1></body></html>",                        ContentType.create("text/html", "UTF-8"));                response.setEntity(entity);                System.out.println("Cannot read file " + file.getPath());            } else {                NHttpConnection conn = (NHttpConnection) context.getAttribute(                        ExecutionContext.HTTP_CONNECTION);                response.setStatusCode(HttpStatus.SC_OK);                NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));                response.setEntity(body);                System.out.println(conn + ": serving file " + file.getPath());            }        }    }}


6.异步的Http逆向代理程序(基于非阻塞I/O模型:NIO框架)

This example demonstrates how HttpCore NIO can be used to build an asynchronous, fully streaming reverse HTTP proxy.

/* * ==================================================================== * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation.  For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * */package org.apache.http.examples.nio;import java.io.IOException;import java.io.InterruptedIOException;import java.net.InetSocketAddress;import java.net.URI;import java.nio.ByteBuffer;import java.util.Locale;import java.util.concurrent.atomic.AtomicLong;import org.apache.http.ConnectionReuseStrategy;import org.apache.http.HttpEntityEnclosingRequest;import org.apache.http.HttpException;import org.apache.http.HttpHost;import org.apache.http.HttpRequest;import org.apache.http.HttpRequestInterceptor;import org.apache.http.HttpResponse;import org.apache.http.HttpResponseInterceptor;import org.apache.http.HttpStatus;import org.apache.http.HttpVersion;import org.apache.http.entity.ContentType;import org.apache.http.impl.DefaultConnectionReuseStrategy;import org.apache.http.impl.EnglishReasonPhraseCatalog;import org.apache.http.impl.nio.DefaultHttpClientIODispatch;import org.apache.http.impl.nio.DefaultHttpServerIODispatch;import org.apache.http.impl.nio.pool.BasicNIOConnPool;import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;import org.apache.http.impl.nio.reactor.IOReactorConfig;import org.apache.http.message.BasicHttpEntityEnclosingRequest;import org.apache.http.message.BasicHttpRequest;import org.apache.http.message.BasicHttpResponse;import org.apache.http.nio.ContentDecoder;import org.apache.http.nio.ContentEncoder;import org.apache.http.nio.IOControl;import org.apache.http.nio.NHttpClientConnection;import org.apache.http.nio.NHttpConnection;import org.apache.http.nio.NHttpServerConnection;import org.apache.http.nio.entity.NStringEntity;import org.apache.http.nio.pool.NIOConnFactory;import org.apache.http.nio.protocol.BasicAsyncResponseProducer;import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;import org.apache.http.nio.protocol.HttpAsyncRequester;import org.apache.http.nio.protocol.HttpAsyncRequestHandler;import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;import org.apache.http.nio.protocol.HttpAsyncRequestHandlerResolver;import org.apache.http.nio.protocol.HttpAsyncRequestProducer;import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;import org.apache.http.nio.protocol.HttpAsyncResponseProducer;import org.apache.http.nio.protocol.HttpAsyncExchange;import org.apache.http.nio.protocol.HttpAsyncService;import org.apache.http.nio.reactor.ConnectingIOReactor;import org.apache.http.nio.reactor.IOEventDispatch;import org.apache.http.nio.reactor.ListeningIOReactor;import org.apache.http.params.CoreConnectionPNames;import org.apache.http.params.CoreProtocolPNames;import org.apache.http.params.HttpParams;import org.apache.http.params.SyncBasicHttpParams;import org.apache.http.pool.PoolStats;import org.apache.http.protocol.ExecutionContext;import org.apache.http.protocol.HttpContext;import org.apache.http.protocol.HttpProcessor;import org.apache.http.protocol.ImmutableHttpProcessor;import org.apache.http.protocol.RequestConnControl;import org.apache.http.protocol.RequestContent;import org.apache.http.protocol.RequestExpectContinue;import org.apache.http.protocol.RequestTargetHost;import org.apache.http.protocol.RequestUserAgent;import org.apache.http.protocol.ResponseConnControl;import org.apache.http.protocol.ResponseContent;import org.apache.http.protocol.ResponseDate;import org.apache.http.protocol.ResponseServer;/** * Asynchronous, fully streaming HTTP/1.1 reverse proxy. */public class NHttpReverseProxy {    public static void main(String[] args) throws Exception {        if (args.length < 1) {            System.out.println("Usage: NHttpReverseProxy <hostname> [port]");            System.exit(1);        }        URI uri = new URI(args[0]);        int port = 8080;        if (args.length > 1) {            port = Integer.parseInt(args[1]);        }        // Target host        HttpHost targetHost = new HttpHost(                uri.getHost(),                uri.getPort() > 0 ? uri.getPort() : 80,                uri.getScheme() != null ? uri.getScheme() : "http");        System.out.println("Reverse proxy to " + targetHost);        HttpParams params = new SyncBasicHttpParams();        params            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 30000)            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "Test/1.1")            .setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");        IOReactorConfig config = new IOReactorConfig();        config.setIoThreadCount(1);        final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(config);        final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(config);        // Set up HTTP protocol processor for incoming connections        HttpProcessor inhttpproc = new ImmutableHttpProcessor(                new HttpResponseInterceptor[] {                        new ResponseDate(),                        new ResponseServer(),                        new ResponseContent(),                        new ResponseConnControl()         });        // Set up HTTP protocol processor for outgoing connections        HttpProcessor outhttpproc = new ImmutableHttpProcessor(                new HttpRequestInterceptor[] {                        new RequestContent(),                        new RequestTargetHost(),                        new RequestConnControl(),                        new RequestUserAgent(),                        new RequestExpectContinue()        });        ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler();        HttpAsyncRequester executor = new HttpAsyncRequester(                outhttpproc, new ProxyOutgoingConnectionReuseStrategy(), params);        ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, params);        connPool.setMaxTotal(100);        connPool.setDefaultMaxPerRoute(20);        HttpAsyncRequestHandlerRegistry handlerRegistry = new HttpAsyncRequestHandlerRegistry();        handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool));        ProxyServiceHandler serviceHandler = new ProxyServiceHandler(                inhttpproc, new ProxyIncomingConnectionReuseStrategy(), handlerRegistry, params);        final IOEventDispatch connectingEventDispatch = new DefaultHttpClientIODispatch(                clientHandler, params);        final IOEventDispatch listeningEventDispatch = new DefaultHttpServerIODispatch(                serviceHandler, params);        Thread t = new Thread(new Runnable() {            public void run() {                try {                    connectingIOReactor.execute(connectingEventDispatch);                } catch (InterruptedIOException ex) {                    System.err.println("Interrupted");                } catch (IOException ex) {                    ex.printStackTrace();                } finally {                    try {                        listeningIOReactor.shutdown();                    } catch (IOException ex2) {                        ex2.printStackTrace();                    }                }            }        });        t.start();        try {            listeningIOReactor.listen(new InetSocketAddress(port));            listeningIOReactor.execute(listeningEventDispatch);        } catch (InterruptedIOException ex) {            System.err.println("Interrupted");        } catch (IOException ex) {            ex.printStackTrace();        } finally {            try {                connectingIOReactor.shutdown();            } catch (IOException ex2) {                ex2.printStackTrace();            }        }    }    static class ProxyHttpExchange {        private final ByteBuffer inBuffer;        private final ByteBuffer outBuffer;        private volatile String id;        private volatile HttpHost target;        private volatile HttpAsyncExchange responseTrigger;        private volatile IOControl originIOControl;        private volatile IOControl clientIOControl;        private volatile HttpRequest request;        private volatile boolean requestReceived;        private volatile HttpResponse response;        private volatile boolean responseReceived;        private volatile Exception ex;        public ProxyHttpExchange() {            super();            this.inBuffer = ByteBuffer.allocateDirect(10240);            this.outBuffer = ByteBuffer.allocateDirect(10240);        }        public ByteBuffer getInBuffer() {            return this.inBuffer;        }        public ByteBuffer getOutBuffer() {            return this.outBuffer;        }        public String getId() {            return this.id;        }        public void setId(final String id) {            this.id = id;        }        public HttpHost getTarget() {            return this.target;        }        public void setTarget(final HttpHost target) {            this.target = target;        }        public HttpRequest getRequest() {            return this.request;        }        public void setRequest(final HttpRequest request) {            this.request = request;        }        public HttpResponse getResponse() {            return this.response;        }        public void setResponse(final HttpResponse response) {            this.response = response;        }        public HttpAsyncExchange getResponseTrigger() {            return this.responseTrigger;        }        public void setResponseTrigger(final HttpAsyncExchange responseTrigger) {            this.responseTrigger = responseTrigger;        }        public IOControl getClientIOControl() {            return this.clientIOControl;        }        public void setClientIOControl(final IOControl clientIOControl) {            this.clientIOControl = clientIOControl;        }        public IOControl getOriginIOControl() {            return this.originIOControl;        }        public void setOriginIOControl(final IOControl originIOControl) {            this.originIOControl = originIOControl;        }        public boolean isRequestReceived() {            return this.requestReceived;        }        public void setRequestReceived() {            this.requestReceived = true;        }        public boolean isResponseReceived() {            return this.responseReceived;        }        public void setResponseReceived() {            this.responseReceived = true;        }        public Exception getException() {            return this.ex;        }        public void setException(final Exception ex) {            this.ex = ex;        }        public void reset() {            this.inBuffer.clear();            this.outBuffer.clear();            this.target = null;            this.id = null;            this.responseTrigger = null;            this.clientIOControl = null;            this.originIOControl = null;            this.request = null;            this.requestReceived = false;            this.response = null;            this.responseReceived = false;            this.ex = null;        }    }    static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> {        private final HttpHost target;        private final HttpAsyncRequester executor;        private final BasicNIOConnPool connPool;        private final AtomicLong counter;        public ProxyRequestHandler(                final HttpHost target,                final HttpAsyncRequester executor,                final BasicNIOConnPool connPool) {            super();            this.target = target;            this.executor = executor;            this.connPool = connPool;            this.counter = new AtomicLong(1);        }        public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest(                final HttpRequest request,                final HttpContext context) {            ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange");            if (httpExchange == null) {                httpExchange = new ProxyHttpExchange();                context.setAttribute("http-exchange", httpExchange);            }            synchronized (httpExchange) {                httpExchange.reset();                String id = String.format("%08X", this.counter.getAndIncrement());                httpExchange.setId(id);                httpExchange.setTarget(this.target);                return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool);            }        }        public void handle(                final ProxyHttpExchange httpExchange,                final HttpAsyncExchange responseTrigger,                final HttpContext context) throws HttpException, IOException {            synchronized (httpExchange) {                Exception ex = httpExchange.getException();                if (ex != null) {                    System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex);                    int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;                    HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,                            EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));                    String message = ex.getMessage();                    if (message == null) {                        message = "Unexpected error";                    }                    response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));                    responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));                    System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered");                }                HttpResponse response = httpExchange.getResponse();                if (response != null) {                    responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange));                    System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered");                }                // No response yet.                httpExchange.setResponseTrigger(responseTrigger);            }        }    }    static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> {        private final ProxyHttpExchange httpExchange;        private final HttpAsyncRequester executor;        private final BasicNIOConnPool connPool;        private volatile boolean completed;        public ProxyRequestConsumer(                final ProxyHttpExchange httpExchange,                final HttpAsyncRequester executor,                final BasicNIOConnPool connPool) {            super();            this.httpExchange = httpExchange;            this.executor = executor;            this.connPool = connPool;        }        public void close() throws IOException {        }        public void requestReceived(final HttpRequest request) {            synchronized (this.httpExchange) {                System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine());                this.httpExchange.setRequest(request);                this.executor.execute(                        new ProxyRequestProducer(this.httpExchange),                        new ProxyResponseConsumer(this.httpExchange),                        this.connPool);            }        }        public void consumeContent(                final ContentDecoder decoder, final IOControl ioctrl) throws IOException {            synchronized (this.httpExchange) {                this.httpExchange.setClientIOControl(ioctrl);                // Receive data from the client                ByteBuffer buf = this.httpExchange.getInBuffer();                int n = decoder.read(buf);                System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read");                if (decoder.isCompleted()) {                    System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read");                }                // If the buffer is full, suspend client input until there is free                // space in the buffer                if (!buf.hasRemaining()) {                    ioctrl.suspendInput();                    System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input");                }                // If there is some content in the input buffer make sure origin                // output is active                if (buf.position() > 0) {                    if (this.httpExchange.getOriginIOControl() != null) {                        this.httpExchange.getOriginIOControl().requestOutput();                        System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");                    }                }            }        }        public void requestCompleted(final HttpContext context) {            synchronized (this.httpExchange) {                this.completed = true;;                System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed");                this.httpExchange.setRequestReceived();                if (this.httpExchange.getOriginIOControl() != null) {                    this.httpExchange.getOriginIOControl().requestOutput();                    System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");                }            }        }        public Exception getException() {            return null;        }        public ProxyHttpExchange getResult() {            return this.httpExchange;        }        public boolean isDone() {            return this.completed;        }        public void failed(final Exception ex) {            System.out.println("[client->proxy] " + ex.toString());        }    }    static class ProxyRequestProducer implements HttpAsyncRequestProducer {        private final ProxyHttpExchange httpExchange;        public ProxyRequestProducer(final ProxyHttpExchange httpExchange) {            super();            this.httpExchange = httpExchange;        }        public void close() throws IOException {        }        public HttpHost getTarget() {            synchronized (this.httpExchange) {                return this.httpExchange.getTarget();            }        }        public HttpRequest generateRequest() {            synchronized (this.httpExchange) {                HttpRequest request = this.httpExchange.getRequest();                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine());                // Rewrite request!!!!                if (request instanceof HttpEntityEnclosingRequest) {                    BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest(                            request.getRequestLine());                    r.setEntity(((HttpEntityEnclosingRequest) request).getEntity());                    return r;                } else {                    return new BasicHttpRequest(request.getRequestLine());                }            }        }        public void produceContent(                final ContentEncoder encoder, final IOControl ioctrl) throws IOException {            synchronized (this.httpExchange) {                this.httpExchange.setOriginIOControl(ioctrl);                // Send data to the origin server                ByteBuffer buf = this.httpExchange.getInBuffer();                buf.flip();                int n = encoder.write(buf);                buf.compact();                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written");                // If there is space in the buffer and the message has not been                // transferred, make sure the client is sending more data                if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) {                    if (this.httpExchange.getClientIOControl() != null) {                        this.httpExchange.getClientIOControl().requestInput();                        System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request client input");                    }                }                if (buf.position() == 0) {                    if (this.httpExchange.isRequestReceived()) {                        encoder.complete();                        System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written");                    } else {                        // Input buffer is empty. Wait until the client fills up                        // the buffer                        ioctrl.suspendOutput();                        System.out.println("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output");                    }                }            }        }        public void requestCompleted(final HttpContext context) {            synchronized (this.httpExchange) {                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request completed");            }        }        public boolean isRepeatable() {            return false;        }        public void resetRequest() {        }        public void failed(final Exception ex) {            System.out.println("[proxy->origin] " + ex.toString());        }    }    static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> {        private final ProxyHttpExchange httpExchange;        private volatile boolean completed;        public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) {            super();            this.httpExchange = httpExchange;        }        public void close() throws IOException {        }        public void responseReceived(final HttpResponse response) {            synchronized (this.httpExchange) {                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine());                this.httpExchange.setResponse(response);                HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();                if (responseTrigger != null && !responseTrigger.isCompleted()) {                    System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered");                    responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange));                }            }        }        public void consumeContent(                final ContentDecoder decoder, final IOControl ioctrl) throws IOException {            synchronized (this.httpExchange) {                this.httpExchange.setOriginIOControl(ioctrl);                // Receive data from the origin                ByteBuffer buf = this.httpExchange.getOutBuffer();                int n = decoder.read(buf);                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read");                if (decoder.isCompleted()) {                    System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read");                }                // If the buffer is full, suspend origin input until there is free                // space in the buffer                if (!buf.hasRemaining()) {                    ioctrl.suspendInput();                    System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input");                }                // If there is some content in the input buffer make sure client                // output is active                if (buf.position() > 0) {                    if (this.httpExchange.getClientIOControl() != null) {                        this.httpExchange.getClientIOControl().requestOutput();                        System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");                    }                }            }        }        public void responseCompleted(final HttpContext context) {            synchronized (this.httpExchange) {                if (this.completed) {                    return;                }                this.completed = true;                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed");                this.httpExchange.setResponseReceived();                if (this.httpExchange.getClientIOControl() != null) {                    this.httpExchange.getClientIOControl().requestOutput();                    System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");                }            }        }        public void failed(final Exception ex) {            synchronized (this.httpExchange) {                if (this.completed) {                    return;                }                this.completed = true;                this.httpExchange.setException(ex);                HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();                if (responseTrigger != null && !responseTrigger.isCompleted()) {                    System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex);                    int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;                    HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,                            EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));                    String message = ex.getMessage();                    if (message == null) {                        message = "Unexpected error";                    }                    response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));                    responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));                }            }        }        public boolean cancel() {            synchronized (this.httpExchange) {                if (this.completed) {                    return false;                }                failed(new InterruptedIOException("Cancelled"));                return true;            }        }        public ProxyHttpExchange getResult() {            return this.httpExchange;        }        public Exception getException() {            return null;        }        public boolean isDone() {            return this.completed;        }    }    static class ProxyResponseProducer implements HttpAsyncResponseProducer {        private final ProxyHttpExchange httpExchange;        public ProxyResponseProducer(final ProxyHttpExchange httpExchange) {            super();            this.httpExchange = httpExchange;        }        public void close() throws IOException {            this.httpExchange.reset();        }        public HttpResponse generateResponse() {            synchronized (this.httpExchange) {                HttpResponse response = this.httpExchange.getResponse();                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine());                // Rewrite response!!!!                BasicHttpResponse r = new BasicHttpResponse(response.getStatusLine());                r.setEntity(response.getEntity());                return r;            }        }        public void produceContent(                final ContentEncoder encoder, final IOControl ioctrl) throws IOException {            synchronized (this.httpExchange) {                this.httpExchange.setClientIOControl(ioctrl);                // Send data to the client                ByteBuffer buf = this.httpExchange.getOutBuffer();                buf.flip();                int n = encoder.write(buf);                buf.compact();                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written");                // If there is space in the buffer and the message has not been                // transferred, make sure the origin is sending more data                if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) {                    if (this.httpExchange.getOriginIOControl() != null) {                        this.httpExchange.getOriginIOControl().requestInput();                        System.out.println("[client<-proxy] " + this.httpExchange.getId() + " request origin input");                    }                }                if (buf.position() == 0) {                    if (this.httpExchange.isResponseReceived()) {                        encoder.complete();                        System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written");                    } else {                        // Input buffer is empty. Wait until the origin fills up                        // the buffer                        ioctrl.suspendOutput();                        System.out.println("[client<-proxy] " + this.httpExchange.getId() + " suspend client output");                    }                }            }        }        public void responseCompleted(final HttpContext context) {            synchronized (this.httpExchange) {                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response completed");            }        }        public void failed(final Exception ex) {            System.out.println("[client<-proxy] " + ex.toString());        }    }    static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {        @Override        public boolean keepAlive(final HttpResponse response, final HttpContext context) {            NHttpConnection conn = (NHttpConnection) context.getAttribute(                    ExecutionContext.HTTP_CONNECTION);            boolean keepAlive = super.keepAlive(response, context);            if (keepAlive) {                System.out.println("[client->proxy] connection kept alive " + conn);            }            return keepAlive;        }    };    static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {        @Override        public boolean keepAlive(final HttpResponse response, final HttpContext context) {            NHttpConnection conn = (NHttpConnection) context.getAttribute(                    ExecutionContext.HTTP_CONNECTION);            boolean keepAlive = super.keepAlive(response, context);            if (keepAlive) {                System.out.println("[proxy->origin] connection kept alive " + conn);            }            return keepAlive;        }    };    static class ProxyServiceHandler extends HttpAsyncService {        public ProxyServiceHandler(                final HttpProcessor httpProcessor,                final ConnectionReuseStrategy reuseStrategy,                final HttpAsyncRequestHandlerResolver handlerResolver,                final HttpParams params) {            super(httpProcessor, reuseStrategy, handlerResolver, params);        }        @Override        protected void log(final Exception ex) {            ex.printStackTrace();        }        @Override        public void connected(final NHttpServerConnection conn) {            System.out.println("[client->proxy] connection open " + conn);            super.connected(conn);        }        @Override        public void closed(final NHttpServerConnection conn) {            System.out.println("[client->proxy] connection closed " + conn);            super.closed(conn);        }    }    static class ProxyClientProtocolHandler extends HttpAsyncRequestExecutor {        public ProxyClientProtocolHandler() {            super();        }        @Override        protected void log(final Exception ex) {            ex.printStackTrace();        }        @Override        public void connected(final NHttpClientConnection conn,                final Object attachment) throws IOException, HttpException {            System.out.println("[proxy->origin] connection open " + conn);            super.connected(conn, attachment);        }        @Override        public void closed(final NHttpClientConnection conn) {            System.out.println("[proxy->origin] connection closed " + conn);            super.closed(conn);        }    }    static class ProxyConnPool extends BasicNIOConnPool {        public ProxyConnPool(final ConnectingIOReactor ioreactor, final HttpParams params) {            super(ioreactor, params);        }        public ProxyConnPool(                final ConnectingIOReactor ioreactor,                final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,                final HttpParams params) {            super(ioreactor, connFactory, params);        }        @Override        public void release(final BasicNIOPoolEntry entry, boolean reusable) {            System.out.println("[proxy->origin] connection released " + entry.getConnection());            super.release(entry, reusable);            StringBuilder buf = new StringBuilder();            PoolStats totals = getTotalStats();            buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");            buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());            buf.append(" of ").append(totals.getMax()).append("]");            System.out.println("[proxy->origin] " + buf.toString());        }    }}

本篇博文的版权信息如下:
HttpCore组件案例程序(Java描述) (Http Components- HttpCore Examples)

Tag:本文所有源代码来源均取自The Apache? Software Foundation Organization (http://hc.apache.org/httpcomponents-core-ga/examples.html)
2楼dxxang昨天 09:24
学习了
1楼zhengzhihust昨天 08:34
每日一博!

读书人网 >编程

热点推荐