package org.testng.remote.strprotocol;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

import org.testng.TestNGException;

import static org.testng.remote.RemoteTestNG.isVerbose;

/**
 * Base implementation of {@link IMessageSender} that owns the socket streams and the
 * line-based admin channel used to exchange ACK and STOP messages.
 *
 * <p>An instance is set up either with {@link #connect()}, which opens a client socket to
 * the configured host/port and starts a background thread reading admin messages, or with
 * {@link #initReceiver()}, which listens on the configured port and adopts the first
 * accepted connection.
 */
abstract public class BaseMessageSender implements IMessageSender {
  private boolean m_debug = false;
  protected Socket m_clientSocket;
  private String m_host;
  private int m_port;
  /** Monitor used to signal that an ACK or STOP admin message has been received. */
  protected final Object m_ackLock = new Object();

  /** Set by {@link #stopReceiver()}, possibly from another thread, to end the accept loop. */
  private volatile boolean m_requestStopReceiver;
  /** Outgoing message stream. */
  protected OutputStream m_outStream;
  /** Used to send ACK and STOP */
  private PrintWriter m_outWriter;

  /** Incoming message stream. */
  protected volatile InputStream m_inStream;
  /** Used to receive ACK and STOP */
  protected volatile BufferedReader m_inReader;

  private ReaderThread m_readerThread;
  /** Whether {@link #waitForAck()} actually blocks until an ACK/STOP is received. */
  private boolean m_ack;

  /** Only used in the log message of {@link #sendAck()}; see the note in that method. */
  private int m_serial = 0;

  /** Last ACK line seen by the reader thread; only used for logging. */
  private String m_latestAck;

  /**
   * @param host the host {@link #connect()} connects to
   * @param port the port used by both {@link #connect()} and {@link #initReceiver()}
   * @param ack whether {@link #waitForAck()} should block waiting for an acknowledgement
   */
  public BaseMessageSender(String host, int port, boolean ack) {
    m_host = host;
    m_port = port;
    m_ack = ack;
  }

