Home | History | Annotate | Download | only in remote-processor
      1 /*
      2  * Copyright (c) 2011-2015, Intel Corporation
      3  * All rights reserved.
      4  *
      5  * Redistribution and use in source and binary forms, with or without modification,
      6  * are permitted provided that the following conditions are met:
      7  *
      8  * 1. Redistributions of source code must retain the above copyright notice, this
      9  * list of conditions and the following disclaimer.
     10  *
     11  * 2. Redistributions in binary form must reproduce the above copyright notice,
     12  * this list of conditions and the following disclaimer in the documentation and/or
     13  * other materials provided with the distribution.
     14  *
     15  * 3. Neither the name of the copyright holder nor the names of its contributors
     16  * may be used to endorse or promote products derived from this software without
     17  * specific prior written permission.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
     20  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     21  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
     22  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
     23  * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
     24  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
     25  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
     26  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     28  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 #include "RemoteProcessorServer.h"
     31 #include <iostream>
     32 #include <memory>
     33 #include <assert.h>
     34 #include <string.h>
     35 #include <unistd.h>
     36 #include "RequestMessage.h"
     37 #include "AnswerMessage.h"
     38 #include "RemoteCommandHandler.h"
     39 #include "Socket.h"
     40 #include "convert.hpp"
     41 
     42 using std::string;
     43 
     44 CRemoteProcessorServer::CRemoteProcessorServer(std::string bindAddress)
     45     : _bindAddress(bindAddress), _io_service(), _acceptor(_io_service), _socket(_io_service)
     46 {
     47 }
     48 
     49 CRemoteProcessorServer::~CRemoteProcessorServer()
     50 {
     51     stop();
     52 }
     53 
     54 // State
     55 bool CRemoteProcessorServer::start(string &error)
     56 {
     57     using namespace asio;
     58 
     59     try {
     60         generic::stream_protocol::endpoint endpoint;
     61         uint16_t port;
     62         std::string endpointName;
     63         bool isInet;
     64 
     65         // For backward compatibility, tcp port referred by its value only
     66         if (convertTo(_bindAddress, port)) {
     67             isInet = true;
     68         } else {
     69             // required form is <protocol>://<host:port|port_name>
     70             const std::string tcpProtocol{"tcp"};
     71             const std::string unixProtocol{"unix"};
     72             const std::vector<std::string> supportedProtocols{ tcpProtocol, unixProtocol };
     73             const std::string protocolDel{"://"};
     74 
     75             size_t protocolDelPos = _bindAddress.find(protocolDel);
     76             if (protocolDelPos == std::string::npos) {
     77                 error = "bindaddress " + _bindAddress + " ill formed, missing " + protocolDel;
     78                 return false;
     79             }
     80             std::string protocol = _bindAddress.substr(0, protocolDelPos);
     81 
     82             if (std::find(begin(supportedProtocols), end(supportedProtocols), protocol) ==
     83                     end(supportedProtocols)) {
     84                 error = "bindaddress " + _bindAddress + " has invalid protocol " + protocol;
     85                 return false;
     86             }
     87             isInet = (_bindAddress.find(tcpProtocol) != std::string::npos);
     88             if (isInet) {
     89                 size_t portDelPos = _bindAddress.find(':', protocolDelPos + protocolDel.size());
     90                 if (portDelPos == std::string::npos) {
     91                     error = "bindaddress " + _bindAddress + " ill formed, missing " + ":";
     92                     return false;
     93                 }
     94                 std::string portLiteral{_bindAddress.substr(portDelPos + 1)};
     95                 if (!convertTo(portLiteral, port)) {
     96                     error = "bindaddress " + _bindAddress + " port " + portLiteral + " ill formed";
     97                     return false;
     98                 }
     99             } else {
    100                 endpointName = _bindAddress.substr(protocolDelPos + protocolDel.size());
    101             }
    102         }
    103 
    104         if (isInet) {
    105             endpoint = ip::tcp::endpoint(ip::tcp::v6(), port);
    106         } else {
    107             endpoint = local::stream_protocol::endpoint(endpointName);
    108         }
    109 
    110         _acceptor.open(endpoint.protocol());
    111 
    112         if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
    113             _acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
    114         } else if (endpoint.protocol().protocol() == AF_UNSPEC) {
    115             // In case of reuse, remote it first
    116             unlink(endpointName.c_str());
    117         }
    118         _acceptor.set_option(socket_base::linger(true, 0));
    119         _acceptor.set_option(socket_base::enable_connection_aborted(true));
    120 
    121         _acceptor.bind(endpoint);
    122         _acceptor.listen();
    123     } catch (std::exception &e) {
    124         error = "Unable to listen on " + _bindAddress + ": " + e.what();
    125         return false;
    126     }
    127 
    128     return true;
    129 }
    130 
    131 bool CRemoteProcessorServer::stop()
    132 {
    133     _io_service.stop();
    134 
    135     return true;
    136 }
    137 
    138 void CRemoteProcessorServer::acceptRegister(IRemoteCommandHandler &commandHandler)
    139 {
    140     auto peerHandler = [this, &commandHandler](asio::error_code ec) {
    141         if (ec) {
    142             std::cerr << "Accept failed: " << ec.message() << std::endl;
    143             return;
    144         }
    145 
    146         const auto &endpoint = _socket.local_endpoint();
    147         if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) {
    148             _socket.set_option(asio::ip::tcp::no_delay(true));
    149         }
    150         handleNewConnection(commandHandler);
    151 
    152         _socket.close();
    153 
    154         acceptRegister(commandHandler);
    155     };
    156 
    157     _acceptor.async_accept(_socket, peerHandler);
    158 }
    159 
    160 bool CRemoteProcessorServer::process(IRemoteCommandHandler &commandHandler)
    161 {
    162     acceptRegister(commandHandler);
    163 
    164     asio::error_code ec;
    165 
    166     _io_service.run(ec);
    167 
    168     if (ec) {
    169         std::cerr << "Server failed: " << ec.message() << std::endl;
    170     }
    171 
    172     return ec.value() == 0;
    173 }
    174 
    175 // New connection
    176 void CRemoteProcessorServer::handleNewConnection(IRemoteCommandHandler &commandHandler)
    177 {
    178     // Process all incoming requests from the client
    179     while (true) {
    180 
    181         // Process requests
    182         // Create command message
    183         CRequestMessage requestMessage;
    184 
    185         string strError;
    186         ///// Receive command
    187         CRequestMessage::Result res;
    188         res = requestMessage.serialize(Socket(_socket), false, strError);
    189 
    190         switch (res) {
    191         case CRequestMessage::error:
    192             std::cout << "Error while receiving message: " << strError << std::endl;
    193         // fall through
    194         case CRequestMessage::peerDisconnected:
    195             // Consider peer disconnection as normal, no log
    196             return; // Bail out
    197         case CRequestMessage::success:
    198             break; // No error, continue
    199         }
    200 
    201         // Actually process the request
    202         bool bSuccess;
    203 
    204         string strResult;
    205 
    206         bSuccess = commandHandler.remoteCommandProcess(requestMessage, strResult);
    207 
    208         // Send back answer
    209         // Create answer message
    210         CAnswerMessage answerMessage(strResult, bSuccess);
    211 
    212         ///// Send answer
    213         res = answerMessage.serialize(_socket, true, strError);
    214 
    215         switch (res) {
    216         case CRequestMessage::peerDisconnected:
    217         // Peer should not disconnect while waiting for an answer
    218         // Fall through to log the error and bail out
    219         case CRequestMessage::error:
    220             std::cout << "Error while receiving message: " << strError << std::endl;
    221             return; // Bail out
    222         case CRequestMessage::success:
    223             break; // No error, continue
    224         }
    225     }
    226 }
    227