Home | History | Annotate | Download | only in filetransfer
      1 /**
      2  * $RCSfile$
      3  * $Revision$
      4  * $Date$
      5  *
      6  * Copyright 2003-2006 Jive Software.
      7  *
      8  * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
      9  * you may not use this file except in compliance with the License.
     10  * You may obtain a copy of the License at
     11  *
     12  *     http://www.apache.org/licenses/LICENSE-2.0
     13  *
     14  * Unless required by applicable law or agreed to in writing, software
     15  * distributed under the License is distributed on an "AS IS" BASIS,
     16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     17  * See the License for the specific language governing permissions and
     18  * limitations under the License.
     19  */
     20 package org.jivesoftware.smackx.filetransfer;
     21 
     22 import org.jivesoftware.smack.PacketCollector;
     23 import org.jivesoftware.smack.SmackConfiguration;
     24 import org.jivesoftware.smack.Connection;
     25 import org.jivesoftware.smack.XMPPException;
     26 import org.jivesoftware.smack.filter.OrFilter;
     27 import org.jivesoftware.smack.filter.PacketFilter;
     28 import org.jivesoftware.smack.packet.Packet;
     29 import org.jivesoftware.smackx.packet.StreamInitiation;
     30 
     31 import java.io.InputStream;
     32 import java.io.OutputStream;
     33 import java.util.concurrent.*;
     34 import java.util.List;
     35 import java.util.ArrayList;
     36 
     37 
     38 /**
     39  * The fault tolerant negotiator takes two stream negotiators, the primary and the secondary
     40  * negotiator. If the primary negotiator fails during the stream negotiaton process, the second
     41  * negotiator is used.
     42  */
     43 public class FaultTolerantNegotiator extends StreamNegotiator {
     44 
     45     private StreamNegotiator primaryNegotiator;
     46     private StreamNegotiator secondaryNegotiator;
     47     private Connection connection;
     48     private PacketFilter primaryFilter;
     49     private PacketFilter secondaryFilter;
     50 
     51     public FaultTolerantNegotiator(Connection connection, StreamNegotiator primary,
     52             StreamNegotiator secondary) {
     53         this.primaryNegotiator = primary;
     54         this.secondaryNegotiator = secondary;
     55         this.connection = connection;
     56     }
     57 
     58     public PacketFilter getInitiationPacketFilter(String from, String streamID) {
     59         if (primaryFilter == null || secondaryFilter == null) {
     60             primaryFilter = primaryNegotiator.getInitiationPacketFilter(from, streamID);
     61             secondaryFilter = secondaryNegotiator.getInitiationPacketFilter(from, streamID);
     62         }
     63         return new OrFilter(primaryFilter, secondaryFilter);
     64     }
     65 
     66     InputStream negotiateIncomingStream(Packet streamInitiation) throws XMPPException {
     67         throw new UnsupportedOperationException("Negotiation only handled by create incoming " +
     68                 "stream method.");
     69     }
     70 
     71     final Packet initiateIncomingStream(Connection connection, StreamInitiation initiation) {
     72         throw new UnsupportedOperationException("Initiation handled by createIncomingStream " +
     73                 "method");
     74     }
     75 
     76     public InputStream createIncomingStream(StreamInitiation initiation) throws XMPPException {
     77         PacketCollector collector = connection.createPacketCollector(
     78                 getInitiationPacketFilter(initiation.getFrom(), initiation.getSessionID()));
     79 
     80         connection.sendPacket(super.createInitiationAccept(initiation, getNamespaces()));
     81 
     82         ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
     83         CompletionService<InputStream> service
     84                 = new ExecutorCompletionService<InputStream>(threadPoolExecutor);
     85         List<Future<InputStream>> futures = new ArrayList<Future<InputStream>>();
     86         InputStream stream = null;
     87         XMPPException exception = null;
     88         try {
     89             futures.add(service.submit(new NegotiatorService(collector)));
     90             futures.add(service.submit(new NegotiatorService(collector)));
     91 
     92             int i = 0;
     93             while (stream == null && i < futures.size()) {
     94                 Future<InputStream> future;
     95                 try {
     96                     i++;
     97                     future = service.poll(10, TimeUnit.SECONDS);
     98                 }
     99                 catch (InterruptedException e) {
    100                     continue;
    101                 }
    102 
    103                 if (future == null) {
    104                     continue;
    105                 }
    106 
    107                 try {
    108                     stream = future.get();
    109                 }
    110                 catch (InterruptedException e) {
    111                     /* Do Nothing */
    112                 }
    113                 catch (ExecutionException e) {
    114                     exception = new XMPPException(e.getCause());
    115                 }
    116             }
    117         }
    118         finally {
    119             for (Future<InputStream> future : futures) {
    120                 future.cancel(true);
    121             }
    122             collector.cancel();
    123             threadPoolExecutor.shutdownNow();
    124         }
    125         if (stream == null) {
    126             if (exception != null) {
    127                 throw exception;
    128             }
    129             else {
    130                 throw new XMPPException("File transfer negotiation failed.");
    131             }
    132         }
    133 
    134         return stream;
    135     }
    136 
    137     private StreamNegotiator determineNegotiator(Packet streamInitiation) {
    138         return primaryFilter.accept(streamInitiation) ? primaryNegotiator : secondaryNegotiator;
    139     }
    140 
    141     public OutputStream createOutgoingStream(String streamID, String initiator, String target)
    142             throws XMPPException {
    143         OutputStream stream;
    144         try {
    145             stream = primaryNegotiator.createOutgoingStream(streamID, initiator, target);
    146         }
    147         catch (XMPPException ex) {
    148             stream = secondaryNegotiator.createOutgoingStream(streamID, initiator, target);
    149         }
    150 
    151         return stream;
    152     }
    153 
    154     public String[] getNamespaces() {
    155         String[] primary = primaryNegotiator.getNamespaces();
    156         String[] secondary = secondaryNegotiator.getNamespaces();
    157 
    158         String[] namespaces = new String[primary.length + secondary.length];
    159         System.arraycopy(primary, 0, namespaces, 0, primary.length);
    160         System.arraycopy(secondary, 0, namespaces, primary.length, secondary.length);
    161 
    162         return namespaces;
    163     }
    164 
    165     public void cleanup() {
    166     }
    167 
    168     private class NegotiatorService implements Callable<InputStream> {
    169 
    170         private PacketCollector collector;
    171 
    172         NegotiatorService(PacketCollector collector) {
    173             this.collector = collector;
    174         }
    175 
    176         public InputStream call() throws Exception {
    177             Packet streamInitiation = collector.nextResult(
    178                     SmackConfiguration.getPacketReplyTimeout() * 2);
    179             if (streamInitiation == null) {
    180                 throw new XMPPException("No response from remote client");
    181             }
    182             StreamNegotiator negotiator = determineNegotiator(streamInitiation);
    183             return negotiator.negotiateIncomingStream(streamInitiation);
    184         }
    185     }
    186 }
    187