  /**
   * Starts the connection. Connection attempts that fail with a {@link ConnectException}
   * are retried every 4 seconds until one succeeds; once connected, the streams are set up
   * and the reader thread for admin messages is started.
   *
   * @throws InterruptedIOException if the calling thread is interrupted while waiting
   *     between two connection attempts (the interrupt flag is left set)
   * @throws IOException if any other I/O error occurs while establishing the connection
   */
  @Override
  public void connect() throws IOException {
    p("Waiting for Eclipse client on " + m_host + ":" + m_port);
    while (true) {
      try {
        m_clientSocket = new Socket(m_host, m_port);
        p("Received a connection from Eclipse on " + m_host + ":" + m_port);

        // Output streams
        m_outStream = m_clientSocket.getOutputStream();
        m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));

        // Input streams
        m_inStream = m_clientSocket.getInputStream();
        try {
          m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
              "UTF-8")); //$NON-NLS-1$
        }
        catch(UnsupportedEncodingException ueex) {
          // Should never happen
          m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
        }

        p("Connection established, starting reader thread");
        m_readerThread = new ReaderThread();
        m_readerThread.start();
        return;
      }
      catch(ConnectException ex) {
        // ignore and retry
        try {
          Thread.sleep(4000);
        }
        catch(InterruptedException handled) {
          // Restoring the flag and looping again would make every following sleep() fail
          // immediately, turning the retry loop into a busy spin. Give up instead.
          Thread.currentThread().interrupt();
          InterruptedIOException aborted = new InterruptedIOException(
              "Interrupted while waiting to connect to " + m_host + ":" + m_port);
          aborted.initCause(handled);
          throw aborted;
        }
      }
    }
  }

  /** Writes one admin line to the peer and flushes it immediately. */
  private void sendAdminMessage(String message) {
    m_outWriter.println(message);
    m_outWriter.flush();
  }

  /** Sends an ACK admin message to the peer. */
  @Override
  public void sendAck() {
    p("Sending ACK " + m_serial);
    // Note: adding the serial at the end of this message causes a lock up if interacting
    // with TestNG 5.14 and older (reported by JetBrains). The following git commit:
    // 5730bdfb33ec7a8bf4104852cd4a5f2875ba8267
    // changed equals() to startsWith().
    // It's ok to add this serial back for debugging, but don't commit it until JetBrains
    // confirms they no longer need backward compatibility with 5.14.
    sendAdminMessage(MessageHelper.ACK_MSG); // + m_serial++);
  }

  /** Sends a STOP admin message to the peer. */
  @Override
  public void sendStop() {
    sendAdminMessage(MessageHelper.STOP_MSG);
  }

  /**
   * Listens on the configured port and adopts the first client that connects as the source
   * and destination of messages. Polls until a client connects, {@link #stopReceiver()} is
   * called, or the calling thread is interrupted. Does nothing if an input stream is
   * already set.
   *
   * <p>The listening socket is always closed before this method returns; the accepted
   * connection, if any, stays open. I/O errors other than a timeout are not propagated;
   * they are only logged in verbose mode.
   *
   * <p>Note that {@link #shutDown()} does not reset {@code m_inStream}, so calling this
   * method again after a shutdown is a no-op.
   *
   * @throws SocketTimeoutException if a timeout is reported while setting up the accepted
   *     connection (timeouts of the accept call itself are retried)
   */
  @Override
  public void initReceiver() throws SocketTimeoutException {
    if (m_inStream != null) {
      p("Receiver already initialized");
      return;
    }
    ServerSocket serverSocket = null;
    Socket socket = null;
    try {
      p("initReceiver on port " + m_port);
      serverSocket = new ServerSocket(m_port);
      serverSocket.setSoTimeout(5000);

      while (!m_requestStopReceiver) {
        try {
          if (m_debug) {
            p("polling the client connection");
          }
          socket = serverSocket.accept();
          // break the loop once the first client connected
          break;
        }
        catch (IOException ioe) {
          // Includes the accept() timeout: pause briefly, then poll again so that a stop
          // request is noticed.
          try {
            Thread.sleep(100L);
          }
          catch (InterruptedException ie) {
            // Keep the interrupt visible to the caller and stop waiting for a client.
            Thread.currentThread().interrupt();
            break;
          }
        }
      }
      if (socket != null) {
        // Obtain both streams before publishing anything so that a failure cannot leave
        // the fields half initialized.
        InputStream in = socket.getInputStream();
        OutputStream out = socket.getOutputStream();
        m_inStream = in;
        m_inReader = new BufferedReader(new InputStreamReader(in));
        m_outStream = out;
        m_outWriter = new PrintWriter(new OutputStreamWriter(out));
      }
    }
    catch(SocketTimeoutException ste) {
      throw ste;
    }
    catch (IOException ioe) {
      p("Unable to initialize receiver on port " + m_port + ": " + ioe);
      closeQuietly(socket);
    }
    finally {
      // The listening socket is only needed to accept the first client; closing it does
      // not close the accepted connection and releases the port.
      closeQuietly(serverSocket);
    }
  }

  /** Asks a running {@link #initReceiver()} loop to stop polling for a client. */
  public void stopReceiver() {
    m_requestStopReceiver = true;
  }

  /**
   * Closes the outgoing stream, interrupts the reader thread (if any), and closes the admin
   * reader and the client socket. Errors raised while closing are ignored.
   */
  @Override
  public void shutDown() {
    closeQuietly(m_outStream);
    m_outStream = null;

    if (null != m_readerThread) {
      m_readerThread.interrupt();
    }

    closeQuietly(m_inReader);
    m_inReader = null;

    closeQuietly(m_clientSocket);
    m_clientSocket = null;
  }

  /** Closes {@code c} if it is not null; a failure is only printed when debugging is on. */
  private void closeQuietly(Closeable c) {
    if (c != null) {
      try {
        c.close();
      } catch (IOException e) {
        if (m_debug) {
          e.printStackTrace();
        }
      }
    }
  }

  /**
   * Blocks until the reader thread signals an ACK or STOP on {@link #m_ackLock}. Returns
   * immediately when acknowledgements were disabled at construction time. If the calling
   * thread is interrupted, the interrupt flag is restored and the method returns.
   */
  protected void waitForAck() {
    if (m_ack) {
      try {
        p("Message sent, waiting for ACK...");
        synchronized(m_ackLock) {
          // NOTE(review): wait() is not guarded by a condition loop, so a notification
          // delivered before this thread starts waiting is missed and a spurious wakeup
          // returns early. Left as is because subclasses may synchronize on m_ackLock in
          // ways not visible from this class -- confirm before changing.
          m_ackLock.wait();
        }
        p("... ACK received:" + m_latestAck);
      }
      catch(InterruptedException handled) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Prints {@code msg} to standard output when verbose mode is enabled. */
  private static void p(String msg) {
    if (isVerbose()) {
      System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$
    }
  }

  /**
   * Reader thread that processes messages from the client.
   */
  private class ReaderThread extends Thread {

    public ReaderThread() {
      super("ReaderThread"); //$NON-NLS-1$
    }

    /**
     * Reads admin lines until the stream ends, the reader is cleared, or a STOP message is
     * received. Every ACK or STOP wakes up all threads waiting on the ACK lock.
     */
    @Override
    public void run() {
      try {
        p("ReaderThread waiting for an admin message");
        String message = readAdminLine();
        p("ReaderThread received admin message:" + message);
        while (message != null) {
          if (m_debug) {
            p("Admin message:" + message); //$NON-NLS-1$
          }
          boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG);
          boolean stop = MessageHelper.STOP_MSG.equals(message);
          if(acknowledge || stop) {
            if (acknowledge) {
              p("Received ACK:" + message);
              m_latestAck = message;
            }
            synchronized(m_ackLock) {
              m_ackLock.notifyAll();
            }
            if (stop) {
              break;
            }
          } else {
            p("Received unknown message: '" + message + "'");
          }
          message = readAdminLine();
        }
      }
      catch(IOException ioe) {
        if (isVerbose()) {
          ioe.printStackTrace();
        }
      }
    }

    /**
     * Reads the next admin line, or returns {@code null} when the reader has been cleared.
     * The field is copied to a local first because shutDown() may set it to null from
     * another thread between a null check and the read.
     */
    private String readAdminLine() throws IOException {
      BufferedReader reader = m_inReader;
      return reader != null ? reader.readLine() : null;
    }
  }
}