1 /*------------------------------------------------------------------------- 2 * drawElements Quality Program Execution Server 3 * --------------------------------------------- 4 * 5 * Copyright 2014 The Android Open Source Project 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 *//*! 20 * \file 21 * \brief Test Execution Server. 22 *//*--------------------------------------------------------------------*/ 23 24 #include "xsExecutionServer.hpp" 25 #include "deClock.h" 26 27 #include <cstdio> 28 29 using std::vector; 30 using std::string; 31 32 #if 1 33 # define DBG_PRINT(X) printf X 34 #else 35 # define DBG_PRINT(X) 36 #endif 37 38 namespace xs 39 { 40 41 inline bool MessageBuilder::isComplete (void) const 42 { 43 if (m_buffer.size() < MESSAGE_HEADER_SIZE) 44 return false; 45 else 46 return m_buffer.size() == getMessageSize(); 47 } 48 49 const deUint8* MessageBuilder::getMessageData (void) const 50 { 51 return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL; 52 } 53 54 size_t MessageBuilder::getMessageDataSize (void) const 55 { 56 DE_ASSERT(isComplete()); 57 return m_buffer.size() - MESSAGE_HEADER_SIZE; 58 } 59 60 void MessageBuilder::read (ByteBuffer& src) 61 { 62 // Try to get header. 63 if (m_buffer.size() < MESSAGE_HEADER_SIZE) 64 { 65 while (m_buffer.size() < MESSAGE_HEADER_SIZE && 66 src.getNumElements() > 0) 67 m_buffer.push_back(src.popBack()); 68 69 DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE); 70 71 if (m_buffer.size() == MESSAGE_HEADER_SIZE) 72 { 73 // Got whole header, parse it. 74 Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize); 75 } 76 } 77 78 if (m_buffer.size() >= MESSAGE_HEADER_SIZE) 79 { 80 // We have header. 81 size_t msgSize = getMessageSize(); 82 size_t numBytesLeft = msgSize - m_buffer.size(); 83 size_t numToRead = (size_t)de::min(src.getNumElements(), (int)numBytesLeft); 84 85 if (numToRead > 0) 86 { 87 int curBufPos = (int)m_buffer.size(); 88 m_buffer.resize(curBufPos+numToRead); 89 src.popBack(&m_buffer[curBufPos], (int)numToRead); 90 } 91 } 92 } 93 94 void MessageBuilder::clear (void) 95 { 96 m_buffer.clear(); 97 m_messageType = MESSAGETYPE_NONE; 98 m_messageSize = 0; 99 } 100 101 ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode) 102 : TcpServer (family, port) 103 , m_testDriver (testProcess) 104 , m_runMode (runMode) 105 { 106 } 107 108 ExecutionServer::~ExecutionServer (void) 109 { 110 } 111 112 TestDriver* ExecutionServer::acquireTestDriver (void) 113 { 114 if (!m_testDriverLock.tryLock()) 115 throw Error("Failed to acquire test driver"); 116 117 return &m_testDriver; 118 } 119 120 void ExecutionServer::releaseTestDriver (TestDriver* driver) 121 { 122 DE_ASSERT(&m_testDriver == driver); 123 DE_UNREF(driver); 124 m_testDriverLock.unlock(); 125 } 126 127 ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress) 128 { 129 printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort()); 130 return new ExecutionRequestHandler(this, socket); 131 } 132 133 void ExecutionServer::connectionDone (ConnectionHandler* handler) 134 { 135 if (m_runMode == RUNMODE_SINGLE_EXEC) 136 m_socket.close(); 137 138 TcpServer::connectionDone(handler); 139 } 140 141 ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket) 142 : ConnectionHandler (server, socket) 143 , m_execServer (server) 144 , m_testDriver (DE_NULL) 145 , m_bufferIn (RECV_BUFFER_SIZE) 146 , m_bufferOut (SEND_BUFFER_SIZE) 147 , m_run (false) 148 , m_sendRecvTmpBuf (SEND_RECV_TMP_BUFFER_SIZE) 149 { 150 // Set flags. 151 m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC); 152 153 // Init protocol keepalives. 154 initKeepAlives(); 155 } 156 157 ExecutionRequestHandler::~ExecutionRequestHandler (void) 158 { 159 if (m_testDriver) 160 m_execServer->releaseTestDriver(m_testDriver); 161 } 162 163 void ExecutionRequestHandler::handle (void) 164 { 165 DBG_PRINT(("ExecutionRequestHandler::handle()\n")); 166 167 try 168 { 169 // Process execution session. 170 processSession(); 171 } 172 catch (const std::exception& e) 173 { 174 printf("ExecutionRequestHandler::run(): %s\n", e.what()); 175 } 176 177 DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n")); 178 179 // Release test driver. 180 if (m_testDriver) 181 { 182 try 183 { 184 m_testDriver->reset(); 185 } 186 catch (...) 187 { 188 } 189 m_execServer->releaseTestDriver(m_testDriver); 190 m_testDriver = DE_NULL; 191 } 192 193 // Close connection. 194 if (m_socket->isConnected()) 195 m_socket->shutdown(); 196 } 197 198 void ExecutionRequestHandler::acquireTestDriver (void) 199 { 200 DE_ASSERT(!m_testDriver); 201 202 // Try to acquire test driver - may fail. 203 m_testDriver = m_execServer->acquireTestDriver(); 204 DE_ASSERT(m_testDriver); 205 m_testDriver->reset(); 206 207 } 208 209 void ExecutionRequestHandler::processSession (void) 210 { 211 m_run = true; 212 213 deUint64 lastIoTime = deGetMicroseconds(); 214 215 while (m_run) 216 { 217 bool anyIO = false; 218 219 // Read from socket to buffer. 220 anyIO = receive() || anyIO; 221 222 // Send bytes in buffer. 223 anyIO = send() || anyIO; 224 225 // Process incoming data. 226 if (m_bufferIn.getNumElements() > 0) 227 { 228 DE_ASSERT(!m_msgBuilder.isComplete()); 229 m_msgBuilder.read(m_bufferIn); 230 } 231 232 if (m_msgBuilder.isComplete()) 233 { 234 // Process message. 235 processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize()); 236 237 m_msgBuilder.clear(); 238 } 239 240 // Keepalives, anyone? 241 pollKeepAlives(); 242 243 // Poll test driver for IO. 244 if (m_testDriver) 245 anyIO = getTestDriver()->poll(m_bufferOut) || anyIO; 246 247 // If no IO happens in a reasonable amount of time, go to sleep. 248 { 249 deUint64 curTime = deGetMicroseconds(); 250 if (anyIO) 251 lastIoTime = curTime; 252 else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000) 253 deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while. 254 else 255 deYield(); // Just give other threads chance to run. 256 } 257 } 258 } 259 260 void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, size_t dataSize) 261 { 262 switch (type) 263 { 264 case MESSAGETYPE_HELLO: 265 { 266 HelloMessage msg(data, dataSize); 267 DBG_PRINT(("HelloMessage: version = %d\n", msg.version)); 268 if (msg.version != PROTOCOL_VERSION) 269 throw ProtocolError("Unsupported protocol version"); 270 break; 271 } 272 273 case MESSAGETYPE_TEST: 274 { 275 TestMessage msg(data, dataSize); 276 DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str())); 277 break; 278 } 279 280 case MESSAGETYPE_KEEPALIVE: 281 { 282 KeepAliveMessage msg(data, dataSize); 283 DBG_PRINT(("KeepAliveMessage\n")); 284 keepAliveReceived(); 285 break; 286 } 287 288 case MESSAGETYPE_EXECUTE_BINARY: 289 { 290 ExecuteBinaryMessage msg(data, dataSize); 291 DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str())); 292 getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str()); 293 keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed. 294 break; 295 } 296 297 case MESSAGETYPE_STOP_EXECUTION: 298 { 299 StopExecutionMessage msg(data, dataSize); 300 DBG_PRINT(("StopExecutionMessage\n")); 301 getTestDriver()->stopProcess(); 302 break; 303 } 304 305 default: 306 throw ProtocolError("Unsupported message"); 307 } 308 } 309 310 void ExecutionRequestHandler::initKeepAlives (void) 311 { 312 deUint64 curTime = deGetMicroseconds(); 313 m_lastKeepAliveSent = curTime; 314 m_lastKeepAliveReceived = curTime; 315 } 316 317 void ExecutionRequestHandler::keepAliveReceived (void) 318 { 319 m_lastKeepAliveReceived = deGetMicroseconds(); 320 } 321 322 void ExecutionRequestHandler::pollKeepAlives (void) 323 { 324 deUint64 curTime = deGetMicroseconds(); 325 326 // Check that we've got keepalives in timely fashion. 327 if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000) 328 throw ProtocolError("Keepalive timeout occurred"); 329 330 // Send some? 331 if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 && 332 m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE) 333 { 334 vector<deUint8> buf; 335 KeepAliveMessage().write(buf); 336 m_bufferOut.pushFront(&buf[0], (int)buf.size()); 337 338 m_lastKeepAliveSent = deGetMicroseconds(); 339 } 340 } 341 342 bool ExecutionRequestHandler::receive (void) 343 { 344 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferIn.getNumFree()); 345 346 if (maxLen > 0) 347 { 348 size_t numRecv; 349 deSocketResult result = m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv); 350 351 if (result == DE_SOCKETRESULT_SUCCESS) 352 { 353 DE_ASSERT(numRecv > 0); 354 m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], (int)numRecv); 355 return true; 356 } 357 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 358 { 359 m_run = false; 360 return true; 361 } 362 else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 363 return false; 364 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 365 throw ConnectionError("Connection terminated"); 366 else 367 throw ConnectionError("receive() failed"); 368 } 369 else 370 return false; 371 } 372 373 bool ExecutionRequestHandler::send (void) 374 { 375 size_t maxLen = de::min(m_sendRecvTmpBuf.size(), (size_t)m_bufferOut.getNumElements()); 376 377 if (maxLen > 0) 378 { 379 m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], (int)maxLen); 380 381 size_t numSent; 382 deSocketResult result = m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent); 383 384 if (result == DE_SOCKETRESULT_SUCCESS) 385 { 386 DE_ASSERT(numSent > 0); 387 m_bufferOut.popBack((int)numSent); 388 return true; 389 } 390 else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 391 { 392 m_run = false; 393 return true; 394 } 395 else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 396 return false; 397 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 398 throw ConnectionError("Connection terminated"); 399 else 400 throw ConnectionError("send() failed"); 401 } 402 else 403 return false; 404 } 405 406 } // xs 407