File transfer over an asynchronous TCP connection via Boost.Asio

// Another sample that sends a file name and the file's content to a TCP server, this time over an asynchronous TCP connection.

//
// send a file to a tcp server via boost.asio library
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
using boost::asio::ip::tcp;
class async_tcp_client
{
public:
    async_tcp_client(boost::asio::io_service& io_service,
        const std::string& server, const std::string& path)
        : resolver_(io_service),
        socket_(io_service)
    {
        size_t pos = server.find(‘:’);
        if (pos==std::string::npos)
            return;
        std::string port_string = server.substr(pos+1);
        std::string server_ip_or_host = server.substr(0, pos);

        source_file.open(path.c_str(), std::ios_base::binary | std::ios_base::ate);
        if (!source_file)
        {
            std::cout << "failed to open " << path << std::endl;
            return ;
        }
        size_t file_size = source_file.tellg();
        source_file.seekg(0);
        // first send file name and file size to server
        std::ostream request_stream(&request_);
        request_stream << path << "\n"
            << file_size << "\n\n";
        std::cout << "request size:"<<request_.size()<<std::endl;
        // Start an asynchronous resolve to translate the server and service names
        // into a list of endpoints.
        tcp::resolver::query query(server_ip_or_host, port_string);
        resolver_.async_resolve(query,
            boost::bind(&async_tcp_client::handle_resolve, this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::iterator));
    }

private:
    void handle_resolve(const boost::system::error_code& err,
        tcp::resolver::iterator endpoint_iterator)
    {
        if (!err)
        {
            // Attempt a connection to the first endpoint in the list. Each endpoint
            // will be tried until we successfully establish a connection.
            tcp::endpoint endpoint = *endpoint_iterator;
            socket_.async_connect(endpoint,
                boost::bind(&async_tcp_client::handle_connect, this,
                boost::asio::placeholders::error, ++endpoint_iterator));
        }
        else
        {
            std::cout << "Error: " << err.message() << "\n";
        }
    }

    void handle_connect(const boost::system::error_code& err,
        tcp::resolver::iterator endpoint_iterator)
    {
        if (!err)
        {
            // The connection was successful. Send the request.
            boost::asio::async_write(socket_, request_,
                boost::bind(&async_tcp_client::handle_write_file, this,
                boost::asio::placeholders::error));
        }
        else if (endpoint_iterator != tcp::resolver::iterator())
        {
            // The connection failed. Try the next endpoint in the list.
            socket_.close();
            tcp::endpoint endpoint = *endpoint_iterator;
            socket_.async_connect(endpoint,
                boost::bind(&async_tcp_client::handle_connect, this,
                boost::asio::placeholders::error, ++endpoint_iterator));
        }
        else
        {
            std::cout << "Error: " << err.message() << "\n";
        }
    }

