读书人

异步读取数据 怎么添加线程

发布时间: 2012-08-03 00:12:14 作者: rapoo

异步读取数据, 如何添加线程

C/C++ code
往哪里添加线程啊???对于异步处理.代码是服务端,接受数据并显示。handle_read是读取数据函数,如果数据发送过多,  假设有多个客户端发送数据过来是否需要用线程来处理???线程 貌似加不上, 谁个演示,加上个线程,来处理数据。 class clientSession    :public boost::enable_shared_from_this<clientSession>{public:    clientSession(boost::asio::io_service& ioservice)        :m_socket(ioservice)    {        memset(data_,'\0',sizeof(data_));    }    ~clientSession()    {}    tcp::socket& socket()    {        return m_socket;    }    void start()    {        m_socket.async_read_some(boost::asio::buffer(data_,max_len),                boost::bind(&clientSession::handle_read,shared_from_this(),                boost::asio::placeholders::error));    }private:        void handle_read(const boost::system::error_code& error)    {                if(!error)        {            m_socket.async_read_some(boost::asio::buffer(data_,max_len),                boost::bind(&clientSession::handle_read,shared_from_this(),                boost::asio::placeholders::error));             std::cout << data_ << std::endl;        }        else        {            m_socket.close();        }        }private:    tcp::socket m_socket;    char data_[max_len];};class serverApp{    typedef boost::shared_ptr<clientSession> session_ptr;public:    serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint)        :m_ioservice(ioservice),        acceptor_(ioservice,endpoint)    {        session_ptr new_session(new clientSession(ioservice));        acceptor_.async_accept(new_session->socket(),            boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,            new_session));    } private:    void handle_accept(const boost::system::error_code& error,session_ptr& session)    {        if(!error)        {            std::cout << "get a new client!" << std::endl;            //实现对每个客户端的数据处理            session->start();            //在这就应该看出为什么要封session类了吧,每一个session就是一个客户端            session_ptr new_session(new clientSession(m_ioservice));            acceptor_.async_accept(new_session->socket(),            boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error,            new_session));        }    }private:    boost::asio::io_service& m_ioservice;    tcp::acceptor acceptor_;};


[解决办法]
epoll单线程并发读写,如有阻塞操作则创建线程池做阻塞操作,epoll线程将阻塞任务派发到线程池做处理, 线程池处理完后送回epoll线程,触发session状态变更调用状态机继续处理。

异步化需要用到epoll+pipe+queue的编程设计, 不熟悉只能读点开源代码积累一下这么方面的见识了。
[解决办法]
探讨
不好意思啊, 问题没有描述清楚额, 多怪我了


需求: 10个客户端发送数据给 服务端, 服务单接受数据,然后保存到磁盘(以不同的文件名保存)

不用其他模型,就用 boost实现。 能否在我的主帖的代码上提供 思路。 谢谢大家了啊。




平台:windows,

感谢楼上那位搞linux的,呵呵

[解决办法]
参考boost提供的例子,很有帮助的:
http://www.boost.org/doc/libs/1_49_0/doc/html/boost_asio/examples.html#boost_asio.examples.chat
[解决办法]
运行boost::asio::io_service的run函数的线程就是工作线程
C/C++ code
        std::vector<boost::shared_ptr<boost::thread> > threads;                for (std::size_t i = 0; i < io_thread_pool_size_; ++i)        {            boost::shared_ptr<boost::thread> thread(new boost::thread(                boost::bind(&boost::asio::io_service::run, &io_service_)));            threads.push_back(thread);        }
------解决方案--------------------


C/C++ code
#include <boost/asio.hpp>#include <boost/smart_ptr/enable_shared_from_this.hpp>#include <boost/bind.hpp>#include <boost/thread/thread.hpp>#include <string>#include <vector>using boost::enable_shared_from_this;using boost::shared_ptr;using namespace boost::asio;using boost::asio::ip::tcp;using std::string;typedef shared_ptr<tcp::socket> sock_pt;#define max_len      200#define client_count 10#define THREAD_NUM   1class ClientSession : public enable_shared_from_this<ClientSession>{public:    ClientSession(io_service& ioservice, sock_pt sock)        :m_socket(sock), m_pos(0)    {        memset(m_data, '\0', sizeof(m_data));    }    sock_pt socket()    {        return m_socket;    }    void start(int nPos)    {        m_pos = 0;        m_socket->async_read_some(buffer(m_data, max_len),            boost::bind(&ClientSession::handle_read, shared_from_this(), placeholders::error));    }private:    //fileName是文件名    void handle_read(const boost::system::error_code& error)    {        if (! error)        {            m_socket->async_read_some(buffer(m_data, max_len),                 boost::bind(&ClientSession::handle_read, shared_from_this(), placeholders::error));            //存盘            printf("Client(%d) recv msg:%s\n", m_pos, m_data);        }        else        {            m_socket->close();        }    }private:    sock_pt m_socket;    char m_data[max_len];    int m_pos;};class serverApp{    typedef boost::shared_ptr<ClientSession> session_ptr;public:    serverApp(io_service& ioservice,tcp::endpoint& endpoint)        :m_ioservice(ioservice),        m_acceptor(ioservice, endpoint)    {        for(int i= 0 ;i < client_count; i++)        {            sock_pt sock(new tcp::socket(ioservice));            m_acceptor.async_accept(*sock, boost::bind(&serverApp::handle_accept, this, placeholders::error, sock, i));        }    }private:    void handle_accept(const boost::system::error_code& error, sock_pt sock, int nPos)    {        if (! error)        {            session_ptr new_session(new ClientSession(m_ioservice, sock));            new_session->start(nPos);        }    }private:    io_service& m_ioservice;    tcp::acceptor m_acceptor;};int _tmain(int argc, _TCHAR* argv[]){    boost::asio::io_service ios;    serverApp serv(ios, tcp::endpoint(tcp::v4(), 6688));    boost::thread_group tg;    for (int i = 0; i < THREAD_NUM; i++)        tg.add_thread(new boost::thread(boost::bind(&io_service::run, &ios)));    tg.join_all();    return 0;} 

读书人网 >C++

热点推荐