Home | History | Annotate | Download | only in strprotocol
      1 package org.testng.remote.strprotocol;
      2 
      3 import java.io.BufferedReader;
      4 import java.io.BufferedWriter;
      5 import java.io.Closeable;
      6 import java.io.IOException;
      7 import java.io.InputStream;
      8 import java.io.InputStreamReader;
      9 import java.io.OutputStream;
     10 import java.io.OutputStreamWriter;
     11 import java.io.PrintWriter;
     12 import java.io.UnsupportedEncodingException;
     13 import java.net.ConnectException;
     14 import java.net.ServerSocket;
     15 import java.net.Socket;
     16 import java.net.SocketTimeoutException;
     17 
     18 import org.testng.TestNGException;
     19 
     20 import static org.testng.remote.RemoteTestNG.isVerbose;
     21 
     22 abstract public class BaseMessageSender implements IMessageSender {
     23   private boolean m_debug = false;
     24   protected Socket m_clientSocket;
     25   private String m_host;
     26   private int m_port;
     27   protected final Object m_ackLock = new Object();
     28 
     29   private boolean m_requestStopReceiver;
     30   /** Outgoing message stream. */
     31   protected OutputStream m_outStream;
     32   /** Used to send ACK and STOP */
     33   private PrintWriter m_outWriter;
     34 
     35   /** Incoming message stream. */
     36   protected volatile InputStream m_inStream;
     37   /** Used to receive ACK and STOP */
     38   protected volatile BufferedReader m_inReader;
     39 
     40   private ReaderThread m_readerThread;
     41   private boolean m_ack;
     42 //  protected InputStream m_receiverInputStream;
     43 
     44   public BaseMessageSender(String host, int port, boolean ack) {
     45     m_host = host;
     46     m_port = port;
     47     m_ack = ack;
     48   }
     49 
     50   /**
     51    * Starts the connection.
     52    *
     53    * @throws TestNGException if an exception occurred while establishing the connection
     54    */
     55   @Override
     56   public void connect() throws IOException {
     57     p("Waiting for Eclipse client on " + m_host + ":" + m_port);
     58     while (true) {
     59       try {
     60         m_clientSocket = new Socket(m_host, m_port);
     61         p("Received a connection from Eclipse on " + m_host + ":" + m_port);
     62 
     63         // Output streams
     64         m_outStream = m_clientSocket.getOutputStream();
     65         m_outWriter = new PrintWriter(new BufferedWriter(new OutputStreamWriter(m_outStream)));
     66 
     67         // Input streams
     68         m_inStream = m_clientSocket.getInputStream();
     69         try {
     70           m_inReader = new BufferedReader(new InputStreamReader(m_inStream,
     71               "UTF-8")); //$NON-NLS-1$
     72         }
     73         catch(UnsupportedEncodingException ueex) {
     74           // Should never happen
     75           m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
     76         }
     77 
     78         p("Connection established, starting reader thread");
     79         m_readerThread = new ReaderThread();
     80         m_readerThread.start();
     81         return;
     82       }
     83       catch(ConnectException ex) {
     84         // ignore and retry
     85         try {
     86           Thread.sleep(4000);
     87         }
     88         catch(InterruptedException handled) {
     89           Thread.currentThread().interrupt();
     90         }
     91       }
     92     }
     93   }
     94 
     95   private void sendAdminMessage(String message) {
     96     m_outWriter.println(message);
     97     m_outWriter.flush();
     98   }
     99 
    100   private int m_serial = 0;
    101 
    102   @Override
    103   public void sendAck() {
    104     p("Sending ACK " + m_serial);
    105     // Note: adding the serial at the end of this message causes a lock up if interacting
    106     // with TestNG 5.14 and older (reported by JetBrains). The following git commit:
    107     // 5730bdfb33ec7a8bf4104852cd4a5f2875ba8267
    108     // changed equals() to startsWith().
    109     // It's ok to add this serial back for debugging, but don't commit it until JetBrains
    110     // confirms they no longer need backward compatibility with 5.14.
    111     sendAdminMessage(MessageHelper.ACK_MSG); // + m_serial++);
    112   }
    113 
    114   @Override
    115   public void sendStop() {
    116     sendAdminMessage(MessageHelper.STOP_MSG);
    117   }
    118 
    119   @Override
    120   public void initReceiver() throws SocketTimeoutException {
    121     if (m_inStream != null) {
    122       p("Receiver already initialized");
    123     }
    124     ServerSocket serverSocket = null;
    125     try {
    126       p("initReceiver on port " + m_port);
    127       serverSocket = new ServerSocket(m_port);
    128       serverSocket.setSoTimeout(5000);
    129 
    130       Socket socket = null;
    131       while (!m_requestStopReceiver) {
    132         try {
    133           if (m_debug) {
    134             p("polling the client connection");
    135           }
    136           socket = serverSocket.accept();
    137           // break the loop once the first client connected
    138           break;
    139         }
    140         catch (IOException ioe) {
    141           try {
    142             Thread.sleep(100L);
    143           }
    144           catch (InterruptedException ie) {
    145             // Do nothing.
    146           }
    147         }
    148       }
    149       if (socket != null) {
    150         m_inStream = socket.getInputStream();
    151         m_inReader = new BufferedReader(new InputStreamReader(m_inStream));
    152         m_outStream = socket.getOutputStream();
    153         m_outWriter = new PrintWriter(new OutputStreamWriter(m_outStream));
    154       }
    155     }
    156     catch(SocketTimeoutException ste) {
    157       throw ste;
    158     }
    159     catch (IOException ioe) {
    160       closeQuietly(serverSocket);
    161     }
    162   }
    163 
    164   public void stopReceiver() {
    165     m_requestStopReceiver = true;
    166   }
    167 
    168   @Override
    169   public void shutDown() {
    170     closeQuietly(m_outStream);
    171     m_outStream = null;
    172 
    173     if (null != m_readerThread) {
    174       m_readerThread.interrupt();
    175     }
    176 
    177     closeQuietly(m_inReader);
    178     m_inReader = null;
    179 
    180     closeQuietly(m_clientSocket);
    181     m_clientSocket = null;
    182   }
    183 
    184   private void closeQuietly(Closeable c) {
    185     if (c != null) {
    186       try {
    187         c.close();
    188       } catch (IOException e) {
    189         if (m_debug) {
    190           e.printStackTrace();
    191         }
    192       }
    193     }
    194   }
    195 
    196   private String m_latestAck;
    197 
    198   protected void waitForAck() {
    199     if (m_ack) {
    200       try {
    201         p("Message sent, waiting for ACK...");
    202         synchronized(m_ackLock) {
    203           m_ackLock.wait();
    204         }
    205         p("... ACK received:" + m_latestAck);
    206       }
    207       catch(InterruptedException handled) {
    208         Thread.currentThread().interrupt();
    209       }
    210     }
    211   }
    212 
    213   private static void p(String msg) {
    214     if (isVerbose()) {
    215       System.out.println("[BaseMessageSender] " + msg); //$NON-NLS-1$
    216     }
    217   }
    218 
    219   /**
    220    * Reader thread that processes messages from the client.
    221    */
    222   private class ReaderThread extends Thread {
    223 
    224     public ReaderThread() {
    225       super("ReaderThread"); //$NON-NLS-1$
    226     }
    227 
    228     @Override
    229     public void run() {
    230       try {
    231         p("ReaderThread waiting for an admin message");
    232         String message = m_inReader.readLine();
    233         p("ReaderThread received admin message:" + message);
    234         while (message != null) {
    235           if (m_debug) {
    236             p("Admin message:" + message); //$NON-NLS-1$
    237           }
    238           boolean acknowledge = message.startsWith(MessageHelper.ACK_MSG);
    239           boolean stop = MessageHelper.STOP_MSG.equals(message);
    240           if(acknowledge || stop) {
    241             if (acknowledge) {
    242               p("Received ACK:" + message);
    243               m_latestAck = message;
    244             }
    245             synchronized(m_ackLock) {
    246               m_ackLock.notifyAll();
    247             }
    248             if (stop) {
    249               break;
    250             }
    251           } else {
    252             p("Received unknown message: '" + message + "'");
    253           }
    254           message = m_inReader != null ? m_inReader.readLine() : null;
    255         }
    256 //        while((m_reader != null) && (message = m_reader.readLine()) != null) {
    257 //          if (m_debug) {
    258 //            p("Admin message:" + message); //$NON-NLS-1$
    259 //          }
    260 //          boolean acknowledge = MessageHelper.ACK_MSG.equals(message);
    261 //          boolean stop = MessageHelper.STOP_MSG.equals(message);
    262 //          if(acknowledge || stop) {
    263 //            synchronized(m_lock) {
    264 //              m_lock.notifyAll();
    265 //            }
    266 //            if (stop) {
    267 //              break;
    268 //            }
    269 //          }
    270 //        }
    271       }
    272       catch(IOException ioe) {
    273         if (isVerbose()) {
    274           ioe.printStackTrace();
    275         }
    276       }
    277     }
    278   }
    279 }
    280