1 /** 2 * $RCSfile$ 3 * $Revision$ 4 * $Date$ 5 * 6 * Copyright 2003-2006 Jive Software. 7 * 8 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); 9 * you may not use this file except in compliance with the License. 10 * You may obtain a copy of the License at 11 * 12 * http://www.apache.org/licenses/LICENSE-2.0 13 * 14 * Unless required by applicable law or agreed to in writing, software 15 * distributed under the License is distributed on an "AS IS" BASIS, 16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17 * See the License for the specific language governing permissions and 18 * limitations under the License. 19 */ 20 package org.jivesoftware.smackx.filetransfer; 21 22 import org.jivesoftware.smack.PacketCollector; 23 import org.jivesoftware.smack.SmackConfiguration; 24 import org.jivesoftware.smack.Connection; 25 import org.jivesoftware.smack.XMPPException; 26 import org.jivesoftware.smack.filter.OrFilter; 27 import org.jivesoftware.smack.filter.PacketFilter; 28 import org.jivesoftware.smack.packet.Packet; 29 import org.jivesoftware.smackx.packet.StreamInitiation; 30 31 import java.io.InputStream; 32 import java.io.OutputStream; 33 import java.util.concurrent.*; 34 import java.util.List; 35 import java.util.ArrayList; 36 37 38 /** 39 * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary 40 * negotiator. If the primary negotiator fails during the stream negotiaton process, the second 41 * negotiator is used. 42 */ 43 public class FaultTolerantNegotiator extends StreamNegotiator { 44 45 private StreamNegotiator primaryNegotiator; 46 private StreamNegotiator secondaryNegotiator; 47 private Connection connection; 48 private PacketFilter primaryFilter; 49 private PacketFilter secondaryFilter; 50 51 public FaultTolerantNegotiator(Connection connection, StreamNegotiator primary, 52 StreamNegotiator secondary) { 53 this.primaryNegotiator = primary; 54 this.secondaryNegotiator = secondary; 55 this.connection = connection; 56 } 57 58 public PacketFilter getInitiationPacketFilter(String from, String streamID) { 59 if (primaryFilter == null || secondaryFilter == null) { 60 primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID); 61 secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID); 62 } 63 return new OrFilter(primaryFilter, secondaryFilter); 64 } 65 66 InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException { 67 throw new UnsupportedOperationException("Negotiation only handled by create incoming " + 68 "stream method."); 69 } 70 71 final Packet initiateIncomingStream(Connection connection, StreamInitiation initiation) { 72 throw new UnsupportedOperationException("Initiation handled by createIncomingStream " + 73 "method"); 74 } 75 76 public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException { 77 PacketCollector collector = connection.createPacketCollector( 78 getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID())); 79 80 connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces())); 81 82 ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2); 83 CompletionService<InputStream> service 84 = new ExecutorCompletionService<InputStream>(threadPoolExecutor); 85 List<Future<InputStream>> futures = new ArrayList<Future<InputStream>>(); 86 InputStream stream = null; 87 XMPPException exception = null; 88 try { 89 futures.add(service.submit(new NegotiatorService(collector))); 90 futures.add(service.submit(new NegotiatorService(collector))); 91 92 int i = 0; 93 while (stream == null && i < futures.size()) { 94 Future<InputStream> future; 95 try { 96 i++; 97 future = service.poll(10, TimeUnit.SECONDS); 98 } 99 catch (InterruptedException e) { 100 continue; 101 } 102 103 if (future == null) { 104 continue; 105 } 106 107 try { 108 stream = future.get(); 109 } 110 catch (InterruptedException e) { 111 /* Do Nothing */ 112 } 113 catch (ExecutionException e) { 114 exception = new XMPPException(e.getCause()); 115 } 116 } 117 } 118 finally { 119 for (Future<InputStream> future : futures) { 120 future.cancel(true); 121 } 122 collector.cancel(); 123 threadPoolExecutor.shutdownNow(); 124 } 125 if (stream == null) { 126 if (exception != null) { 127 throw exception; 128 } 129 else { 130 throw new XMPPException("File transfer negotiation failed."); 131 } 132 } 133 134 return stream; 135 } 136 137 private StreamNegotiator determineNegotiator(Packet streamInitiation) { 138 return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator; 139 } 140 141 public OutputStream createOutgoingStream(String streamID, String initiator, String target) 142 throws XMPPException { 143 OutputStream stream; 144 try { 145 stream = primaryNegotiator.createOutgoingStream(streamID, initiator, target); 146 } 147 catch (XMPPException ex) { 148 stream = secondaryNegotiator.createOutgoingStream(streamID, initiator, target); 149 } 150 151 return stream; 152 } 153 154 public String[] getNamespaces() { 155 String[] primary = primaryNegotiator.getNamespaces(); 156 String[] secondary = secondaryNegotiator.getNamespaces(); 157 158 String[] namespaces = new String[primary.length + secondary.length]; 159 System.arraycopy(primary, 0, namespaces, 0, primary.length); 160 System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length); 161 162 return namespaces; 163 } 164 165 public void cleanup() { 166 } 167 168 private class NegotiatorService implements Callable<InputStream> { 169 170 private PacketCollector collector; 171 172 NegotiatorService(PacketCollector collector) { 173 this.collector = collector; 174 } 175 176 public InputStream call() throws Exception { 177 Packet streamInitiation = collector.nextResult( 178 SmackConfiguration.getPacketReplyTimeout() * 2); 179 if (streamInitiation == null) { 180 throw new XMPPException("No response from remote client"); 181 } 182 StreamNegotiator negotiator = determineNegotiator(streamInitiation); 183 return negotiator.negotiateIncomingStream(streamInitiation); 184 } 185 } 186 } 187