Home | History | Annotate | Download | only in executor
      1 /*-------------------------------------------------------------------------
      2  * drawElements Quality Program Test Executor
      3  * ------------------------------------------
      4  *
      5  * Copyright 2014 The Android Open Source Project
      6  *
      7  * Licensed under the Apache License, Version 2.0 (the "License");
      8  * you may not use this file except in compliance with the License.
      9  * You may obtain a copy of the License at
     10  *
     11  *      http://www.apache.org/licenses/LICENSE-2.0
     12  *
     13  * Unless required by applicable law or agreed to in writing, software
     14  * distributed under the License is distributed on an "AS IS" BASIS,
     15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     16  * See the License for the specific language governing permissions and
     17  * limitations under the License.
     18  *
     19  *//*!
     20  * \file
     21  * \brief Tcp/Ip communication link.
     22  *//*--------------------------------------------------------------------*/
     23 
     24 #include "xeTcpIpLink.hpp"
     25 #include "xsProtocol.hpp"
     26 #include "deClock.h"
     27 #include "deInt32.h"
     28 
     29 namespace xe
     30 {
     31 
     32 enum
     33 {
     34 	SEND_BUFFER_BLOCK_SIZE		= 1024,
     35 	SEND_BUFFER_NUM_BLOCKS		= 64
     36 };
     37 
     38 // Utilities for writing messages out.
     39 
     40 static void writeMessageHeader (de::BlockBuffer<deUint8>& dst, xs::MessageType type, int messageSize)
     41 {
     42 	deUint8 hdr[xs::MESSAGE_HEADER_SIZE];
     43 	xs::Message::writeHeader(type, messageSize, &hdr[0], xs::MESSAGE_HEADER_SIZE);
     44 	dst.write(xs::MESSAGE_HEADER_SIZE, &hdr[0]);
     45 }
     46 
     47 static void writeKeepalive (de::BlockBuffer<deUint8>& dst)
     48 {
     49 	writeMessageHeader(dst, xs::MESSAGETYPE_KEEPALIVE, xs::MESSAGE_HEADER_SIZE);
     50 	dst.flush();
     51 }
     52 
     53 static void writeExecuteBinary (de::BlockBuffer<deUint8>& dst, const char* name, const char* params, const char* workDir, const char* caseList)
     54 {
     55 	int		nameSize			= (int)strlen(name)		+ 1;
     56 	int		paramsSize			= (int)strlen(params)	+ 1;
     57 	int		workDirSize			= (int)strlen(workDir)	+ 1;
     58 	int		caseListSize		= (int)strlen(caseList)	+ 1;
     59 	int		totalSize			= xs::MESSAGE_HEADER_SIZE + nameSize + paramsSize + workDirSize + caseListSize;
     60 
     61 	writeMessageHeader(dst, xs::MESSAGETYPE_EXECUTE_BINARY, totalSize);
     62 	dst.write(nameSize,		(const deUint8*)name);
     63 	dst.write(paramsSize,	(const deUint8*)params);
     64 	dst.write(workDirSize,	(const deUint8*)workDir);
     65 	dst.write(caseListSize,	(const deUint8*)caseList);
     66 	dst.flush();
     67 }
     68 
     69 static void writeStopExecution (de::BlockBuffer<deUint8>& dst)
     70 {
     71 	writeMessageHeader(dst, xs::MESSAGETYPE_STOP_EXECUTION, xs::MESSAGE_HEADER_SIZE);
     72 	dst.flush();
     73 }
     74 
     75 // TcpIpLinkState
     76 
     77 TcpIpLinkState::TcpIpLinkState (CommLinkState initialState, const char* initialErr)
     78 	: m_state					(initialState)
     79 	, m_error					(initialErr)
     80 	, m_lastKeepaliveReceived	(0)
     81 	, m_stateChangedCallback	(DE_NULL)
     82 	, m_testLogDataCallback		(DE_NULL)
     83 	, m_infoLogDataCallback		(DE_NULL)
     84 	, m_userPtr					(DE_NULL)
     85 {
     86 }
     87 
     88 TcpIpLinkState::~TcpIpLinkState (void)
     89 {
     90 }
     91 
     92 CommLinkState TcpIpLinkState::getState (void) const
     93 {
     94 	de::ScopedLock lock(m_lock);
     95 
     96 	return m_state;
     97 }
     98 
     99 CommLinkState TcpIpLinkState::getState (std::string& error) const
    100 {
    101 	de::ScopedLock lock(m_lock);
    102 
    103 	error = m_error;
    104 	return m_state;
    105 }
    106 
    107 void TcpIpLinkState::setCallbacks (CommLink::StateChangedFunc stateChangedCallback, CommLink::LogDataFunc testLogDataCallback, CommLink::LogDataFunc infoLogDataCallback, void* userPtr)
    108 {
    109 	de::ScopedLock lock(m_lock);
    110 
    111 	m_stateChangedCallback		= stateChangedCallback;
    112 	m_testLogDataCallback		= testLogDataCallback;
    113 	m_infoLogDataCallback		= infoLogDataCallback;
    114 	m_userPtr					= userPtr;
    115 }
    116 
    117 void TcpIpLinkState::setState (CommLinkState state, const char* error)
    118 {
    119 	CommLink::StateChangedFunc	callback	= DE_NULL;
    120 	void*						userPtr		= DE_NULL;
    121 
    122 	{
    123 		de::ScopedLock lock(m_lock);
    124 
    125 		m_state = state;
    126 		m_error	= error;
    127 
    128 		callback	= m_stateChangedCallback;
    129 		userPtr		= m_userPtr;
    130 	}
    131 
    132 	if (callback)
    133 		callback(userPtr, state, error);
    134 }
    135 
    136 void TcpIpLinkState::onTestLogData (const deUint8* bytes, size_t numBytes) const
    137 {
    138 	CommLink::LogDataFunc	callback	= DE_NULL;
    139 	void*					userPtr		= DE_NULL;
    140 
    141 	m_lock.lock();
    142 	callback	= m_testLogDataCallback;
    143 	userPtr		= m_userPtr;
    144 	m_lock.unlock();
    145 
    146 	if (callback)
    147 		callback(userPtr, bytes, numBytes);
    148 }
    149 
    150 void TcpIpLinkState::onInfoLogData (const deUint8* bytes, size_t numBytes) const
    151 {
    152 	CommLink::LogDataFunc	callback	= DE_NULL;
    153 	void*					userPtr		= DE_NULL;
    154 
    155 	m_lock.lock();
    156 	callback	= m_infoLogDataCallback;
    157 	userPtr		= m_userPtr;
    158 	m_lock.unlock();
    159 
    160 	if (callback)
    161 		callback(userPtr, bytes, numBytes);
    162 }
    163 
    164 void TcpIpLinkState::onKeepaliveReceived (void)
    165 {
    166 	de::ScopedLock lock(m_lock);
    167 	m_lastKeepaliveReceived = deGetMicroseconds();
    168 }
    169 
    170 deUint64 TcpIpLinkState::getLastKeepaliveRecevied (void) const
    171 {
    172 	de::ScopedLock lock(m_lock);
    173 	return m_lastKeepaliveReceived;
    174 }
    175 
    176 // TcpIpSendThread
    177 
    178 TcpIpSendThread::TcpIpSendThread (de::Socket& socket, TcpIpLinkState& state)
    179 	: m_socket		(socket)
    180 	, m_state		(state)
    181 	, m_buffer		(SEND_BUFFER_BLOCK_SIZE, SEND_BUFFER_NUM_BLOCKS)
    182 	, m_isRunning	(false)
    183 {
    184 }
    185 
    186 TcpIpSendThread::~TcpIpSendThread (void)
    187 {
    188 }
    189 
    190 void TcpIpSendThread::start (void)
    191 {
    192 	DE_ASSERT(!m_isRunning);
    193 
    194 	// Reset state.
    195 	m_buffer.clear();
    196 	m_isRunning = true;
    197 
    198 	de::Thread::start();
    199 }
    200 
    201 void TcpIpSendThread::run (void)
    202 {
    203 	try
    204 	{
    205 		deUint8 buf[SEND_BUFFER_BLOCK_SIZE];
    206 
    207 		while (!m_buffer.isCanceled())
    208 		{
    209 			size_t			numToSend	= 0;
    210 			size_t			numSent		= 0;
    211 			deSocketResult	result		= DE_SOCKETRESULT_LAST;
    212 
    213 			try
    214 			{
    215 				// Wait for single byte and then try to read more.
    216 				m_buffer.read(1, &buf[0]);
    217 				numToSend = 1 + m_buffer.tryRead(DE_LENGTH_OF_ARRAY(buf)-1, &buf[1]);
    218 			}
    219 			catch (const de::BlockBuffer<deUint8>::CanceledException&)
    220 			{
    221 				// Handled in loop condition.
    222 			}
    223 
    224 			while (numSent < numToSend)
    225 			{
    226 				result = m_socket.send(&buf[numSent], numToSend-numSent, &numSent);
    227 
    228 				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
    229 					XE_FAIL("Connection closed");
    230 				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
    231 					XE_FAIL("Connection terminated");
    232 				else if (result == DE_SOCKETRESULT_ERROR)
    233 					XE_FAIL("Socket error");
    234 				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
    235 				{
    236 					// \note Socket should not be in non-blocking mode.
    237 					DE_ASSERT(numSent == 0);
    238 					deYield();
    239 				}
    240 				else
    241 					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
    242 			}
    243 		}
    244 	}
    245 	catch (const std::exception& e)
    246 	{
    247 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
    248 	}
    249 }
    250 
    251 void TcpIpSendThread::stop (void)
    252 {
    253 	if (m_isRunning)
    254 	{
    255 		m_buffer.cancel();
    256 		join();
    257 		m_isRunning = false;
    258 	}
    259 }
    260 
    261 // TcpIpRecvThread
    262 
    263 TcpIpRecvThread::TcpIpRecvThread (de::Socket& socket, TcpIpLinkState& state)
    264 	: m_socket		(socket)
    265 	, m_state		(state)
    266 	, m_curMsgPos	(0)
    267 	, m_isRunning	(false)
    268 {
    269 }
    270 
    271 TcpIpRecvThread::~TcpIpRecvThread (void)
    272 {
    273 }
    274 
    275 void TcpIpRecvThread::start (void)
    276 {
    277 	DE_ASSERT(!m_isRunning);
    278 
    279 	// Reset state.
    280 	m_curMsgPos = 0;
    281 	m_isRunning = true;
    282 
    283 	de::Thread::start();
    284 }
    285 
    286 void TcpIpRecvThread::run (void)
    287 {
    288 	try
    289 	{
    290 		for (;;)
    291 		{
    292 			bool				hasHeader		= m_curMsgPos >= xs::MESSAGE_HEADER_SIZE;
    293 			bool				hasPayload		= false;
    294 			size_t				messageSize		= 0;
    295 			xs::MessageType		messageType		= (xs::MessageType)0;
    296 
    297 			if (hasHeader)
    298 			{
    299 				xs::Message::parseHeader(&m_curMsgBuf[0], xs::MESSAGE_HEADER_SIZE, messageType, messageSize);
    300 				hasPayload = m_curMsgPos >= messageSize;
    301 			}
    302 
    303 			if (hasPayload)
    304 			{
    305 				// Process message.
    306 				handleMessage(messageType, m_curMsgPos > xs::MESSAGE_HEADER_SIZE ? &m_curMsgBuf[xs::MESSAGE_HEADER_SIZE] : DE_NULL, messageSize-xs::MESSAGE_HEADER_SIZE);
    307 				m_curMsgPos = 0;
    308 			}
    309 			else
    310 			{
    311 				// Try to receive missing bytes.
    312 				size_t				curSize			= hasHeader ? messageSize : (size_t)xs::MESSAGE_HEADER_SIZE;
    313 				size_t				bytesToRecv		= curSize-m_curMsgPos;
    314 				size_t				numRecv			= 0;
    315 				deSocketResult		result			= DE_SOCKETRESULT_LAST;
    316 
    317 				if (m_curMsgBuf.size() < curSize)
    318 					m_curMsgBuf.resize(curSize);
    319 
    320 				result = m_socket.receive(&m_curMsgBuf[m_curMsgPos], bytesToRecv, &numRecv);
    321 
    322 				if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
    323 					XE_FAIL("Connection closed");
    324 				else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
    325 					XE_FAIL("Connection terminated");
    326 				else if (result == DE_SOCKETRESULT_ERROR)
    327 					XE_FAIL("Socket error");
    328 				else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
    329 				{
    330 					// \note Socket should not be in non-blocking mode.
    331 					DE_ASSERT(numRecv == 0);
    332 					deYield();
    333 				}
    334 				else
    335 				{
    336 					DE_ASSERT(result == DE_SOCKETRESULT_SUCCESS);
    337 					DE_ASSERT(numRecv <= bytesToRecv);
    338 					m_curMsgPos += numRecv;
    339 					// Continue receiving bytes / handle message in next iter.
    340 				}
    341 			}
    342 		}
    343 	}
    344 	catch (const std::exception& e)
    345 	{
    346 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
    347 	}
    348 }
    349 
    350 void TcpIpRecvThread::stop (void)
    351 {
    352 	if (m_isRunning)
    353 	{
    354 		// \note Socket must be closed before terminating receive thread.
    355 		XE_CHECK(!m_socket.isReceiveOpen());
    356 
    357 		join();
    358 		m_isRunning = false;
    359 	}
    360 }
    361 
    362 void TcpIpRecvThread::handleMessage (xs::MessageType messageType, const deUint8* data, size_t dataSize)
    363 {
    364 	switch (messageType)
    365 	{
    366 		case xs::MESSAGETYPE_KEEPALIVE:
    367 			m_state.onKeepaliveReceived();
    368 			break;
    369 
    370 		case xs::MESSAGETYPE_PROCESS_STARTED:
    371 			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_STARTED message");
    372 			m_state.setState(COMMLINKSTATE_TEST_PROCESS_RUNNING);
    373 			break;
    374 
    375 		case xs::MESSAGETYPE_PROCESS_LAUNCH_FAILED:
    376 		{
    377 			xs::ProcessLaunchFailedMessage msg(data, dataSize);
    378 			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_LAUNCHING, "Unexpected PROCESS_LAUNCH_FAILED message");
    379 			m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCH_FAILED, msg.reason.c_str());
    380 			break;
    381 		}
    382 
    383 		case xs::MESSAGETYPE_PROCESS_FINISHED:
    384 		{
    385 			XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_FINISHED message");
    386 			xs::ProcessFinishedMessage msg(data, dataSize);
    387 			m_state.setState(COMMLINKSTATE_TEST_PROCESS_FINISHED);
    388 			DE_UNREF(msg); // \todo [2012-06-19 pyry] Report exit code.
    389 			break;
    390 		}
    391 
    392 		case xs::MESSAGETYPE_PROCESS_LOG_DATA:
    393 		case xs::MESSAGETYPE_INFO:
    394 			// Ignore leading \0 if such is present. \todo [2012-06-19 pyry] Improve protocol.
    395 			if (data[dataSize-1] == 0)
    396 				dataSize -= 1;
    397 
    398 			if (messageType == xs::MESSAGETYPE_PROCESS_LOG_DATA)
    399 			{
    400 				XE_CHECK_MSG(m_state.getState() == COMMLINKSTATE_TEST_PROCESS_RUNNING, "Unexpected PROCESS_LOG_DATA message");
    401 				m_state.onTestLogData(&data[0], dataSize);
    402 			}
    403 			else
    404 				m_state.onInfoLogData(&data[0], dataSize);
    405 			break;
    406 
    407 		default:
    408 			XE_FAIL("Unknown message");
    409 	}
    410 }
    411 
    412 // TcpIpLink
    413 
    414 TcpIpLink::TcpIpLink (void)
    415 	: m_state			(COMMLINKSTATE_ERROR, "Not connected")
    416 	, m_sendThread		(m_socket, m_state)
    417 	, m_recvThread		(m_socket, m_state)
    418 	, m_keepaliveTimer	(DE_NULL)
    419 {
    420 	m_keepaliveTimer = deTimer_create(keepaliveTimerCallback, this);
    421 	XE_CHECK(m_keepaliveTimer);
    422 }
    423 
    424 TcpIpLink::~TcpIpLink (void)
    425 {
    426 	try
    427 	{
    428 		closeConnection();
    429 	}
    430 	catch (...)
    431 	{
    432 		// Can't do much except to ignore error.
    433 	}
    434 	deTimer_destroy(m_keepaliveTimer);
    435 }
    436 
    437 void TcpIpLink::closeConnection (void)
    438 {
    439 	{
    440 		deSocketState state = m_socket.getState();
    441 		if (state != DE_SOCKETSTATE_DISCONNECTED && state != DE_SOCKETSTATE_CLOSED)
    442 			m_socket.shutdown();
    443 	}
    444 
    445 	if (deTimer_isActive(m_keepaliveTimer))
    446 		deTimer_disable(m_keepaliveTimer);
    447 
    448 	if (m_sendThread.isRunning())
    449 		m_sendThread.stop();
    450 
    451 	if (m_recvThread.isRunning())
    452 		m_recvThread.stop();
    453 
    454 	if (m_socket.getState() != DE_SOCKETSTATE_CLOSED)
    455 		m_socket.close();
    456 }
    457 
    458 void TcpIpLink::connect (const de::SocketAddress& address)
    459 {
    460 	XE_CHECK(m_socket.getState() == DE_SOCKETSTATE_CLOSED);
    461 	XE_CHECK(m_state.getState() == COMMLINKSTATE_ERROR);
    462 	XE_CHECK(!m_sendThread.isRunning());
    463 	XE_CHECK(!m_recvThread.isRunning());
    464 
    465 	m_socket.connect(address);
    466 
    467 	try
    468 	{
    469 		// Clear error and set state to ready.
    470 		m_state.setState(COMMLINKSTATE_READY, "");
    471 		m_state.onKeepaliveReceived();
    472 
    473 		// Launch threads.
    474 		m_sendThread.start();
    475 		m_recvThread.start();
    476 
    477 		XE_CHECK(deTimer_scheduleInterval(m_keepaliveTimer, xs::KEEPALIVE_SEND_INTERVAL));
    478 	}
    479 	catch (const std::exception& e)
    480 	{
    481 		closeConnection();
    482 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
    483 		throw;
    484 	}
    485 }
    486 
    487 void TcpIpLink::disconnect (void)
    488 {
    489 	try
    490 	{
    491 		closeConnection();
    492 		m_state.setState(COMMLINKSTATE_ERROR, "Not connected");
    493 	}
    494 	catch (const std::exception& e)
    495 	{
    496 		m_state.setState(COMMLINKSTATE_ERROR, e.what());
    497 	}
    498 }
    499 
    500 void TcpIpLink::reset (void)
    501 {
    502 	// \note Just clears error state if we are connected.
    503 	if (m_socket.getState() == DE_SOCKETSTATE_CONNECTED)
    504 	{
    505 		m_state.setState(COMMLINKSTATE_READY, "");
    506 
    507 		// \todo [2012-07-10 pyry] Do we need to reset send/receive buffers?
    508 	}
    509 	else
    510 		disconnect(); // Abnormal state/usage. Disconnect socket.
    511 }
    512 
    513 void TcpIpLink::keepaliveTimerCallback (void* ptr)
    514 {
    515 	TcpIpLink*	link			= static_cast<TcpIpLink*>(ptr);
    516 	deUint64	lastKeepalive	= link->m_state.getLastKeepaliveRecevied();
    517 	deUint64	curTime			= deGetMicroseconds();
    518 
    519 	// Check for timeout.
    520 	if ((deInt64)curTime-(deInt64)lastKeepalive > xs::KEEPALIVE_TIMEOUT*1000)
    521 		link->m_state.setState(COMMLINKSTATE_ERROR, "Keepalive timeout");
    522 
    523 	// Enqueue new keepalive.
    524 	try
    525 	{
    526 		writeKeepalive(link->m_sendThread.getBuffer());
    527 	}
    528 	catch (const de::BlockBuffer<deUint8>::CanceledException&)
    529 	{
    530 		// Ignore. Can happen in connection teardown.
    531 	}
    532 }
    533 
    534 CommLinkState TcpIpLink::getState (void) const
    535 {
    536 	return m_state.getState();
    537 }
    538 
    539 CommLinkState TcpIpLink::getState (std::string& message) const
    540 {
    541 	return m_state.getState(message);
    542 }
    543 
    544 void TcpIpLink::setCallbacks (StateChangedFunc stateChangedCallback, LogDataFunc testLogDataCallback, LogDataFunc infoLogDataCallback, void* userPtr)
    545 {
    546 	m_state.setCallbacks(stateChangedCallback, testLogDataCallback, infoLogDataCallback, userPtr);
    547 }
    548 
    549 void TcpIpLink::startTestProcess (const char* name, const char* params, const char* workingDir, const char* caseList)
    550 {
    551 	XE_CHECK(m_state.getState() == COMMLINKSTATE_READY);
    552 
    553 	m_state.setState(COMMLINKSTATE_TEST_PROCESS_LAUNCHING);
    554 	writeExecuteBinary(m_sendThread.getBuffer(), name, params, workingDir, caseList);
    555 }
    556 
    557 void TcpIpLink::stopTestProcess (void)
    558 {
    559 	XE_CHECK(m_state.getState() != COMMLINKSTATE_ERROR);
    560 	writeStopExecution(m_sendThread.getBuffer());
    561 }
    562 
    563 } // xe
    564