    void handle_write_file(const boost::system::error_code& err)
    {
        if (!err)
        {
                if (source_file.eof()==false)
                {
                    source_file.read(buf.c_array(), (std::streamsize)buf.size());
                    if (source_file.gcount()<=0)
                    {
                        std::cout << "read file error " << std::endl;
                        return;
                    }
                    std::cout << "send " <<source_file.gcount()<<" bytes, total:" << source_file.tellg() << " bytes.\n";
                    boost::asio::async_write(socket_,
                        boost::asio::buffer(buf.c_array(), source_file.gcount()),
                        boost::bind(&async_tcp_client::handle_write_file, this,
                        boost::asio::placeholders::error));
                    if (err)
                    {
                        std::cout << "send error:" << err << std::endl;
                        return;
                    }
                }
                else
                    return;
        }
        else
        {
            std::cout << "Error: " << err.message() << "\n";
        }

    }
    tcp::resolver resolver_;
    tcp::socket socket_;
    boost::array<char, 1024> buf;
    boost::asio::streambuf request_;
    std::ifstream source_file;
};
int main(int argc, char* argv[])
{
    if (argc != 3)
    {
        std::cerr << "Usage: " << argv[0] << " <server-address> <file path>" << std::endl;
        std::cerr << "sample: " << argv[0] << " 127.0.0.1:1234 c:\\tmp\\a.txt" << std::endl;
        return __LINE__;
    }
    try
    {
        boost::asio::io_service io_service;
        async_tcp_client client(io_service, argv[1], argv[2]);
        io_service.run();

        std::cout << "send file " << argv[2] << " completed successfully.\n";
//         system("pause");
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

// I also wrote an asynchronous TCP server that works with the client above.

//receive a file from socket client via boost.asio
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <fstream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
// Port the server listens on; main() replaces this default with the value
// given on the command line when exactly one argument is passed.
unsigned short tcp_port = 1234;
class async_tcp_connection: public boost::enable_shared_from_this<async_tcp_connection>
{
public:
    async_tcp_connection(boost::asio::io_service& io_service)
        : socket_(io_service), file_size(0)
    {
    }
    void start()
    {
        std::cout << __FUNCTION__  << std::endl;
        async_read_until(socket_,
            request_buf, "\n\n",
            boost::bind(&async_tcp_connection::handle_read_request,
            shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    }
    boost::asio::ip::tcp::socket& socket() { return socket_; }
private:
    boost::asio::streambuf request_buf;
    size_t file_size;
    std::ofstream output_file;
    boost::asio::ip::tcp::socket socket_;
    boost::array<char, 40960> buf;
    void handle_read_request(const boost::system::error_code& err, std::size_t bytes_transferred)
    {
        if (err)
        {
            return handle_error(__FUNCTION__, err);
        }
        std::cout << __FUNCTION__ << "(" << bytes_transferred << ")"
            << ", in_avail=" << request_buf.in_avail()
            << ", size=" << request_buf.size()
            << ", max_size=" << request_buf.max_size() <<".\n";
        std::istream request_stream(&request_buf);
        std::string file_path;           
        request_stream >> file_path;
        request_stream >> file_size;
        request_stream.read(buf.c_array(), 2); // eat the "\n\n"
        std::cout << file_path << " size is " << file_size << ", tellg=" << request_stream.tellg()<< std::endl;
        size_t pos = file_path.find_last_of(‘\\’);
        if (pos!=std::string::npos)
            file_path = file_path.substr(pos+1);
        output_file.open(file_path.c_str(), std::ios_base::binary);
        if (!output_file)
        {
            std::cout << "failed to open " << file_path << std::endl;
            return;
        }
        // write extra bytes to file
        do
        {
            request_stream.read(buf.c_array(), (std::streamsize)buf.size());
            std::cout << __FUNCTION__ << " write " << request_stream.gcount() << " bytes.\n";
            output_file.write(buf.c_array(), request_stream.gcount());
        } while (request_stream.gcount()>0);
        async_read(socket_, boost::asio::buffer(buf.c_array(), buf.size()),
            boost::bind(&async_tcp_connection::handle_read_file_content,
            shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));          
    }

    void handle_read_file_content(const boost::system::error_code& err, std::size_t bytes_transferred)
    {
        if (bytes_transferred>0)
        {
            output_file.write(buf.c_array(), (std::streamsize)bytes_transferred);
            std::cout << __FUNCTION__ << " recv " << output_file.tellp() << " bytes."<< std::endl;
            if (output_file.tellp()>=(std::streamsize)file_size)
            {
                return;
            }
        }
        if (err)
        {
            return handle_error(__FUNCTION__, err);
        }
        async_read(socket_, boost::asio::buffer(buf.c_array(), buf.size()),
            boost::bind(&async_tcp_connection::handle_read_file_content,
            shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    }
    void handle_error(const std::string& function_name, const boost::system::error_code& err)
    {
        std::cout << __FUNCTION__ << " in " << function_name <<" due to " << err <<" " << err.message()<< std::endl;
    }
};

class async_tcp_server : private boost::noncopyable
{
public:
    typedef boost::shared_ptr<async_tcp_connection> ptr_async_tcp_connection;

    async_tcp_server(unsigned short port)
        : acceptor_(io_service_, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port), true)
    {
            ptr_async_tcp_connection new_connection_(new async_tcp_connection(io_service_));
            acceptor_.async_accept(new_connection_->socket(),
            boost::bind(&async_tcp_server::handle_accept, this,new_connection_,
            boost::asio::placeholders::error));
            io_service_.run();
   }   
    void handle_accept(ptr_async_tcp_connection current_connection, const boost::system::error_code& e)
    {
        std::cout << __FUNCTION__ << " " << e << ", " << e.message()<<std::endl;
        if (!e)
        {
            current_connection->start();
            //ptr_async_tcp_connection new_connection_(new async_tcp_connection(io_service_));
            //acceptor_.async_accept(new_connection_->socket(),
            //    boost::bind(&async_tcp_server::handle_accept, this,new_connection_,
            //    boost::asio::placeholders::error));
        }
    }
    ~async_tcp_server()
    {
        io_service_.stop();
    }
private:
    boost::asio::io_service io_service_;
    boost::asio::ip::tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
    try
    {
        if (argc==2)
        {
            tcp_port=atoi(argv[1]);
        }
        std::cout <<argv[0] << " listen on port " << tcp_port << std::endl;
        async_tcp_server *recv_file_tcp_server = new async_tcp_server(tcp_port);
        delete recv_file_tcp_server;
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

About these ads
This entry was posted in Open Source. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s