1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 /** 20 * @author Anton V. Karnachuk 21 */ 22 23 /** 24 * Created on 16.03.2005 25 */ 26 package org.apache.harmony.jpda.tests.framework.jdwp; 27 28 import java.util.List; 29 import java.io.IOException; 30 import java.io.InterruptedIOException; 31 import java.util.ArrayList; 32 import java.util.Hashtable; 33 34 import org.apache.harmony.jpda.tests.framework.LogWriter; 35 import org.apache.harmony.jpda.tests.framework.TestOptions; 36 import org.apache.harmony.jpda.tests.framework.jdwp.exceptions.TimeoutException; 37 38 /** 39 * This class provides asynchronous sending JDWP commands and receiving JDWP 40 * events through established JDWP connection and supports timeout for these 41 * operations. 42 */ 43 public class PacketDispatcher extends Thread { 44 45 /** 46 * Variables below are intended only to help with tests failures 47 * investigation. They turn on/off some kinds of trace during 48 * tests execution which can clear up details of test failure. 49 * 50 * commandsNumberForTrace and begCommandIdForTrace define trace 51 * of sent JDWP commands and received replies for these commands: 52 * - begCommandIdForTrace defines starting command ID for trace 53 * (the first command has ID=1, the second - ID=2 and so on). 54 * if <= 0 then the same as = 1. 55 * - commandsNumberForTrace defines number of command for trace. 56 * if <= 0 then commands' trace is off. 57 * 58 * - eventRequestIDForTrace defines trace of received events 59 * according to request ID value: 60 * if < 0 then this trace is off; 61 * if = 0 then trace is for all received events; 62 * if > 0 then trace is for received events, which are triggered 63 * by this specified request ID value; 64 * 65 * - eventKindForTrace defines trace of received events 66 * according to this specified kind of event. 67 * if = 0 then this trace is off; 68 * See JDWPConstants.EventKind class for values of 69 * event kinds. 70 */ 71 int begCommandIdForTrace = 1; 72 73 int commandsNumberForTrace = 0; 74 75 int eventRequestIDForTrace = -1; 76 77 byte eventKindForTrace = 0; 78 79 /** 80 * Internal class to synchronize jdwp events. When an event is received it 81 * is stored in eventQueue. If there are any thread that waits for event it 82 * is notified. 83 */ 84 private class EventsSynchronyzer { 85 86 /** 87 * List of received events. 88 */ 89 private List<EventPacket> eventQueue; 90 91 /** 92 * A default constructor. 93 */ 94 EventsSynchronyzer() { 95 // initialize eventQueue 96 eventQueue = new ArrayList<EventPacket>(); 97 } 98 99 /** 100 * Notifies thread that the new event has been received. 101 * 102 * @param eventPacket 103 * instance of EventPacket 104 * @throws InterruptedException 105 */ 106 public void notifyThread(EventPacket eventPacket) 107 throws InterruptedException { 108 109 // use this object as lock 110 synchronized (this) { 111 // add the event to eventQueue 112 eventQueue.add(eventPacket); 113 114 // notify next waiting thread 115 this.notify(); 116 } 117 } 118 119 /** 120 * Waits for new event during timeout. 121 * 122 * @param timeout 123 * wait timeout 124 * @return EventPacket 125 * @throws InterruptedException 126 * @throws IOException 127 * @throws TimeoutException 128 * if no event was received 129 */ 130 public EventPacket waitForNextEvent(long timeout) 131 throws InterruptedException, IOException { 132 133 // use this object as lock 134 synchronized (this) { 135 136 // if there is already received event in eventQueue, 137 // then return it 138 synchronized (eventQueue) { 139 if (!eventQueue.isEmpty()) { 140 return eventQueue.remove(0); 141 } 142 143 // if eventQueue is empty and connection is already closed 144 // reraise the exception 145 if (connectionException != null) 146 throw connectionException; 147 } 148 149 // wait for the next event 150 this.wait(timeout); 151 152 // We have the following opportunities here - 153 // next event was received, exception in main cyrcle or timeout 154 // happens 155 synchronized (eventQueue) { 156 if (!eventQueue.isEmpty()) { 157 // event received 158 EventPacket event = eventQueue.remove(0); 159 return event; 160 } 161 162 if (connectionException != null) { 163 // if eventQueue is empty and connection is already 164 // closed 165 // reraise the exception 166 throw connectionException; 167 } 168 } 169 } 170 171 // no events were occurred during timeout 172 throw new TimeoutException(false); 173 } 174 175 /** 176 * This method is called when connection is closed. It notifies all the 177 * waiting threads. 178 */ 179 public void terminate() { 180 synchronized (this) { 181 this.notifyAll(); 182 } 183 } 184 } 185 186 /** 187 * Internal class to synchronize jdwp commands. It sends command packets 188 * through connection and returns replies. 189 */ 190 class CommandsSynchronyzer { 191 192 private int commandId; 193 194 private Hashtable<Integer, CommandPacket> commands; 195 196 private Hashtable<Integer, ReplyPacket> replies; 197 198 /** 199 * A default constructor. 200 */ 201 CommandsSynchronyzer() { 202 commands = new Hashtable<Integer, CommandPacket>(); 203 replies = new Hashtable<Integer, ReplyPacket>(); 204 205 // set first command id to 1 206 commandId = 1; 207 } 208 209 /** 210 * Gets the next new id for a command. 211 * 212 * @return int 213 */ 214 private synchronized int getNextId() { 215 return commandId++; 216 } 217 218 /** 219 * Notifies thread that reply packet was received. 220 * 221 * @param replyPacket 222 * instance of ReplyPacket 223 * @throws IOException 224 * @throws InterruptedException 225 */ 226 public void notifyThread(ReplyPacket replyPacket) throws IOException, 227 InterruptedException { 228 229 synchronized (commands) { 230 231 // obtain the current command id 232 Integer Id = new Integer(replyPacket.getId()); 233 234 // obtain the current command packet by command id 235 CommandPacket command = commands.remove(Id); 236 if (command == null) { 237 // we received reply's id that does not correspond to any 238 // command 239 throw new IOException( 240 "Reply id is corresponded to no command. Id = " 241 + Id); 242 } 243 244 synchronized (command) { 245 // put the reply in replies queue 246 synchronized (replies) { 247 replies.put(new Integer(replyPacket.getId()), 248 replyPacket); 249 } 250 // notify waiting thread 251 command.notifyAll(); 252 } 253 } 254 } 255 256 /** 257 * Sends command and waits for the reply during timeout. 258 * 259 * @param command 260 * instance of CommandPacket 261 * @param timeout 262 * reply wait timeout 263 * @return 264 * a reply packet 265 * @throws TimeoutException 266 * if no reply was received 267 */ 268 public ReplyPacket waitForReply(CommandPacket command, long timeout) 269 throws InterruptedException, IOException { 270 271 synchronized (command) { 272 273 // if connection is already closed reraise the exception 274 if (connectionException != null) 275 throw connectionException; 276 277 // obtain new command id 278 Integer Id = new Integer(getNextId()); 279 command.setId(Id.intValue()); 280 281 // add command into commands hashtable 282 synchronized (commands) { 283 commands.put(Id, command); 284 285 // below is trace for sent coomasnds 286 if (commandsNumberForTrace > 0) { 287 int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace 288 : 1; 289 if (Id.intValue() >= begCommandId) { 290 if ((Id.intValue() - begCommandId) < commandsNumberForTrace) { 291 logWriter 292 .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = " 293 + Id.intValue() 294 + "; CommandSet = " 295 + command.getCommandSet() 296 + "; Command = " 297 + command.getCommand() + "..."); 298 } 299 } 300 } 301 302 // write this package to connection 303 connection.writePacket(command.toBytesArray()); 304 } 305 306 // if connection is already closed reraise the exception 307 if (connectionException != null) 308 throw connectionException; 309 310 // wait for reply 311 command.wait(timeout); 312 313 // receive the reply 314 ReplyPacket currentReply = null; 315 synchronized (replies) { 316 currentReply = replies.remove(Id); 317 } 318 319 // if reply is ok, return it 320 if (currentReply != null) { 321 return currentReply; 322 } 323 324 // if connection is already closed reraise the exception 325 if (connectionException != null) 326 throw connectionException; 327 } 328 329 // no event was occurred during timeout 330 throw new TimeoutException(false); 331 } 332 333 /** 334 * Sends command without waiting for the reply and returns id of the 335 * sent command. 336 * 337 * @param command 338 * instance of CommandPacket 339 * @return command id 340 * @throws IOException 341 */ 342 public int sendCommand(CommandPacket command) throws IOException { 343 344 // if connection is already closed reraise the exception 345 if (connectionException != null) 346 throw connectionException; 347 348 // obtain new command id 349 Integer Id = new Integer(getNextId()); 350 command.setId(Id.intValue()); 351 352 // add command into commands hashtable 353 synchronized (commands) { 354 commands.put(Id, command); 355 356 // below is trace for sent coomasnds 357 if (commandsNumberForTrace > 0) { 358 int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace 359 : 1; 360 if (Id.intValue() >= begCommandId) { 361 if ((Id.intValue() - begCommandId) < commandsNumberForTrace) { 362 logWriter 363 .println(">>>>>>>>>> PacketDispatcher: PERFORM command: ID = " 364 + Id.intValue() 365 + "; CommandSet = " 366 + command.getCommandSet() 367 + "; Command = " 368 + command.getCommand() + "..."); 369 } 370 } 371 } 372 373 // write this package to connection 374 connection.writePacket(command.toBytesArray()); 375 } 376 377 // if connection is already closed reraise the exception 378 if (connectionException != null) { 379 throw connectionException; 380 } 381 382 return Id.intValue(); 383 384 } 385 386 /** 387 * Receives the reply during timeout for command with specified command 388 * ID. 389 * 390 * @param commandId 391 * id of previously sent commend 392 * @param timeout 393 * receive timeout 394 * @return received ReplyPacket 395 * @throws TimeoutException 396 * if no reply was received 397 */ 398 public ReplyPacket receiveReply(int commandId, long timeout) 399 throws InterruptedException, IOException { 400 401 // if connection is already closed reraise the exception 402 if (connectionException != null) 403 throw connectionException; 404 405 // receive the reply 406 long endTimeMlsecForWait = System.currentTimeMillis() + timeout; 407 synchronized (replies) { 408 while (true) { 409 ReplyPacket currentReply = replies.remove(new Integer(commandId)); 410 // if reply is ok, return it 411 if (currentReply != null) { 412 return currentReply; 413 } 414 // if connection is already closed reraise the exception 415 if (connectionException != null) { 416 throw connectionException; 417 } 418 if (System.currentTimeMillis() >= endTimeMlsecForWait) { 419 break; 420 } 421 replies.wait(100); 422 } 423 } 424 // no expected reply was found during timeout 425 throw new TimeoutException(false); 426 } 427 428 /** 429 * This method is called when connection is closed. It notifies all the 430 * waiting threads. 431 * 432 */ 433 public void terminate() { 434 435 synchronized (commands) { 436 // enumerate all waiting commands 437 for (CommandPacket command : commands.values()) { 438 synchronized (command) { 439 // notify the waiting object 440 command.notifyAll(); 441 } 442 } 443 } 444 } 445 } 446 447 /** Transport which is used to sent and receive packets. */ 448 private TransportWrapper connection; 449 450 /** Current test run configuration. */ 451 TestOptions config; 452 453 private CommandsSynchronyzer commandsSynchronyzer; 454 455 private EventsSynchronyzer eventsSynchronyzer; 456 457 private LogWriter logWriter; 458 459 private IOException connectionException; 460 461 /** 462 * Creates new PacketDispatcher instance. 463 * 464 * @param connection 465 * open connection for reading and writing packets 466 * @param config 467 * test run options 468 * @param logWriter 469 * LogWriter object 470 */ 471 public PacketDispatcher(TransportWrapper connection, TestOptions config, 472 LogWriter logWriter) { 473 474 this.connection = connection; 475 this.config = config; 476 this.logWriter = logWriter; 477 478 commandsSynchronyzer = new CommandsSynchronyzer(); 479 eventsSynchronyzer = new EventsSynchronyzer(); 480 481 // make thread daemon 482 setDaemon(true); 483 484 // start the thread 485 start(); 486 } 487 488 /** 489 * Reads packets from connection and dispatches them between waiting 490 * threads. 491 */ 492 @Override 493 public void run() { 494 495 connectionException = null; 496 497 try { 498 // start listening for replies 499 while (!isInterrupted()) { 500 501 // read packet from transport 502 byte[] packet = connection.readPacket(); 503 504 // break cycle if empty packet 505 if (packet == null || packet.length == 0) 506 break; 507 508 // check flags 509 if (packet.length < Packet.FLAGS_INDEX) { 510 logWriter 511 .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet size = " 512 + packet.length); 513 } else { 514 int flag = packet[Packet.FLAGS_INDEX] & 0xFF; 515 if (flag != 0) { 516 if (flag != Packet.REPLY_PACKET_FLAG) { 517 logWriter 518 .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet flags = " 519 + Integer.toHexString(flag)); 520 } 521 } 522 } 523 524 // check the reply flag 525 if (Packet.isReply(packet)) { 526 // new reply 527 ReplyPacket replyPacket = new ReplyPacket(packet); 528 529 // check for received reply packet length 530 int packetLength = replyPacket.getLength(); 531 if (packetLength < Packet.HEADER_SIZE) { 532 logWriter 533 .println(">>>>>>>>>> PacketDispatcher WARNING: WRONG received packet length = " 534 + packetLength); 535 } 536 537 // below is trace for received coomasnds 538 if (commandsNumberForTrace > 0) { 539 int replyID = replyPacket.getId(); 540 int begCommandId = begCommandIdForTrace > 1 ? begCommandIdForTrace 541 : 1; 542 if (replyID >= begCommandId) { 543 if ((replyID - begCommandId) < commandsNumberForTrace) { 544 logWriter 545 .println(">>>>>>>>>> PacketDispatcher: Received REPLY ID = " 546 + replyID); 547 } 548 } 549 } 550 551 commandsSynchronyzer.notifyThread(replyPacket); 552 } else { 553 // new event 554 EventPacket eventPacket = new EventPacket(packet); 555 // below is to check received events for correctness 556 557 // below is trace for received events 558 ParsedEvent[] parsedEvents = ParsedEvent 559 .parseEventPacket(eventPacket); 560 if ((eventRequestIDForTrace >= 0) 561 || (eventKindForTrace > 0)) { 562 for (int i = 0; i < parsedEvents.length; i++) { 563 boolean trace = false; 564 int eventRequestID = parsedEvents[i].getRequestID(); 565 if (eventRequestIDForTrace == 0) { 566 trace = true; 567 } else { 568 if (eventRequestID == eventRequestIDForTrace) { 569 trace = true; 570 } 571 } 572 byte eventKind = parsedEvents[i].getEventKind(); 573 if (eventKind == eventKindForTrace) { 574 trace = true; 575 } 576 if (trace) { 577 logWriter 578 .println(">>>>>>>>>> PacketDispatcher: Received_EVENT[" 579 + i 580 + "]: eventRequestID= " 581 + eventRequestID 582 + "; eventKind = " 583 + eventKind 584 + "(" 585 + JDWPConstants.EventKind 586 .getName(eventKind) 587 + ")"); 588 } 589 } 590 } 591 eventsSynchronyzer.notifyThread(eventPacket); 592 } 593 } 594 595 // this exception is send for all waiting threads 596 connectionException = new TimeoutException(true); 597 } catch (IOException e) { 598 // connection exception is send for all waiting threads 599 connectionException = e; 600 601 // print stack trace 602 e.printStackTrace(); 603 } catch (InterruptedException e) { 604 // connection exception is send for all waiting threads 605 connectionException = new InterruptedIOException(e.getMessage()); 606 connectionException.initCause(e); 607 608 // print stack trace 609 e.printStackTrace(); 610 } 611 612 // notify all the waiting threads 613 eventsSynchronyzer.terminate(); 614 commandsSynchronyzer.terminate(); 615 } 616 617 /** 618 * Receives event from event queue if there are any events or waits during 619 * timeout for any event occurrence. This method should not be used 620 * simultaneously from different threads. If there were no reply during the 621 * timeout, TimeoutException is thrown. 622 * 623 * @param timeout 624 * timeout in milliseconds 625 * @return received event packet 626 * @throws IOException 627 * is any connection error occurred 628 * @throws InterruptedException 629 * if reading packet was interrupted 630 * @throws TimeoutException 631 * if timeout exceeded 632 */ 633 public EventPacket receiveEvent(long timeout) throws IOException, 634 InterruptedException, TimeoutException { 635 636 return eventsSynchronyzer.waitForNextEvent(timeout); 637 } 638 639 /** 640 * Sends JDWP command packet and waits for reply packet during default 641 * timeout. If there were no reply packet during the timeout, 642 * TimeoutException is thrown. 643 * 644 * @return received reply packet 645 * @throws InterruptedException 646 * if reading packet was interrupted 647 * @throws IOException 648 * if any connection error occurred 649 * @throws TimeoutException 650 * if timeout exceeded 651 */ 652 public ReplyPacket performCommand(CommandPacket command) 653 throws InterruptedException, IOException, TimeoutException { 654 655 return performCommand(command, config.getTimeout()); 656 } 657 658 /** 659 * Sends JDWP command packet and waits for reply packet during certain 660 * timeout. If there were no reply packet during the timeout, 661 * TimeoutException is thrown. 662 * 663 * @param command 664 * command packet to send 665 * @param timeout 666 * timeout in milliseconds 667 * @return received reply packet 668 * @throws InterruptedException 669 * if packet reading was interrupted 670 * @throws IOException 671 * if any connection error occurred 672 * @throws TimeoutException 673 * if timeout exceeded 674 */ 675 public ReplyPacket performCommand(CommandPacket command, long timeout) 676 throws InterruptedException, IOException, TimeoutException { 677 678 return commandsSynchronyzer.waitForReply(command, timeout); 679 } 680 681 /** 682 * Sends CommandPacket to debuggee VM without waiting for the reply. This 683 * method is intended for special cases when there is need to divide 684 * command's performing into two actions: command's sending and receiving 685 * reply (e.g. for asynchronous JDWP commands' testing). After this method 686 * the 'receiveReply()' method must be used latter for receiving reply for 687 * sent command. It is NOT recommended to use this method for usual cases - 688 * 'performCommand()' method must be used. 689 * 690 * @param command 691 * Command packet to be sent 692 * @return command ID of sent command 693 * @throws IOException 694 * if any connection error occurred 695 */ 696 public int sendCommand(CommandPacket command) throws IOException { 697 return commandsSynchronyzer.sendCommand(command); 698 } 699 700 /** 701 * Waits for reply for command which was sent before by 'sendCommand()' 702 * method. Specified timeout is used as time limit for waiting. This method 703 * (jointly with 'sendCommand()') is intended for special cases when there 704 * is need to divide command's performing into two actions: command's 705 * sending and receiving reply (e.g. for asynchronous JDWP commands' 706 * testing). It is NOT recommended to use 'sendCommand()- receiveReply()' 707 * pair for usual cases - 'performCommand()' method must be used. 708 * 709 * @param commandId 710 * Command ID of sent before command, reply from which is 711 * expected to be received 712 * @param timeout 713 * Specified timeout in milliseconds to wait for reply 714 * @return received ReplyPacket 715 * @throws IOException 716 * if any connection error occurred 717 * @throws InterruptedException 718 * if reply packet's waiting was interrupted 719 * @throws TimeoutException 720 * if timeout exceeded 721 */ 722 public ReplyPacket receiveReply(int commandId, long timeout) 723 throws InterruptedException, IOException, TimeoutException { 724 return commandsSynchronyzer.receiveReply(commandId, timeout); 725 } 726 727 } 728