Home | History | Annotate | Download | only in jdwp
      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *
     15  *  See the License for the specific language governing permissions and
     16  *  limitations under the License.
     17  */
     18 
     19 /**
     20  * @author Anton V. Karnachuk
     21  */
     22 
     23 /**
     24  * Created on 16.03.2005
     25  */
     26 package org.apache.harmony.jpda.tests.framework.jdwp;
     27 
     28 import java.util.List;
     29 import java.io.IOException;
     30 import java.io.InterruptedIOException;
     31 import java.util.ArrayList;
     32 import java.util.Hashtable;
     33 
     34 import org.apache.harmony.jpda.tests.framework.LogWriter;
     35 import org.apache.harmony.jpda.tests.framework.TestOptions;
     36 import org.apache.harmony.jpda.tests.framework.jdwp.exceptions.TimeoutException;
     37 
     38 /**
 * This class provides asynchronous sending of JDWP commands and receiving of
 * JDWP events through an established JDWP connection, and supports timeouts
 * for these operations.
     42  */
     43 public class PacketDispatcher extends Thread {
     44 
    /*
     * The four variables below are intended only to help with the
     * investigation of test failures. They turn on/off some kinds of trace
     * output during test execution, which can clear up the details of a
     * failure.
     */

    /**
     * Starting command ID for the trace of sent JDWP commands and received
     * replies (the first command has ID=1, the second ID=2, and so on).
     * A value {@code <= 0} is treated the same as {@code 1}.
     */
    int begCommandIdForTrace = 1;

    /**
     * Number of commands to trace, counted from {@link #begCommandIdForTrace}.
     * If {@code <= 0}, the trace of commands is off.
     */
    int commandsNumberForTrace = 0;

    /**
     * Selects received events for tracing by request ID:
     * if {@code < 0} this trace is off;
     * if {@code = 0} all received events are traced;
     * if {@code > 0} only events triggered by this request ID are traced.
     */
    int eventRequestIDForTrace = -1;

    /**
     * Selects received events for tracing by event kind.
     * If {@code = 0} this trace is off.
     * See the JDWPConstants.EventKind class for the values of event kinds.
     */
    byte eventKindForTrace = 0;
     78 
     79     /**
     80      * Internal class to synchronize jdwp events. When an event is received it
     81      * is stored in eventQueue. If there are any thread that waits for event it
     82      * is notified.
     83      */
     84     private class EventsSynchronyzer {
     85 
     86         /**
     87          * List of received events.
     88          */
     89         private List<EventPacket> eventQueue;
     90 
     91         /**
     92          * A default constructor.
     93          */
     94         EventsSynchronyzer() {
     95             // initialize eventQueue
     96             eventQueue = new ArrayList<EventPacket>();
     97         }
     98 
     99         /**
    100          * Notifies thread that the new event has been received.
    101          *
    102          * @param eventPacket
    103          *            instance of EventPacket
    104          * @throws InterruptedException
    105          */
    106         public void notifyThread(EventPacket eventPacket)
    107                 throws InterruptedException {
    108 
    109             // use this object as lock
    110             synchronized (this) {
    111                 // add the event to eventQueue
    112                 eventQueue.add(eventPacket);
    113 
    114                 // notify next waiting thread
    115                 this.notify();
    116             }
    117         }
    118 
    119         /**
    120          * Waits for new event during timeout.
    121          *
    122          * @param timeout
    123          *            wait timeout
    124          * @return EventPacket
    125          * @throws InterruptedException
    126          * @throws IOException
    127          * @throws TimeoutException
    128          *             if no event was received
    129          */
    130         public EventPacket waitForNextEvent(long timeout)
    131                 throws InterruptedException, IOException {
    132 
    133             // use this object as lock
    134             synchronized (this) {
    135 
    136                 // if there is already received event in eventQueue,
    137                 // then return it
    138                 synchronized (eventQueue) {
    139                     if (!eventQueue.isEmpty()) {
    140                         return eventQueue.remove(0);
    141                     }
    142 
    143                     // if eventQueue is empty and connection is already closed
    144                     // reraise the exception
    145                     if (connectionException != null)
    146                         throw connectionException;
    147                 }
    148 
    149                 // wait for the next event
    150                 this.wait(timeout);
    151 
    152                 // We have the following opportunities here -
    153                 // next event was received, exception in main cyrcle or timeout
    154                 // happens
    155                 synchronized (eventQueue) {
    156                     if (!eventQueue.isEmpty()) {
    157                         // event received
    158                         EventPacket event = eventQueue.remove(0);
    159                         return event;
    160                     }
    161 
    162                     if (connectionException != null) {
    163                         // if eventQueue is empty and connection is already
    164                         // closed
    165                         // reraise the exception
    166                         throw connectionException;
    167                     }
    168                 }
    169             }
    170 
    171             // no events were occurred during timeout
    172             throw new TimeoutException(false);
    173         }
    174 
    175         /**
    176          * This method is called when connection is closed. It notifies all the
    177          * waiting threads.
    178          */
    179         public void terminate() {
    180             synchronized (this) {
    181                 this.notifyAll();
    182             }
    183         }
    184     }
    185 
    186     /**
    187      * Internal class to synchronize jdwp commands. It sends command packets
    188      * through connection and returns replies.
    189      */
    190     class CommandsSynchronyzer {
    191 
    192         private int commandId;
    193 
    194         private Hashtable<Integer, CommandPacket> commands;
    195 
    196         private Hashtable<Integer, ReplyPacket> replies;
    197 
    198         /**
    199          * A default constructor.
    200          */
    201         CommandsSynchronyzer() {
    202             commands = new Hashtable<Integer, CommandPacket>();
    203             replies = new Hashtable<Integer, ReplyPacket>();
    204 
    205             // set first command id to 1
    206             commandId = 1;
    207         }
    208 
    209         /**
    210          * Gets the next new id for a command.
    211          *
    212          * @return int
    213          */
    214         private synchronized int getNextId() {
    215             return commandId++;
    216         }
    217 
    218         /**
    219          * Notifies thread that reply packet was received.
    220          *
    221          * @param replyPacket
    222          *            instance of ReplyPacket
    223          * @throws IOException
    224          * @throws InterruptedException
    225          */
    226         public void notifyThread(ReplyPacket replyPacket) throws IOException,
    227                 InterruptedException {
    228 
    229             synchronized (commands) {
    230 
    231                 // obtain the current command id
    232                 Integer Id = new Integer(replyPacket.getId());
    233 
    234                 // obtain the current command packet by command id
    235                 CommandPacket command = commands.remove(Id);
    236                 if (command == null) {
    237                     // we received reply's id that does not correspond to any
    238                     // command
    239                     throw new IOException(
    240                             "Reply id is corresponded to no command. Id = "
    241                                     + Id);
    242                 }
    243 
    244                 synchronized (command) {
    245                     // put the reply in replies queue
    246                     synchronized (replies) {
    247                         replies.put(new Integer(replyPacket.getId()),
    248                                 replyPacket);
    249                     }
    250                     // notify waiting thread
    251                     command.notifyAll();
    252                 }
    253             }
    254         }
    255 
    256         /**
    257          * Sends command and waits for the reply during timeout.
    258          *
    259          * @param command
    260          *            instance of CommandPacket
    261          * @param timeout
    262          *            reply wait timeout
    263          * @return
    264          *            a reply packet
    265          * @throws TimeoutException
    266          *             if no reply was received
    267          */
    268         public ReplyPacket waitForReply(CommandPacket command, long timeout)
    269                 throws InterruptedException, IOException {
    270 
    271             synchronized (command) {
    272 
    273                 // if connection is already closed reraise the exception
    274                 if (connectionException != null)
    275                     throw connectionException;
    276 
    277                 // obtain new command id
    278                 Integer Id = new Integer(getNextId());
    279                 command.setId(Id.intValue());
    280 
    281                 // add command into commands hashtable
    282                 synchronized (commands) {
    283                     commands.put(Id, command);
    284 
    285                     // below is trace for sent coomasnds
    286                     if (commandsNumberForTrace > 0) {
    287                         int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace
    288                                 : 1;
    289                         if (Id.intValue() >= begCommandId) {
    290                             if ((Id.intValue() - begCommandId) < commandsNumberForTrace) {
    291                                 logWriter
    292                                         .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = "
    293                                                 + Id.intValue()
    294                                                 + "; CommandSet = "
    295                                                 + command.getCommandSet()
    296                                                 + "; Command = "
    297                                                 + command.getCommand() + "...");
    298                             }
    299                         }
    300                     }
    301 
    302                     // write this package to connection
    303                     connection.writePacket(command.toBytesArray());
    304                 }
    305 
    306                 // if connection is already closed reraise the exception
    307                 if (connectionException != null)
    308                     throw connectionException;
    309 
    310                 // wait for reply
    311                 command.wait(timeout);
    312 
    313                 // receive the reply
    314                 ReplyPacket currentReply = null;
    315                 synchronized (replies) {
    316                     currentReply = replies.remove(Id);
    317                 }
    318 
    319                 // if reply is ok, return it
    320                 if (currentReply != null) {
    321                     return currentReply;
    322                 }
    323 
    324                 // if connection is already closed reraise the exception
    325                 if (connectionException != null)
    326                     throw connectionException;
    327             }
    328 
    329             // no event was occurred during timeout
    330             throw new TimeoutException(false);
    331         }
    332 
    333         /**
    334          * Sends command without waiting for the reply and returns id of the
    335          * sent command.
    336          *
    337          * @param command
    338          *            instance of CommandPacket
    339          * @return command id
    340          * @throws IOException
    341          */
    342         public int sendCommand(CommandPacket command) throws IOException {
    343 
    344             // if connection is already closed reraise the exception
    345             if (connectionException != null)
    346                 throw connectionException;
    347 
    348             // obtain new command id
    349             Integer Id = new Integer(getNextId());
    350             command.setId(Id.intValue());
    351 
    352             // add command into commands hashtable
    353             synchronized (commands) {
    354                 commands.put(Id, command);
    355 
    356                 // below is trace for sent coomasnds
    357                 if (commandsNumberForTrace > 0) {
    358                     int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace
    359                             : 1;
    360                     if (Id.intValue() >= begCommandId) {
    361                         if ((Id.intValue() - begCommandId) < commandsNumberForTrace) {
    362                             logWriter
    363                                     .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = "
    364                                             + Id.intValue()
    365                                             + "; CommandSet = "
    366                                             + command.getCommandSet()
    367                                             + "; Command = "
    368                                             + command.getCommand() + "...");
    369                         }
    370                     }
    371                 }
    372 
    373                 // write this package to connection
    374                 connection.writePacket(command.toBytesArray());
    375             }
    376 
    377             // if connection is already closed reraise the exception
    378             if (connectionException != null) {
    379                 throw connectionException;
    380             }
    381 
    382             return Id.intValue();
    383 
    384         }
    385 
    386         /**
    387          * Receives the reply during timeout for command with specified command
    388          * ID.
    389          *
    390          * @param commandId
    391          *            id of previously sent commend
    392          * @param timeout
    393          *            receive timeout
    394          * @return received ReplyPacket
    395          * @throws TimeoutException
    396          *             if no reply was received
    397          */
    398         public ReplyPacket receiveReply(int commandId, long timeout)
    399                 throws InterruptedException, IOException {
    400 
    401             // if connection is already closed reraise the exception
    402             if (connectionException != null)
    403                 throw connectionException;
    404 
    405             // receive the reply
    406             long endTimeMlsecForWait = System.currentTimeMillis() + timeout;
    407             synchronized (replies) {
    408                 while (true) {
    409                     ReplyPacket currentReply = replies.remove(new Integer(commandId));
    410                     // if reply is ok, return it
    411                     if (currentReply != null) {
    412                         return currentReply;
    413                     }
    414                     // if connection is already closed reraise the exception
    415                     if (connectionException != null) {
    416                         throw connectionException;
    417                     }
    418                     if (System.currentTimeMillis() >= endTimeMlsecForWait) {
    419                         break;
    420                     }
    421                     replies.wait(100);
    422                 }
    423             }
    424             // no expected reply was found during timeout
    425             throw new TimeoutException(false);
    426         }
    427 
    428         /**
    429          * This method is called when connection is closed. It notifies all the
    430          * waiting threads.
    431          *
    432          */
    433         public void terminate() {
    434 
    435             synchronized (commands) {
    436                 // enumerate all waiting commands
    437                 for (CommandPacket command : commands.values()) {
    438                     synchronized (command) {
    439                         // notify the waiting object
    440                         command.notifyAll();
    441                     }
    442                 }
    443             }
    444         }
    445     }
    446 
    447     /** Transport which is used to sent and receive packets. */
    448     private TransportWrapper connection;
    449 
    450     /** Current test run configuration. */
    451     TestOptions config;
    452 
    453     private CommandsSynchronyzer commandsSynchronyzer;
    454 
    455     private EventsSynchronyzer eventsSynchronyzer;
    456 
    457     private LogWriter logWriter;
    458 
    459     private IOException connectionException;
    460 
    /**
     * Creates new PacketDispatcher instance, marks it as a daemon thread and
     * starts it immediately.
     *
     * @param connection
     *            open connection for reading and writing packets
     * @param config
     *            test run options
     * @param logWriter
     *            LogWriter object
     */
    public PacketDispatcher(TransportWrapper connection, TestOptions config,
            LogWriter logWriter) {

        // the thread is a daemon one
        setDaemon(true);

        this.logWriter = logWriter;
        this.config = config;
        this.connection = connection;

        eventsSynchronyzer = new EventsSynchronyzer();
        commandsSynchronyzer = new CommandsSynchronyzer();

        // everything is set up - begin reading packets
        start();
    }
    487 
    488     /**
    489      * Reads packets from connection and dispatches them between waiting
    490      * threads.
    491      */
    492     @Override
    493     public void run() {
    494 
    495         connectionException = null;
    496 
    497         try {
    498             // start listening for replies
    499             while (!isInterrupted()) {
    500 
    501                 // read packet from transport
    502                 byte[] packet = connection.readPacket();
    503 
    504                 // break cycle if empty packet
    505                 if (packet == null || packet.length == 0)
    506                     break;
    507 
    508                 // check flags
    509                 if (packet.length < Packet.FLAGS_INDEX) {
    510                     logWriter
    511                             .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet size = "
    512                                     + packet.length);
    513                 } else {
    514                     int flag = packet[Packet.FLAGS_INDEX] & 0xFF;
    515                     if (flag != 0) {
    516                         if (flag != Packet.REPLY_PACKET_FLAG) {
    517                             logWriter
    518                                     .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet flags = "
    519                                             + Integer.toHexString(flag));
    520                         }
    521                     }
    522                 }
    523 
    524                 // check the reply flag
    525                 if (Packet.isReply(packet)) {
    526                     // new reply
    527                     ReplyPacket replyPacket = new ReplyPacket(packet);
    528 
    529                     // check for received reply packet length
    530                     int packetLength = replyPacket.getLength();
    531                     if (packetLength < Packet.HEADER_SIZE) {
    532                         logWriter
    533                                 .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet length = "
    534                                         + packetLength);
    535                     }
    536 
    537                     // below is trace for received coomasnds
    538                     if (commandsNumberForTrace > 0) {
    539                         int replyID = replyPacket.getId();
    540                         int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace
    541                                 : 1;
    542                         if (replyID >= begCommandId) {
    543                             if ((replyID - begCommandId) < commandsNumberForTrace) {
    544                                 logWriter
    545                                         .println(">>>>>>>>>> PacketDispatcher: Received REPLY ID = "
    546                                                 + replyID);
    547                             }
    548                         }
    549                     }
    550 
    551                     commandsSynchronyzer.notifyThread(replyPacket);
    552                 } else {
    553                     // new event
    554                     EventPacket eventPacket = new EventPacket(packet);
    555                     // below is to check received events for correctness
    556 
    557                     // below is trace for received events
    558                     ParsedEvent[] parsedEvents = ParsedEvent
    559                             .parseEventPacket(eventPacket);
    560                     if ((eventRequestIDForTrace >= 0)
    561                             || (eventKindForTrace > 0)) {
    562                         for (int i = 0; i < parsedEvents.length; i++) {
    563                             boolean trace = false;
    564                             int eventRequestID = parsedEvents[i].getRequestID();
    565                             if (eventRequestIDForTrace == 0) {
    566                                 trace = true;
    567                             } else {
    568                                 if (eventRequestID == eventRequestIDForTrace) {
    569                                     trace = true;
    570                                 }
    571                             }
    572                             byte eventKind = parsedEvents[i].getEventKind();
    573                             if (eventKind == eventKindForTrace) {
    574                                 trace = true;
    575                             }
    576                             if (trace) {
    577                                 logWriter
    578                                         .println(">>>>>>>>>> PacketDispatcher: Received_EVENT["
    579                                                 + i
    580                                                 + "]: eventRequestID= "
    581                                                 + eventRequestID
    582                                                 + "; eventKind =  "
    583                                                 + eventKind
    584                                                 + "("
    585                                                 + JDWPConstants.EventKind
    586                                                         .getName(eventKind)
    587                                                 + ")");
    588                             }
    589                         }
    590                     }
    591                     eventsSynchronyzer.notifyThread(eventPacket);
    592                 }
    593             }
    594 
    595             // this exception is send for all waiting threads
    596             connectionException = new TimeoutException(true);
    597         } catch (IOException e) {
    598             // connection exception is send for all waiting threads
    599             connectionException = e;
    600 
    601             // print stack trace
    602             e.printStackTrace();
    603         } catch (InterruptedException e) {
    604             // connection exception is send for all waiting threads
    605             connectionException = new InterruptedIOException(e.getMessage());
    606             connectionException.initCause(e);
    607 
    608             // print stack trace
    609             e.printStackTrace();
    610         }
    611 
    612         // notify all the waiting threads
    613         eventsSynchronyzer.terminate();
    614         commandsSynchronyzer.terminate();
    615     }
    616 
    617     /**
    618      * Receives event from event queue if there are any events or waits during
    619      * timeout for any event occurrence. This method should not be used
    620      * simultaneously from different threads. If there were no reply during the
    621      * timeout, TimeoutException is thrown.
    622      *
    623      * @param timeout
    624      *            timeout in milliseconds
    625      * @return received event packet
    626      * @throws IOException
    627      *             is any connection error occurred
    628      * @throws InterruptedException
    629      *             if reading packet was interrupted
    630      * @throws TimeoutException
    631      *             if timeout exceeded
    632      */
    633     public EventPacket receiveEvent(long timeout) throws IOException,
    634             InterruptedException, TimeoutException {
    635 
    636         return eventsSynchronyzer.waitForNextEvent(timeout);
    637     }
    638 
    639     /**
    640      * Sends JDWP command packet and waits for reply packet during default
    641      * timeout. If there were no reply packet during the timeout,
    642      * TimeoutException is thrown.
    643      *
    644      * @return received reply packet
    645      * @throws InterruptedException
    646      *             if reading packet was interrupted
    647      * @throws IOException
    648      *             if any connection error occurred
    649      * @throws TimeoutException
    650      *             if timeout exceeded
    651      */
    652     public ReplyPacket performCommand(CommandPacket command)
    653             throws InterruptedException, IOException, TimeoutException {
    654 
    655         return performCommand(command, config.getTimeout());
    656     }
    657 
    658     /**
    659      * Sends JDWP command packet and waits for reply packet during certain
    660      * timeout. If there were no reply packet during the timeout,
    661      * TimeoutException is thrown.
    662      *
    663      * @param command
    664      *            command packet to send
    665      * @param timeout
    666      *            timeout in milliseconds
    667      * @return received reply packet
    668      * @throws InterruptedException
    669      *             if packet reading was interrupted
    670      * @throws IOException
    671      *             if any connection error occurred
    672      * @throws TimeoutException
    673      *             if timeout exceeded
    674      */
    675     public ReplyPacket performCommand(CommandPacket command, long timeout)
    676             throws InterruptedException, IOException, TimeoutException {
    677 
    678         return commandsSynchronyzer.waitForReply(command, timeout);
    679     }
    680 
    681     /**
    682      * Sends CommandPacket to debuggee VM without waiting for the reply. This
    683      * method is intended for special cases when there is need to divide
    684      * command's performing into two actions: command's sending and receiving
    685      * reply (e.g. for asynchronous JDWP commands' testing). After this method
    686      * the 'receiveReply()' method must be used latter for receiving reply for
    687      * sent command. It is NOT recommended to use this method for usual cases -
    688      * 'performCommand()' method must be used.
    689      *
    690      * @param command
    691      *            Command packet to be sent
    692      * @return command ID of sent command
    693      * @throws IOException
    694      *             if any connection error occurred
    695      */
    696     public int sendCommand(CommandPacket command) throws IOException {
    697         return commandsSynchronyzer.sendCommand(command);
    698     }
    699 
    700     /**
    701      * Waits for reply for command which was sent before by 'sendCommand()'
    702      * method. Specified timeout is used as time limit for waiting. This method
    703      * (jointly with 'sendCommand()') is intended for special cases when there
    704      * is need to divide command's performing into two actions: command's
    705      * sending and receiving reply (e.g. for asynchronous JDWP commands'
    706      * testing). It is NOT recommended to use 'sendCommand()- receiveReply()'
    707      * pair for usual cases - 'performCommand()' method must be used.
    708      *
    709      * @param commandId
    710      *            Command ID of sent before command, reply from which is
    711      *            expected to be received
    712      * @param timeout
    713      *            Specified timeout in milliseconds to wait for reply
    714      * @return received ReplyPacket
    715      * @throws IOException
    716      *             if any connection error occurred
    717      * @throws InterruptedException
    718      *             if reply packet's waiting was interrupted
    719      * @throws TimeoutException
    720      *             if timeout exceeded
    721      */
    722     public ReplyPacket receiveReply(int commandId, long timeout)
    723             throws InterruptedException, IOException, TimeoutException {
    724         return commandsSynchronyzer.receiveReply(commandId, timeout);
    725     }
    726 
    727 }
    728