1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 /** 20 * @author Anton V. Karnachuk 21 */ 22 23 /** 24 * Created on 16.03.2005 25 */ 26 package org.apache.harmony.jpda.tests.framework.jdwp; 27 28 import java.util.List; 29 import java.io.IOException; 30 import java.io.InterruptedIOException; 31 import java.util.ArrayList; 32 import java.util.Enumeration; 33 import java.util.Hashtable; 34 35 import org.apache.harmony.jpda.tests.framework.LogWriter; 36 import org.apache.harmony.jpda.tests.framework.TestOptions; 37 import org.apache.harmony.jpda.tests.framework.jdwp.exceptions.TimeoutException; 38 39 /** 40 * This class provides asynchronous sending JDWP commands and receiving JDWP 41 * events through established JDWP connection and supports timeout for these 42 * operations. 43 */ 44 public class PacketDispatcher extends Thread { 45 46 /** 47 * Variables below are intended only to help with tests failures 48 * investigation. They turn on/off some kinds of trace during 49 * tests execution which can clear up details of test failure. 50 * 51 * commandsNumberForTrace and begCommandIdForTrace define trace 52 * of sent JDWP commands and received replies for these commands: 53 * - begCommandIdForTrace defines starting command ID for trace 54 * (the first command has ID=1, the second - ID=2 and so on). 55 * if <= 0 then the same as = 1. 56 * - commandsNumberForTrace defines number of command for trace. 57 * if <= 0 then commands' trace is off. 58 * 59 * - eventRequestIDForTrace defines trace of received events 60 * according to request ID value: 61 * if < 0 then this trace is off; 62 * if = 0 then trace is for all received events; 63 * if > 0 then trace is for received events, which are triggered 64 * by this specified request ID value; 65 * 66 * - eventKindForTrace defines trace of received events 67 * according to this specified kind of event. 68 * if = 0 then this trace is off; 69 * See JDWPConstants.EventKind class for values of 70 * event kinds. 71 */ 72 int begCommandIdForTrace = 1; 73 74 int commandsNumberForTrace = 0; 75 76 int eventRequestIDForTrace = -1; 77 78 byte eventKindForTrace = 0; 79 80 /** 81 * Internal class to synchronize jdwp events. When an event is received it 82 * is stored in eventQueue. If there are any thread that waits for event it 83 * is notified. 84 */ 85 private class EventsSynchronyzer { 86 87 /** 88 * List of received events. 89 */ 90 private List<EventPacket> eventQueue; 91 92 /** 93 * A default constructor. 94 */ 95 EventsSynchronyzer() { 96 // initialize eventQueue 97 eventQueue = new ArrayList<EventPacket>(); 98 } 99 100 /** 101 * Notifies thread that the new event has been received. 102 * 103 * @param eventPacket 104 * instance of EventPacket 105 * @throws InterruptedException 106 */ 107 public void notifyThread(EventPacket eventPacket) 108 throws InterruptedException { 109 110 // use this object as lock 111 synchronized (this) { 112 // add the event to eventQueue 113 eventQueue.add(eventPacket); 114 115 // notify next waiting thread 116 this.notify(); 117 } 118 } 119 120 /** 121 * Waits for new event during timeout. 122 * 123 * @param timeout 124 * wait timeout 125 * @return EventPacket 126 * @throws InterruptedException 127 * @throws IOException 128 * @throws TimeoutException 129 * if no event was received 130 */ 131 public EventPacket waitForNextEvent(long timeout) 132 throws InterruptedException, IOException { 133 134 // use this object as lock 135 synchronized (this) { 136 137 // if there is already received event in eventQueue, 138 // then return it 139 synchronized (eventQueue) { 140 if (!eventQueue.isEmpty()) { 141 return (EventPacket) eventQueue.remove(0); 142 } 143 144 // if eventQueue is empty and connection is already closed 145 // reraise the exception 146 if (connectionException != null) 147 throw connectionException; 148 } 149 150 // wait for the next event 151 this.wait(timeout); 152 153 // We have the following opportunities here - 154 // next event was received, exception in main cyrcle or timeout 155 // happens 156 synchronized (eventQueue) { 157 if (!eventQueue.isEmpty()) { 158 // event received 159 EventPacket event = (EventPacket) eventQueue.remove(0); 160 return event; 161 } 162 163 if (connectionException != null) { 164 // if eventQueue is empty and connection is already 165 // closed 166 // reraise the exception 167 throw connectionException; 168 } 169 } 170 } 171 172 // no events were occurred during timeout 173 throw new TimeoutException(false); 174 } 175 176 /** 177 * This method is called when connection is closed. It notifies all the 178 * waiting threads. 179 */ 180 public void terminate() { 181 synchronized (this) { 182 this.notifyAll(); 183 } 184 } 185 } 186 187 /** 188 * Internal class to synchronize jdwp commands. It sends command packets 189 * through connection and returns replies. 190 */ 191 class CommandsSynchronyzer { 192 193 private int commandId; 194 195 private Hashtable<Integer, CommandPacket> commands; 196 197 private Hashtable<Integer, ReplyPacket> replies; 198 199 /** 200 * A default constructor. 201 */ 202 CommandsSynchronyzer() { 203 commands = new Hashtable<Integer, CommandPacket>(); 204 replies = new Hashtable<Integer, ReplyPacket>(); 205 206 // set first command id to 1 207 commandId = 1; 208 } 209 210 /** 211 * Gets the next new id for a command. 212 * 213 * @return int 214 */ 215 private synchronized int getNextId() { 216 return commandId++; 217 } 218 219 /** 220 * Notifies thread that reply packet was received. 221 * 222 * @param replyPacket 223 * instance of ReplyPacket 224 * @throws IOException 225 * @throws InterruptedException 226 */ 227 public void notifyThread(ReplyPacket replyPacket) throws IOException, 228 InterruptedException { 229 230 synchronized (commands) { 231 232 // obtain the current command id 233 Integer Id = new Integer(replyPacket.getId()); 234 235 // obtain the current command packet by command id 236 CommandPacket command = (CommandPacket) commands.remove(Id); 237 if (command == null) { 238 // we received reply's id that does not correspond to any 239 // command 240 throw new IOException( 241 "Reply id is corresponded to no command. Id = " 242 + Id); 243 } 244 245 synchronized (command) { 246 // put the reply in replies queue 247 synchronized (replies) { 248 replies.put(new Integer(replyPacket.getId()), 249 replyPacket); 250 } 251 // notify waiting thread 252 command.notifyAll(); 253 } 254 } 255 } 256 257 /** 258 * Sends command and waits for the reply during timeout. 259 * 260 * @param command 261 * instance of CommandPacket 262 * @param timeout 263 * reply wait timeout 264 * @return 265 * @throws TimeoutException 266 * if no reply was received 267 */ 268 public ReplyPacket waitForReply(CommandPacket command, long timeout) 269 throws InterruptedException, IOException { 270 271 synchronized (command) { 272 273 // if connection is already closed reraise the exception 274 if (connectionException != null) 275 throw connectionException; 276 277 // obtain new command id 278 Integer Id = new Integer(getNextId()); 279 command.setId(Id.intValue()); 280 281 // add command into commands hashtable 282 synchronized (commands) { 283 commands.put(Id, command); 284 285 // below is trace for sent coomasnds 286 if (commandsNumberForTrace > 0) { 287 int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace 288 : 1; 289 if (Id.intValue() >= begCommandId) { 290 if ((Id.intValue() - begCommandId) < commandsNumberForTrace) { 291 logWriter 292 .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = " 293 + Id.intValue() 294 + "; CommandSet = " 295 + command.getCommandSet() 296 + "; Command = " 297 + command.getCommand() + "..."); 298 } 299 } 300 } 301 302 // write this package to connection 303 connection.writePacket(command.toBytesArray()); 304 } 305 306 // if connection is already closed reraise the exception 307 if (connectionException != null) 308 throw connectionException; 309 310 // wait for reply 311 command.wait(timeout); 312 313 // receive the reply 314 ReplyPacket currentReply = null; 315 synchronized (replies) { 316 currentReply = (ReplyPacket) replies.remove(Id); 317 } 318 319 // if reply is ok, return it 320 if (currentReply != null) { 321 return currentReply; 322 } 323 324 // if connection is already closed reraise the exception 325 if (connectionException != null) 326 throw connectionException; 327 } 328 329 // no event was occurred during timeout 330 throw new TimeoutException(false); 331 } 332 333 /** 334 * Sends command without waiting for the reply and returns id of the 335 * sent command. 336 * 337 * @param command 338 * instance of CommandPacket 339 * @return command id 340 * @throws IOException 341 */ 342 public int sendCommand(CommandPacket command) throws IOException { 343 344 // if connection is already closed reraise the exception 345 if (connectionException != null) 346 throw connectionException; 347 348 // obtain new command id 349 Integer Id = new Integer(getNextId()); 350 command.setId(Id.intValue()); 351 352 // add command into commands hashtable 353 synchronized (commands) { 354 commands.put(Id, command); 355 356 // below is trace for sent coomasnds 357 if (commandsNumberForTrace > 0) { 358 int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace 359 : 1; 360 if (Id.intValue() >= begCommandId) { 361 if ((Id.intValue() - begCommandId) < commandsNumberForTrace) { 362 logWriter 363 .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = " 364 + Id.intValue() 365 + "; CommandSet = " 366 + command.getCommandSet() 367 + "; Command = " 368 + command.getCommand() + "..."); 369 } 370 } 371 } 372 373 // write this package to connection 374 connection.writePacket(command.toBytesArray()); 375 } 376 377 // if connection is already closed reraise the exception 378 if (connectionException != null) { 379 throw connectionException; 380 } 381 382 return Id.intValue(); 383 384 } 385 386 /** 387 * Receives the reply during timeout for command with specified command 388 * ID. 389 * 390 * @param commandId 391 * id of previously sent commend 392 * @param timeout 393 * receive timeout 394 * @return received ReplyPacket 395 * @throws TimeoutException 396 * if no reply was received 397 */ 398 public ReplyPacket receiveReply(int commandId, long timeout) 399 throws InterruptedException, IOException { 400 401 // if connection is already closed reraise the exception 402 if (connectionException != null) 403 throw connectionException; 404 405 // receive the reply 406 ReplyPacket currentReply = null; 407 long endTimeMlsecForWait = System.currentTimeMillis() + timeout; 408 synchronized (replies) { 409 while (true) { 410 currentReply = (ReplyPacket) replies.remove(new Integer( 411 commandId)); 412 // if reply is ok, return it 413 if (currentReply != null) { 414 return currentReply; 415 } 416 // if connection is already closed reraise the exception 417 if (connectionException != null) { 418 throw connectionException; 419 } 420 if (System.currentTimeMillis() >= endTimeMlsecForWait) { 421 break; 422 } 423 replies.wait(100); 424 } 425 } 426 // no expected reply was found during timeout 427 throw new TimeoutException(false); 428 } 429 430 /** 431 * This method is called when connection is closed. It notifies all the 432 * waiting threads. 433 * 434 */ 435 public void terminate() { 436 437 synchronized (commands) { 438 // enumerate all waiting commands 439 for (Enumeration en = commands.keys(); en.hasMoreElements();) { 440 CommandPacket command = (CommandPacket) commands.get(en 441 .nextElement()); 442 synchronized (command) { 443 // notify the waiting object 444 command.notifyAll(); 445 } 446 } 447 } 448 } 449 } 450 451 /** Transport which is used to sent and receive packets. */ 452 private TransportWrapper connection; 453 454 /** Current test run configuration. */ 455 TestOptions config; 456 457 private CommandsSynchronyzer commandsSynchronyzer; 458 459 private EventsSynchronyzer eventsSynchronyzer; 460 461 private LogWriter logWriter; 462 463 private IOException connectionException; 464 465 /** 466 * Creates new PacketDispatcher instance. 467 * 468 * @param connection 469 * open connection for reading and writing packets 470 * @param config 471 * test run options 472 * @param logWriter 473 * LogWriter object 474 */ 475 public PacketDispatcher(TransportWrapper connection, TestOptions config, 476 LogWriter logWriter) { 477 478 this.connection = connection; 479 this.config = config; 480 this.logWriter = logWriter; 481 482 commandsSynchronyzer = new CommandsSynchronyzer(); 483 eventsSynchronyzer = new EventsSynchronyzer(); 484 485 // make thread daemon 486 setDaemon(true); 487 488 // start the thread 489 start(); 490 } 491 492 /** 493 * Reads packets from connection and dispatches them between waiting 494 * threads. 495 */ 496 public void run() { 497 498 connectionException = null; 499 500 try { 501 // start listening for replies 502 while (!isInterrupted()) { 503 504 // read packet from transport 505 byte[] packet = connection.readPacket(); 506 507 // break cycle if empty packet 508 if (packet == null || packet.length == 0) 509 break; 510 511 // check flags 512 if (packet.length < Packet.FLAGS_INDEX) { 513 logWriter 514 .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet size = " 515 + packet.length); 516 } else { 517 int flag = packet[Packet.FLAGS_INDEX] & 0xFF; 518 if (flag != 0) { 519 if (flag != Packet.REPLY_PACKET_FLAG) { 520 logWriter 521 .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet flags = " 522 + Integer.toHexString(flag)); 523 } 524 } 525 } 526 527 // check the reply flag 528 if (Packet.isReply(packet)) { 529 // new reply 530 ReplyPacket replyPacket = new ReplyPacket(packet); 531 532 // check for received reply packet length 533 int packetLength = replyPacket.getLength(); 534 if (packetLength < Packet.HEADER_SIZE) { 535 logWriter 536 .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet length = " 537 + packetLength); 538 } 539 540 // below is trace for received coomasnds 541 if (commandsNumberForTrace > 0) { 542 int replyID = replyPacket.getId(); 543 int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace 544 : 1; 545 if (replyID >= begCommandId) { 546 if ((replyID - begCommandId) < commandsNumberForTrace) { 547 logWriter 548 .println(">>>>>>>>>> PacketDispatcher: Received REPLY ID = " 549 + replyID); 550 } 551 } 552 } 553 554 commandsSynchronyzer.notifyThread(replyPacket); 555 } else { 556 // new event 557 EventPacket eventPacket = new EventPacket(packet); 558 // below is to check received events for correctness 559 560 // below is trace for received events 561 ParsedEvent[] parsedEvents = ParsedEvent 562 .parseEventPacket(eventPacket); 563 if ((eventRequestIDForTrace >= 0) 564 || (eventKindForTrace > 0)) { 565 for (int i = 0; i < parsedEvents.length; i++) { 566 boolean trace = false; 567 int eventRequestID = parsedEvents[i].getRequestID(); 568 if (eventRequestIDForTrace == 0) { 569 trace = true; 570 } else { 571 if (eventRequestID == eventRequestIDForTrace) { 572 trace = true; 573 } 574 } 575 byte eventKind = parsedEvents[i].getEventKind(); 576 if (eventKind == eventKindForTrace) { 577 trace = true; 578 } 579 if (trace) { 580 logWriter 581 .println(">>>>>>>>>> PacketDispatcher: Received_EVENT[" 582 + i 583 + "]: eventRequestID= " 584 + eventRequestID 585 + "; eventKind = " 586 + eventKind 587 + "(" 588 + JDWPConstants.EventKind 589 .getName(eventKind) 590 + ")"); 591 } 592 } 593 } 594 eventsSynchronyzer.notifyThread(eventPacket); 595 } 596 } 597 598 // this exception is send for all waiting threads 599 connectionException = new TimeoutException(true); 600 } catch (IOException e) { 601 // connection exception is send for all waiting threads 602 connectionException = e; 603 604 // print stack trace 605 e.printStackTrace(); 606 } catch (InterruptedException e) { 607 // connection exception is send for all waiting threads 608 connectionException = new InterruptedIOException(e.getMessage()); 609 connectionException.initCause(e); 610 611 // print stack trace 612 e.printStackTrace(); 613 } 614 615 // notify all the waiting threads 616 eventsSynchronyzer.terminate(); 617 commandsSynchronyzer.terminate(); 618 } 619 620 /** 621 * Receives event from event queue if there are any events or waits during 622 * timeout for any event occurrence. This method should not be used 623 * simultaneously from different threads. If there were no reply during the 624 * timeout, TimeoutException is thrown. 625 * 626 * @param timeout 627 * timeout in milliseconds 628 * @return received event packet 629 * @throws IOException 630 * is any connection error occurred 631 * @throws InterruptedException 632 * if reading packet was interrupted 633 * @throws TimeoutException 634 * if timeout exceeded 635 */ 636 public EventPacket receiveEvent(long timeout) throws IOException, 637 InterruptedException, TimeoutException { 638 639 return eventsSynchronyzer.waitForNextEvent(timeout); 640 } 641 642 /** 643 * Sends JDWP command packet and waits for reply packet during default 644 * timeout. If there were no reply packet during the timeout, 645 * TimeoutException is thrown. 646 * 647 * @return received reply packet 648 * @throws InterruptedException 649 * if reading packet was interrupted 650 * @throws IOException 651 * if any connection error occurred 652 * @throws TimeoutException 653 * if timeout exceeded 654 */ 655 public ReplyPacket performCommand(CommandPacket command) 656 throws InterruptedException, IOException, TimeoutException { 657 658 return performCommand(command, config.getTimeout()); 659 } 660 661 /** 662 * Sends JDWP command packet and waits for reply packet during certain 663 * timeout. If there were no reply packet during the timeout, 664 * TimeoutException is thrown. 665 * 666 * @param command 667 * command packet to send 668 * @param timeout 669 * timeout in milliseconds 670 * @return received reply packet 671 * @throws InterruptedException 672 * if packet reading was interrupted 673 * @throws IOException 674 * if any connection error occurred 675 * @throws TimeoutException 676 * if timeout exceeded 677 */ 678 public ReplyPacket performCommand(CommandPacket command, long timeout) 679 throws InterruptedException, IOException, TimeoutException { 680 681 return commandsSynchronyzer.waitForReply(command, timeout); 682 } 683 684 /** 685 * Sends CommandPacket to debuggee VM without waiting for the reply. This 686 * method is intended for special cases when there is need to divide 687 * command's performing into two actions: command's sending and receiving 688 * reply (e.g. for asynchronous JDWP commands' testing). After this method 689 * the 'receiveReply()' method must be used latter for receiving reply for 690 * sent command. It is NOT recommended to use this method for usual cases - 691 * 'performCommand()' method must be used. 692 * 693 * @param command 694 * Command packet to be sent 695 * @return command ID of sent command 696 * @throws IOException 697 * if any connection error occurred 698 */ 699 public int sendCommand(CommandPacket command) throws IOException { 700 return commandsSynchronyzer.sendCommand(command); 701 } 702 703 /** 704 * Waits for reply for command which was sent before by 'sendCommand()' 705 * method. Specified timeout is used as time limit for waiting. This method 706 * (jointly with 'sendCommand()') is intended for special cases when there 707 * is need to divide command's performing into two actions: command's 708 * sending and receiving reply (e.g. for asynchronous JDWP commands' 709 * testing). It is NOT recommended to use 'sendCommand()- receiveReply()' 710 * pair for usual cases - 'performCommand()' method must be used. 711 * 712 * @param commandId 713 * Command ID of sent before command, reply from which is 714 * expected to be received 715 * @param timeout 716 * Specified timeout in milliseconds to wait for reply 717 * @return received ReplyPacket 718 * @throws IOException 719 * if any connection error occurred 720 * @throws InterruptedException 721 * if reply packet's waiting was interrupted 722 * @throws TimeoutException 723 * if timeout exceeded 724 */ 725 public ReplyPacket receiveReply(int commandId, long timeout) 726 throws InterruptedException, IOException, TimeoutException { 727 return commandsSynchronyzer.receiveReply(commandId, timeout); 728 } 729 730 } 731