Home | History | Annotate | Download | only in execserver
      1 /*-------------------------------------------------------------------------
      2  * drawElements Quality Program Execution Server
      3  * ---------------------------------------------
      4  *
      5  * Copyright 2014 The Android Open Source Project
      6  *
      7  * Licensed under the Apache License, Version 2.0 (the "License");
      8  * you may not use this file except in compliance with the License.
      9  * You may obtain a copy of the License at
     10  *
     11  *      http://www.apache.org/licenses/LICENSE-2.0
     12  *
     13  * Unless required by applicable law or agreed to in writing, software
     14  * distributed under the License is distributed on an "AS IS" BASIS,
     15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     16  * See the License for the specific language governing permissions and
     17  * limitations under the License.
     18  *
     19  *//*!
     20  * \file
     21  * \brief Test Execution Server.
     22  *//*--------------------------------------------------------------------*/
     23 
     24 #include "xsExecutionServer.hpp"
     25 #include "deClock.h"
     26 
     27 #include <cstdio>
     28 
     29 using std::vector;
     30 using std::string;
     31 
     32 #if 1
     33 #	define DBG_PRINT(X) printf X
     34 #else
     35 #	define DBG_PRINT(X)
     36 #endif
     37 
     38 namespace xs
     39 {
     40 
     41 inline bool MessageBuilder::isComplete (void) const
     42 {
     43 	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
     44 		return false;
     45 	else
     46 		return (int)m_buffer.size() == getMessageSize();
     47 }
     48 
     49 const deUint8* MessageBuilder::getMessageData (void) const
     50 {
     51 	return m_buffer.size() > MESSAGE_HEADER_SIZE ? &m_buffer[MESSAGE_HEADER_SIZE] : DE_NULL;
     52 }
     53 
     54 int MessageBuilder::getMessageDataSize (void) const
     55 {
     56 	DE_ASSERT(isComplete());
     57 	return (int)m_buffer.size() - MESSAGE_HEADER_SIZE;
     58 }
     59 
     60 void MessageBuilder::read (ByteBuffer& src)
     61 {
     62 	// Try to get header.
     63 	if (m_buffer.size() < MESSAGE_HEADER_SIZE)
     64 	{
     65 		while (m_buffer.size() < MESSAGE_HEADER_SIZE &&
     66 			   src.getNumElements() > 0)
     67 			m_buffer.push_back(src.popBack());
     68 
     69 		DE_ASSERT(m_buffer.size() <= MESSAGE_HEADER_SIZE);
     70 
     71 		if (m_buffer.size() == MESSAGE_HEADER_SIZE)
     72 		{
     73 			// Got whole header, parse it.
     74 			Message::parseHeader(&m_buffer[0], (int)m_buffer.size(), m_messageType, m_messageSize);
     75 		}
     76 	}
     77 
     78 	if (m_buffer.size() >= MESSAGE_HEADER_SIZE)
     79 	{
     80 		// We have header.
     81 		int msgSize			= getMessageSize();
     82 		int numBytesLeft	= msgSize - (int)m_buffer.size();
     83 		int	numToRead		= de::min(src.getNumElements(), numBytesLeft);
     84 
     85 		if (numToRead > 0)
     86 		{
     87 			int curBufPos = (int)m_buffer.size();
     88 			m_buffer.resize(curBufPos+numToRead);
     89 			src.popBack(&m_buffer[curBufPos], numToRead);
     90 		}
     91 	}
     92 }
     93 
     94 void MessageBuilder::clear (void)
     95 {
     96 	m_buffer.clear();
     97 	m_messageType	= MESSAGETYPE_NONE;
     98 	m_messageSize	= 0;
     99 }
    100 
    101 ExecutionServer::ExecutionServer (xs::TestProcess* testProcess, deSocketFamily family, int port, RunMode runMode)
    102 	: TcpServer		(family, port)
    103 	, m_testDriver	(testProcess)
    104 	, m_runMode		(runMode)
    105 {
    106 }
    107 
    108 ExecutionServer::~ExecutionServer (void)
    109 {
    110 }
    111 
    112 TestDriver* ExecutionServer::acquireTestDriver (void)
    113 {
    114 	if (!m_testDriverLock.tryLock())
    115 		throw Error("Failed to acquire test driver");
    116 
    117 	return &m_testDriver;
    118 }
    119 
    120 void ExecutionServer::releaseTestDriver (TestDriver* driver)
    121 {
    122 	DE_ASSERT(&m_testDriver == driver);
    123 	DE_UNREF(driver);
    124 	m_testDriverLock.unlock();
    125 }
    126 
    127 ConnectionHandler* ExecutionServer::createHandler (de::Socket* socket, const de::SocketAddress& clientAddress)
    128 {
    129 	printf("ExecutionServer: New connection from %s:%d\n", clientAddress.getHost(), clientAddress.getPort());
    130 	return new ExecutionRequestHandler(this, socket);
    131 }
    132 
    133 void ExecutionServer::connectionDone (ConnectionHandler* handler)
    134 {
    135 	if (m_runMode == RUNMODE_SINGLE_EXEC)
    136 		m_socket.close();
    137 
    138 	TcpServer::connectionDone(handler);
    139 }
    140 
    141 ExecutionRequestHandler::ExecutionRequestHandler (ExecutionServer* server, de::Socket* socket)
    142 	: ConnectionHandler	(server, socket)
    143 	, m_execServer		(server)
    144 	, m_testDriver		(DE_NULL)
    145 	, m_bufferIn		(RECV_BUFFER_SIZE)
    146 	, m_bufferOut		(SEND_BUFFER_SIZE)
    147 	, m_run				(false)
    148 	, m_sendRecvTmpBuf	(SEND_RECV_TMP_BUFFER_SIZE)
    149 {
    150 	// Set flags.
    151 	m_socket->setFlags(DE_SOCKET_NONBLOCKING|DE_SOCKET_KEEPALIVE|DE_SOCKET_CLOSE_ON_EXEC);
    152 
    153 	// Init protocol keepalives.
    154 	initKeepAlives();
    155 }
    156 
    157 ExecutionRequestHandler::~ExecutionRequestHandler (void)
    158 {
    159 	if (m_testDriver)
    160 		m_execServer->releaseTestDriver(m_testDriver);
    161 }
    162 
    163 void ExecutionRequestHandler::handle (void)
    164 {
    165 	DBG_PRINT(("ExecutionRequestHandler::handle()\n"));
    166 
    167 	try
    168 	{
    169 		// Process execution session.
    170 		processSession();
    171 	}
    172 	catch (const std::exception& e)
    173 	{
    174 		printf("ExecutionRequestHandler::run(): %s\n", e.what());
    175 	}
    176 
    177 	DBG_PRINT(("ExecutionRequestHandler::handle(): Done!\n"));
    178 
    179 	// Release test driver.
    180 	if (m_testDriver)
    181 	{
    182 		try
    183 		{
    184 			m_testDriver->reset();
    185 		}
    186 		catch (...)
    187 		{
    188 		}
    189 		m_execServer->releaseTestDriver(m_testDriver);
    190 		m_testDriver = DE_NULL;
    191 	}
    192 
    193 	// Close connection.
    194 	if (m_socket->isConnected())
    195 		m_socket->shutdown();
    196 }
    197 
    198 void ExecutionRequestHandler::acquireTestDriver (void)
    199 {
    200 	DE_ASSERT(!m_testDriver);
    201 
    202 	// Try to acquire test driver - may fail.
    203 	m_testDriver = m_execServer->acquireTestDriver();
    204 	DE_ASSERT(m_testDriver);
    205 	m_testDriver->reset();
    206 
    207 }
    208 
    209 void ExecutionRequestHandler::processSession (void)
    210 {
    211 	m_run = true;
    212 
    213 	deUint64 lastIoTime = deGetMicroseconds();
    214 
    215 	while (m_run)
    216 	{
    217 		bool anyIO = false;
    218 
    219 		// Read from socket to buffer.
    220 		anyIO = receive() || anyIO;
    221 
    222 		// Send bytes in buffer.
    223 		anyIO = send() || anyIO;
    224 
    225 		// Process incoming data.
    226 		if (m_bufferIn.getNumElements() > 0)
    227 		{
    228 			DE_ASSERT(!m_msgBuilder.isComplete());
    229 			m_msgBuilder.read(m_bufferIn);
    230 		}
    231 
    232 		if (m_msgBuilder.isComplete())
    233 		{
    234 			// Process message.
    235 			processMessage(m_msgBuilder.getMessageType(), m_msgBuilder.getMessageData(), m_msgBuilder.getMessageDataSize());
    236 
    237 			m_msgBuilder.clear();
    238 		}
    239 
    240 		// Keepalives, anyone?
    241 		pollKeepAlives();
    242 
    243 		// Poll test driver for IO.
    244 		if (m_testDriver)
    245 			anyIO = getTestDriver()->poll(m_bufferOut) || anyIO;
    246 
    247 		// If no IO happens in a reasonable amount of time, go to sleep.
    248 		{
    249 			deUint64 curTime = deGetMicroseconds();
    250 			if (anyIO)
    251 				lastIoTime = curTime;
    252 			else if (curTime-lastIoTime > SERVER_IDLE_THRESHOLD*1000)
    253 				deSleep(SERVER_IDLE_SLEEP); // Too long since last IO, sleep for a while.
    254 			else
    255 				deYield(); // Just give other threads chance to run.
    256 		}
    257 	}
    258 }
    259 
    260 void ExecutionRequestHandler::processMessage (MessageType type, const deUint8* data, int dataSize)
    261 {
    262 	switch (type)
    263 	{
    264 		case MESSAGETYPE_HELLO:
    265 		{
    266 			HelloMessage msg(data, dataSize);
    267 			DBG_PRINT(("HelloMessage: version = %d\n", msg.version));
    268 			if (msg.version != PROTOCOL_VERSION)
    269 				throw ProtocolError("Unsupported protocol version");
    270 			break;
    271 		}
    272 
    273 		case MESSAGETYPE_TEST:
    274 		{
    275 			TestMessage msg(data, dataSize);
    276 			DBG_PRINT(("TestMessage: '%s'\n", msg.test.c_str()));
    277 			break;
    278 		}
    279 
    280 		case MESSAGETYPE_KEEPALIVE:
    281 		{
    282 			KeepAliveMessage msg(data, dataSize);
    283 			DBG_PRINT(("KeepAliveMessage\n"));
    284 			keepAliveReceived();
    285 			break;
    286 		}
    287 
    288 		case MESSAGETYPE_EXECUTE_BINARY:
    289 		{
    290 			ExecuteBinaryMessage msg(data, dataSize);
    291 			DBG_PRINT(("ExecuteBinaryMessage: '%s', '%s', '%s', '%s'\n", msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.substr(0, 10).c_str()));
    292 			getTestDriver()->startProcess(msg.name.c_str(), msg.params.c_str(), msg.workDir.c_str(), msg.caseList.c_str());
    293 			keepAliveReceived(); // \todo [2011-10-11 pyry] Remove this once Candy is fixed.
    294 			break;
    295 		}
    296 
    297 		case MESSAGETYPE_STOP_EXECUTION:
    298 		{
    299 			StopExecutionMessage msg(data, dataSize);
    300 			DBG_PRINT(("StopExecutionMessage\n"));
    301 			getTestDriver()->stopProcess();
    302 			break;
    303 		}
    304 
    305 		default:
    306 			throw ProtocolError("Unsupported message");
    307 	}
    308 }
    309 
    310 void ExecutionRequestHandler::initKeepAlives (void)
    311 {
    312 	deUint64 curTime = deGetMicroseconds();
    313 	m_lastKeepAliveSent		= curTime;
    314 	m_lastKeepAliveReceived	= curTime;
    315 }
    316 
    317 void ExecutionRequestHandler::keepAliveReceived (void)
    318 {
    319 	m_lastKeepAliveReceived = deGetMicroseconds();
    320 }
    321 
    322 void ExecutionRequestHandler::pollKeepAlives (void)
    323 {
    324 	deUint64 curTime = deGetMicroseconds();
    325 
    326 	// Check that we've got keepalives in timely fashion.
    327 	if (curTime - m_lastKeepAliveReceived > KEEPALIVE_TIMEOUT*1000)
    328 		throw ProtocolError("Keepalive timeout occurred");
    329 
    330 	// Send some?
    331 	if (curTime - m_lastKeepAliveSent > KEEPALIVE_SEND_INTERVAL*1000 &&
    332 		m_bufferOut.getNumFree() >= MESSAGE_HEADER_SIZE)
    333 	{
    334 		vector<deUint8> buf;
    335 		KeepAliveMessage().write(buf);
    336 		m_bufferOut.pushFront(&buf[0], (int)buf.size());
    337 
    338 		m_lastKeepAliveSent = deGetMicroseconds();
    339 	}
    340 }
    341 
    342 bool ExecutionRequestHandler::receive (void)
    343 {
    344 	int maxLen = de::min<int>((int)m_sendRecvTmpBuf.size(), m_bufferIn.getNumFree());
    345 
    346 	if (maxLen > 0)
    347 	{
    348 		int				numRecv;
    349 		deSocketResult	result	= m_socket->receive(&m_sendRecvTmpBuf[0], maxLen, &numRecv);
    350 
    351 		if (result == DE_SOCKETRESULT_SUCCESS)
    352 		{
    353 			DE_ASSERT(numRecv > 0);
    354 			m_bufferIn.pushFront(&m_sendRecvTmpBuf[0], numRecv);
    355 			return true;
    356 		}
    357 		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
    358 		{
    359 			m_run = false;
    360 			return true;
    361 		}
    362 		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
    363 			return false;
    364 		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
    365 			throw ConnectionError("Connection terminated");
    366 		else
    367 			throw ConnectionError("receive() failed");
    368 	}
    369 	else
    370 		return false;
    371 }
    372 
    373 bool ExecutionRequestHandler::send (void)
    374 {
    375 	int maxLen = de::min<int>((int)m_sendRecvTmpBuf.size(), m_bufferOut.getNumElements());
    376 
    377 	if (maxLen > 0)
    378 	{
    379 		m_bufferOut.peekBack(&m_sendRecvTmpBuf[0], maxLen);
    380 
    381 		int				numSent;
    382 		deSocketResult	result	= m_socket->send(&m_sendRecvTmpBuf[0], maxLen, &numSent);
    383 
    384 		if (result == DE_SOCKETRESULT_SUCCESS)
    385 		{
    386 			DE_ASSERT(numSent > 0);
    387 			m_bufferOut.popBack(numSent);
    388 			return true;
    389 		}
    390 		else if (result == DE_SOCKETRESULT_CONNECTION_CLOSED)
    391 		{
    392 			m_run = false;
    393 			return true;
    394 		}
    395 		else if (result == DE_SOCKETRESULT_WOULD_BLOCK)
    396 			return false;
    397 		else if (result == DE_SOCKETRESULT_CONNECTION_TERMINATED)
    398 			throw ConnectionError("Connection terminated");
    399 		else
    400 			throw ConnectionError("send() failed");
    401 	}
    402 	else
    403 		return false;
    404 }
    405 
    406 } // xs
    407