1 /* 2 * Copyright (c) 2011-2015, Intel Corporation 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without modification, 6 * are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, this 9 * list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright notice, 12 * this list of conditions and the following disclaimer in the documentation and/or 13 * other materials provided with the distribution. 14 * 15 * 3. Neither the name of the copyright holder nor the names of its contributors 16 * may be used to endorse or promote products derived from this software without 17 * specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 22 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR 23 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 26 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 28 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 #include "RemoteProcessorServer.h" 31 #include <iostream> 32 #include <memory> 33 #include <assert.h> 34 #include <string.h> 35 #include <unistd.h> 36 #include "RequestMessage.h" 37 #include "AnswerMessage.h" 38 #include "RemoteCommandHandler.h" 39 #include "Socket.h" 40 #include "convert.hpp" 41 42 using std::string; 43 44 CRemoteProcessorServer::CRemoteProcessorServer(std::string bindAddress) 45 : _bindAddress(bindAddress), _io_service(), _acceptor(_io_service), _socket(_io_service) 46 { 47 } 48 49 CRemoteProcessorServer::~CRemoteProcessorServer() 50 { 51 stop(); 52 } 53 54 // State 55 bool CRemoteProcessorServer::start(string &error) 56 { 57 using namespace asio; 58 59 try { 60 generic::stream_protocol::endpoint endpoint; 61 uint16_t port; 62 std::string endpointName; 63 bool isInet; 64 65 // For backward compatibility, tcp port referred by its value only 66 if (convertTo(_bindAddress, port)) { 67 isInet = true; 68 } else { 69 // required form is <protocol>://<host:port|port_name> 70 const std::string tcpProtocol{"tcp"}; 71 const std::string unixProtocol{"unix"}; 72 const std::vector<std::string> supportedProtocols{ tcpProtocol, unixProtocol }; 73 const std::string protocolDel{"://"}; 74 75 size_t protocolDelPos = _bindAddress.find(protocolDel); 76 if (protocolDelPos == std::string::npos) { 77 error = "bindaddress " + _bindAddress + " ill formed, missing " + protocolDel; 78 return false; 79 } 80 std::string protocol = _bindAddress.substr(0, protocolDelPos); 81 82 if (std::find(begin(supportedProtocols), end(supportedProtocols), protocol) == 83 end(supportedProtocols)) { 84 error = "bindaddress " + _bindAddress + " has invalid protocol " + protocol; 85 return false; 86 } 87 isInet = (_bindAddress.find(tcpProtocol) != std::string::npos); 88 if (isInet) { 89 size_t portDelPos = _bindAddress.find(':', protocolDelPos + protocolDel.size()); 90 if (portDelPos == std::string::npos) { 91 error = "bindaddress " + _bindAddress + " ill formed, missing " + ":"; 92 return false; 93 } 94 std::string portLiteral{_bindAddress.substr(portDelPos + 1)}; 95 if (!convertTo(portLiteral, port)) { 96 error = "bindaddress " + _bindAddress + " port " + portLiteral + " ill formed"; 97 return false; 98 } 99 } else { 100 endpointName = _bindAddress.substr(protocolDelPos + protocolDel.size()); 101 } 102 } 103 104 if (isInet) { 105 endpoint = ip::tcp::endpoint(ip::tcp::v6(), port); 106 } else { 107 endpoint = local::stream_protocol::endpoint(endpointName); 108 } 109 110 _acceptor.open(endpoint.protocol()); 111 112 if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) { 113 _acceptor.set_option(ip::tcp::acceptor::reuse_address(true)); 114 } else if (endpoint.protocol().protocol() == AF_UNSPEC) { 115 // In case of reuse, remote it first 116 unlink(endpointName.c_str()); 117 } 118 _acceptor.set_option(socket_base::linger(true, 0)); 119 _acceptor.set_option(socket_base::enable_connection_aborted(true)); 120 121 _acceptor.bind(endpoint); 122 _acceptor.listen(); 123 } catch (std::exception &e) { 124 error = "Unable to listen on " + _bindAddress + ": " + e.what(); 125 return false; 126 } 127 128 return true; 129 } 130 131 bool CRemoteProcessorServer::stop() 132 { 133 _io_service.stop(); 134 135 return true; 136 } 137 138 void CRemoteProcessorServer::acceptRegister(IRemoteCommandHandler &commandHandler) 139 { 140 auto peerHandler = [this, &commandHandler](asio::error_code ec) { 141 if (ec) { 142 std::cerr << "Accept failed: " << ec.message() << std::endl; 143 return; 144 } 145 146 const auto &endpoint = _socket.local_endpoint(); 147 if (endpoint.protocol().protocol() == ASIO_OS_DEF(IPPROTO_TCP)) { 148 _socket.set_option(asio::ip::tcp::no_delay(true)); 149 } 150 handleNewConnection(commandHandler); 151 152 _socket.close(); 153 154 acceptRegister(commandHandler); 155 }; 156 157 _acceptor.async_accept(_socket, peerHandler); 158 } 159 160 bool CRemoteProcessorServer::process(IRemoteCommandHandler &commandHandler) 161 { 162 acceptRegister(commandHandler); 163 164 asio::error_code ec; 165 166 _io_service.run(ec); 167 168 if (ec) { 169 std::cerr << "Server failed: " << ec.message() << std::endl; 170 } 171 172 return ec.value() == 0; 173 } 174 175 // New connection 176 void CRemoteProcessorServer::handleNewConnection(IRemoteCommandHandler &commandHandler) 177 { 178 // Process all incoming requests from the client 179 while (true) { 180 181 // Process requests 182 // Create command message 183 CRequestMessage requestMessage; 184 185 string strError; 186 ///// Receive command 187 CRequestMessage::Result res; 188 res = requestMessage.serialize(Socket(_socket), false, strError); 189 190 switch (res) { 191 case CRequestMessage::error: 192 std::cout << "Error while receiving message: " << strError << std::endl; 193 // fall through 194 case CRequestMessage::peerDisconnected: 195 // Consider peer disconnection as normal, no log 196 return; // Bail out 197 case CRequestMessage::success: 198 break; // No error, continue 199 } 200 201 // Actually process the request 202 bool bSuccess; 203 204 string strResult; 205 206 bSuccess = commandHandler.remoteCommandProcess(requestMessage, strResult); 207 208 // Send back answer 209 // Create answer message 210 CAnswerMessage answerMessage(strResult, bSuccess); 211 212 ///// Send answer 213 res = answerMessage.serialize(_socket, true, strError); 214 215 switch (res) { 216 case CRequestMessage::peerDisconnected: 217 // Peer should not disconnect while waiting for an answer 218 // Fall through to log the error and bail out 219 case CRequestMessage::error: 220 std::cout << "Error while receiving message: " << strError << std::endl; 221 return; // Bail out 222 case CRequestMessage::success: 223 break; // No error, continue 224 } 225 } 226 } 227