1 /*------------------------------------------------------------------------- 2 * drawElements Quality Program Test Executor 3 * ------------------------------------------ 4 * 5 * Copyright 2014 The Android Open Source Project 6 * 7 * Licensed under the Apache License, Version 2.0 (the "License"); 8 * you may not use this file except in compliance with the License. 9 * You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 * 19 *//*! 20 * \file 21 * \brief Tcp/Ip communication link. 22 *//*--------------------------------------------------------------------*/ 23 24 #include "xeTcpIpLink.hpp" 25 #include "xsProtocol.hpp" 26 #include "deClock.h" 27 #include "deInt32.h" 28 29 namespace xe 30 { 31 32 enum 33 { 34 SEND_BUFFER_BLOCK_SIZE = 1024, 35 SEND_BUFFER_NUM_BLOCKS = 64 36 }; 37 38 // Utilities for writing messages out. 39 40 static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize) 41 { 42 deUint8 hdr[xs::MESSAGE_HEADER_SIZE]; 43 xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE); 44 dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]); 45 } 46 47 static void writeKeepalive (de::BlockBuffer<deUint8>& dst) 48 { 49 writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE); 50 dst.flush(); 51 } 52 53 static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList) 54 { 55 int nameSize = (int)strlen(name) + 1; 56 int paramsSize = (int)strlen(params) + 1; 57 int workDirSize = (int)strlen(workDir) + 1; 58 int caseListSize = (int)strlen(caseList) + 1; 59 int totalSize = xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize; 60 61 writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize); 62 dst.write(nameSize, (const deUint8*)name); 63 dst.write(paramsSize, (const deUint8*)params); 64 dst.write(workDirSize, (const deUint8*)workDir); 65 dst.write(caseListSize, (const deUint8*)caseList); 66 dst.flush(); 67 } 68 69 static void writeStopExecution (de::BlockBuffer<deUint8>& dst) 70 { 71 writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE); 72 dst.flush(); 73 } 74 75 // TcpIpLinkState 76 77 TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr) 78 : m_state (initialState) 79 , m_error (initialErr) 80 , m_lastKeepaliveReceived (0) 81 , m_stateChangedCallback (DE_NULL) 82 , m_testLogDataCallback (DE_NULL) 83 , m_infoLogDataCallback (DE_NULL) 84 , m_userPtr (DE_NULL) 85 { 86 } 87 88 TcpIpLinkState::~TcpIpLinkState (void) 89 { 90 } 91 92 CommLinkState TcpIpLinkState::getState (void) const 93 { 94 de::ScopedLock lock(m_lock); 95 96 return m_state; 97 } 98 99 CommLinkState TcpIpLinkState::getState (std::string& error) const 100 { 101 de::ScopedLock lock(m_lock); 102 103 error = m_error; 104 return m_state; 105 } 106 107 void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr) 108 { 109 de::ScopedLock lock(m_lock); 110 111 m_stateChangedCallback = stateChangedCallback; 112 m_testLogDataCallback = testLogDataCallback; 113 m_infoLogDataCallback = infoLogDataCallback; 114 m_userPtr = userPtr; 115 } 116 117 void TcpIpLinkState::setState (CommLinkState state, const char* error) 118 { 119 CommLink::StateChangedFunc callback = DE_NULL; 120 void* userPtr = DE_NULL; 121 122 { 123 de::ScopedLock lock(m_lock); 124 125 m_state = state; 126 m_error = error; 127 128 callback = m_stateChangedCallback; 129 userPtr = m_userPtr; 130 } 131 132 if (callback) 133 callback(userPtr, state, error); 134 } 135 136 void TcpIpLinkState::onTestLogData (const deUint8* bytes, int numBytes) const 137 { 138 CommLink::LogDataFunc callback = DE_NULL; 139 void* userPtr = DE_NULL; 140 141 m_lock.lock(); 142 callback = m_testLogDataCallback; 143 userPtr = m_userPtr; 144 m_lock.unlock(); 145 146 if (callback) 147 callback(userPtr, bytes, numBytes); 148 } 149 150 void TcpIpLinkState::onInfoLogData (const deUint8* bytes, int numBytes) const 151 { 152 CommLink::LogDataFunc callback = DE_NULL; 153 void* userPtr = DE_NULL; 154 155 m_lock.lock(); 156 callback = m_infoLogDataCallback; 157 userPtr = m_userPtr; 158 m_lock.unlock(); 159 160 if (callback) 161 callback(userPtr, bytes, numBytes); 162 } 163 164 void TcpIpLinkState::onKeepaliveReceived (void) 165 { 166 de::ScopedLock lock(m_lock); 167 m_lastKeepaliveReceived = deGetMicroseconds(); 168 } 169 170 deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const 171 { 172 de::ScopedLock lock(m_lock); 173 return m_lastKeepaliveReceived; 174 } 175 176 // TcpIpSendThread 177 178 TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state) 179 : m_socket (socket) 180 , m_state (state) 181 , m_buffer (SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS) 182 , m_isRunning (false) 183 { 184 } 185 186 TcpIpSendThread::~TcpIpSendThread (void) 187 { 188 } 189 190 void TcpIpSendThread::start (void) 191 { 192 DE_ASSERT(!m_isRunning); 193 194 // Reset state. 195 m_buffer.clear(); 196 m_isRunning = true; 197 198 de::Thread::start(); 199 } 200 201 void TcpIpSendThread::run (void) 202 { 203 try 204 { 205 deUint8 buf[SEND_BUFFER_BLOCK_SIZE]; 206 207 while (!m_buffer.isCanceled()) 208 { 209 int numToSend = 0; 210 int numSent = 0; 211 deSocketResult result = DE_SOCKETRESULT_LAST; 212 213 try 214 { 215 // Wait for single byte and then try to read more. 216 m_buffer.read(1, &buf[0]); 217 numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]); 218 } 219 catch (const de::BlockBuffer<deUint8>::CanceledException&) 220 { 221 // Handled in loop condition. 222 } 223 224 while (numSent < numToSend) 225 { 226 result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent); 227 228 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 229 XE_FAIL("Connection closed"); 230 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 231 XE_FAIL("Connection terminated"); 232 else if (result == DE_SOCKETRESULT_ERROR) 233 XE_FAIL("Socket error"); 234 else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 235 { 236 // \note Socket should not be in non-blocking mode. 237 DE_ASSERT(numSent <= 0); 238 deYield(); 239 } 240 else 241 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS); 242 } 243 } 244 } 245 catch (const std::exception& e) 246 { 247 m_state.setState(COMMLINKSTATE_ERROR, e.what()); 248 } 249 } 250 251 void TcpIpSendThread::stop (void) 252 { 253 if (m_isRunning) 254 { 255 m_buffer.cancel(); 256 join(); 257 m_isRunning = false; 258 } 259 } 260 261 // TcpIpRecvThread 262 263 TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state) 264 : m_socket (socket) 265 , m_state (state) 266 , m_curMsgPos (0) 267 , m_isRunning (false) 268 { 269 } 270 271 TcpIpRecvThread::~TcpIpRecvThread (void) 272 { 273 } 274 275 void TcpIpRecvThread::start (void) 276 { 277 DE_ASSERT(!m_isRunning); 278 279 // Reset state. 280 m_curMsgPos = 0; 281 m_isRunning = true; 282 283 de::Thread::start(); 284 } 285 286 void TcpIpRecvThread::run (void) 287 { 288 try 289 { 290 for (;;) 291 { 292 bool hasHeader = m_curMsgPos >= xs::MESSAGE_HEADER_SIZE; 293 bool hasPayload = false; 294 int messageSize = 0; 295 xs::MessageType messageType = (xs::MessageType)0; 296 297 if (hasHeader) 298 { 299 xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize); 300 hasPayload = m_curMsgPos >= messageSize; 301 } 302 303 if (hasPayload) 304 { 305 // Process message. 306 handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE); 307 m_curMsgPos = 0; 308 } 309 else 310 { 311 // Try to receive missing bytes. 312 int curSize = hasHeader ? messageSize : xs::MESSAGE_HEADER_SIZE; 313 int bytesToRecv = curSize-m_curMsgPos; 314 int numRecv = 0; 315 deSocketResult result = DE_SOCKETRESULT_LAST; 316 317 if ((int)m_curMsgBuf.size() < curSize) 318 m_curMsgBuf.resize(curSize); 319 320 result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv); 321 322 if (result == DE_SOCKETRESULT_CONNECTION_CLOSED) 323 XE_FAIL("Connection closed"); 324 else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED) 325 XE_FAIL("Connection terminated"); 326 else if (result == DE_SOCKETRESULT_ERROR) 327 XE_FAIL("Socket error"); 328 else if (result == DE_SOCKETRESULT_WOULD_BLOCK) 329 { 330 // \note Socket should not be in non-blocking mode. 331 DE_ASSERT(numRecv <= 0); 332 deYield(); 333 } 334 else 335 { 336 DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS); 337 DE_ASSERT(numRecv <= bytesToRecv); 338 m_curMsgPos += numRecv; 339 // Continue receiving bytes / handle message in next iter. 340 } 341 } 342 } 343 } 344 catch (const std::exception& e) 345 { 346 m_state.setState(COMMLINKSTATE_ERROR, e.what()); 347 } 348 } 349 350 void TcpIpRecvThread::stop (void) 351 { 352 if (m_isRunning) 353 { 354 // \note Socket must be closed before terminating receive thread. 355 XE_CHECK(!m_socket.isReceiveOpen()); 356 357 join(); 358 m_isRunning = false; 359 } 360 } 361 362 void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, int dataSize) 363 { 364 switch (messageType) 365 { 366 case xs::MESSAGETYPE_KEEPALIVE: 367 m_state.onKeepaliveReceived(); 368 break; 369 370 case xs::MESSAGETYPE_PROCESS_STARTED: 371 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message"); 372 m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING); 373 break; 374 375 case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED: 376 { 377 xs::ProcessLaunchFailedMessage msg(data, dataSize); 378 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message"); 379 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str()); 380 break; 381 } 382 383 case xs::MESSAGETYPE_PROCESS_FINISHED: 384 { 385 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message"); 386 xs::ProcessFinishedMessage msg(data, dataSize); 387 m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED); 388 DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code. 389 break; 390 } 391 392 case xs::MESSAGETYPE_PROCESS_LOG_DATA: 393 case xs::MESSAGETYPE_INFO: 394 // Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol. 395 if (data[dataSize-1] == 0) 396 dataSize -= 1; 397 398 if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA) 399 { 400 XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message"); 401 m_state.onTestLogData(&data[0], dataSize); 402 } 403 else 404 m_state.onInfoLogData(&data[0], dataSize); 405 break; 406 407 default: 408 XE_FAIL("Unknown message"); 409 } 410 } 411 412 // TcpIpLink 413 414 TcpIpLink::TcpIpLink (void) 415 : m_state (COMMLINKSTATE_ERROR, "Not connected") 416 , m_sendThread (m_socket, m_state) 417 , m_recvThread (m_socket, m_state) 418 , m_keepaliveTimer (DE_NULL) 419 { 420 m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this); 421 XE_CHECK(m_keepaliveTimer); 422 } 423 424 TcpIpLink::~TcpIpLink (void) 425 { 426 try 427 { 428 closeConnection(); 429 } 430 catch (...) 431 { 432 // Can't do much except to ignore error. 433 } 434 deTimer_destroy(m_keepaliveTimer); 435 } 436 437 void TcpIpLink::closeConnection (void) 438 { 439 { 440 deSocketState state = m_socket.getState(); 441 if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED) 442 m_socket.shutdown(); 443 } 444 445 if (deTimer_isActive(m_keepaliveTimer)) 446 deTimer_disable(m_keepaliveTimer); 447 448 if (m_sendThread.isRunning()) 449 m_sendThread.stop(); 450 451 if (m_recvThread.isRunning()) 452 m_recvThread.stop(); 453 454 if (m_socket.getState() != DE_SOCKETSTATE_CLOSED) 455 m_socket.close(); 456 } 457 458 void TcpIpLink::connect (const de::SocketAddress& address) 459 { 460 XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED); 461 XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR); 462 XE_CHECK(!m_sendThread.isRunning()); 463 XE_CHECK(!m_recvThread.isRunning()); 464 465 m_socket.connect(address); 466 467 try 468 { 469 // Clear error and set state to ready. 470 m_state.setState(COMMLINKSTATE_READY, ""); 471 m_state.onKeepaliveReceived(); 472 473 // Launch threads. 474 m_sendThread.start(); 475 m_recvThread.start(); 476 477 XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL)); 478 } 479 catch (const std::exception& e) 480 { 481 closeConnection(); 482 m_state.setState(COMMLINKSTATE_ERROR, e.what()); 483 } 484 } 485 486 void TcpIpLink::disconnect (void) 487 { 488 try 489 { 490 closeConnection(); 491 m_state.setState(COMMLINKSTATE_ERROR, "Not connected"); 492 } 493 catch (const std::exception& e) 494 { 495 m_state.setState(COMMLINKSTATE_ERROR, e.what()); 496 } 497 } 498 499 void TcpIpLink::reset (void) 500 { 501 // \note Just clears error state if we are connected. 502 if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED) 503 { 504 m_state.setState(COMMLINKSTATE_READY, ""); 505 506 // \todo [2012-07-10 pyry] Do we need to reset send/receive buffers? 507 } 508 else 509 disconnect(); // Abnormal state/usage. Disconnect socket. 510 } 511 512 void TcpIpLink::keepaliveTimerCallback (void* ptr) 513 { 514 TcpIpLink* link = static_cast<TcpIpLink*>(ptr); 515 deUint64 lastKeepalive = link->m_state.getLastKeepaliveRecevied(); 516 deUint64 curTime = deGetMicroseconds(); 517 518 // Check for timeout. 519 if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000) 520 link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout"); 521 522 // Enqueue new keepalive. 523 try 524 { 525 writeKeepalive(link->m_sendThread.getBuffer()); 526 } 527 catch (const de::BlockBuffer<deUint8>::CanceledException&) 528 { 529 // Ignore. Can happen in connection teardown. 530 } 531 } 532 533 CommLinkState TcpIpLink::getState (void) const 534 { 535 return m_state.getState(); 536 } 537 538 CommLinkState TcpIpLink::getState (std::string& message) const 539 { 540 return m_state.getState(message); 541 } 542 543 void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr) 544 { 545 m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr); 546 } 547 548 void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList) 549 { 550 XE_CHECK(m_state.getState() == COMMLINKSTATE_READY); 551 552 m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING); 553 writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList); 554 } 555 556 void TcpIpLink::stopTestProcess (void) 557 { 558 XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR); 559 writeStopExecution(m_sendThread.getBuffer()); 560 } 561 562 } // xe 563