Home | History | Annotate | Download | only in imps
      1 /*
      2  * Copyright (C) 2007-2008 Esmertec AG.
      3  * Copyright (C) 2007-2008 The Android Open Source Project
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *      http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 
     18 package com.android.im.imps;
     19 
     20 import java.io.ByteArrayInputStream;
     21 import java.io.ByteArrayOutputStream;
     22 import java.io.IOException;
     23 import java.io.InputStream;
     24 import java.net.HttpURLConnection;
     25 import java.net.URI;
     26 import java.net.URISyntaxException;
     27 import java.util.concurrent.LinkedBlockingQueue;
     28 import java.util.concurrent.atomic.AtomicBoolean;
     29 
     30 import org.apache.http.Header;
     31 import org.apache.http.HttpEntity;
     32 import org.apache.http.HttpResponse;
     33 import org.apache.http.StatusLine;
     34 import org.apache.http.client.methods.HttpPost;
     35 import org.apache.http.entity.ByteArrayEntity;
     36 import org.apache.http.message.BasicHeader;
     37 import org.apache.http.params.HttpConnectionParams;
     38 import org.apache.http.params.HttpParams;
     39 
     40 import android.net.http.AndroidHttpClient;
     41 import android.os.SystemClock;
     42 import android.util.Log;
     43 
     44 import com.android.im.engine.HeartbeatService;
     45 import com.android.im.engine.ImErrorInfo;
     46 import com.android.im.engine.ImException;
     47 import com.android.im.engine.SystemService;
     48 import com.android.im.imps.Primitive.TransactionMode;
     49 
     50 /**
     51  * The <code>HttpDataChannel</code> is an implementation of IMPS data channel
     52  * in which the protocol binding is HTTP.
     53  */
     54 class HttpDataChannel extends DataChannel implements Runnable, HeartbeatService.Callback {
     55 
     56     private static final int MAX_RETRY_COUNT = 10;
     57     private static final int INIT_RETRY_DELAY_MS = 5000;
     58     private static final int MAX_RETRY_DELAY_MS = 300 * 1000;
     59 
     60     private Thread mSendThread;
     61     private boolean mStopped;
     62     private boolean mSuspended;
     63     private boolean mConnected;
     64     private boolean mStopRetry;
     65     private Object mRetryLock = new Object();
     66     private LinkedBlockingQueue<Primitive> mSendQueue;
     67     private LinkedBlockingQueue<Primitive> mReceiveQueue;
     68 
     69     private long mLastActive;
     70     private long mKeepAliveMillis;
     71     private Primitive mKeepAlivePrimitive;
     72 
     73     private AtomicBoolean mHasPendingPolling = new AtomicBoolean(false);
     74 
     75     private final AndroidHttpClient mHttpClient;
     76     private final Header mContentTypeHeader;
     77     private final Header mMsisdnHeader;
     78     private URI mPostUri;
     79 
     80     private ImpsTransactionManager mTxManager;
     81 
     82     /**
     83      * Constructs a new HttpDataChannel for a connection.
     84      *
     85      * @param connection the connection which uses the data channel.
     86      */
     87     public HttpDataChannel(ImpsConnection connection) throws ImException {
     88         super(connection);
     89         mTxManager = connection.getTransactionManager();
     90         ImpsConnectionConfig cfg = connection.getConfig();
     91         try {
     92             String host = cfg.getHost();
     93             if (host == null || host.length() == 0) {
     94                 throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
     95                        "Empty host name.");
     96             }
     97             mPostUri = new URI(cfg.getHost());
     98             if (mPostUri.getPath() == null || "".equals(mPostUri.getPath())) {
     99                 mPostUri = new URI(cfg.getHost() + "/");
    100             }
    101             if (!"http".equalsIgnoreCase(mPostUri.getScheme())
    102                     && !"https".equalsIgnoreCase(mPostUri.getScheme())) {
    103                 throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
    104                         "Non HTTP/HTTPS host name.");
    105             }
    106 
    107             mHttpClient = AndroidHttpClient.newInstance("Android-Imps/0.1");
    108 
    109             HttpParams params = mHttpClient.getParams();
    110             HttpConnectionParams.setConnectionTimeout(params, cfg.getReplyTimeout());
    111             HttpConnectionParams.setSoTimeout(params, cfg.getReplyTimeout());
    112         } catch (URISyntaxException e) {
    113             throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
    114                     e.getLocalizedMessage());
    115         }
    116 
    117         mContentTypeHeader = new BasicHeader("Content-Type", cfg.getTransportContentType());
    118         String msisdn = cfg.getMsisdn();
    119         mMsisdnHeader = (msisdn != null) ? new BasicHeader("MSISDN", msisdn) : null;
    120 
    121         mParser = cfg.createPrimitiveParser();
    122         mSerializer = cfg.createPrimitiveSerializer();
    123     }
    124 
    125     @Override
    126     public void connect() throws ImException {
    127         if (mConnected) {
    128             throw new ImException("Already connected");
    129         }
    130         mStopped = false;
    131         mStopRetry = false;
    132 
    133         mSendQueue = new LinkedBlockingQueue<Primitive>();
    134         mReceiveQueue = new LinkedBlockingQueue<Primitive>();
    135 
    136         mSendThread = new Thread(this, "HttpDataChannel");
    137         mSendThread.setDaemon(true);
    138         mSendThread.start();
    139 
    140         mConnected = true;
    141     }
    142 
    143     @Override
    144     public void suspend() {
    145         mSuspended = true;
    146     }
    147 
    148     @Override
    149     public boolean resume() {
    150         long now = SystemClock.elapsedRealtime();
    151         if (now - mLastActive > mKeepAliveMillis) {
    152             shutdown();
    153             return false;
    154         } else {
    155             mSuspended = false;
    156 
    157             // Send a polling request after resume in case we missed some
    158             // updates while we are suspended.
    159             Primitive polling = new Primitive(ImpsTags.Polling_Request);
    160             polling.setSession(mConnection.getSession().getID());
    161             sendPrimitive(polling);
    162             startHeartbeat();
    163 
    164             return true;
    165         }
    166     }
    167 
    168     @Override
    169     public void shutdown() {
    170         HeartbeatService heartbeatService
    171             = SystemService.getDefault().getHeartbeatService();
    172         if (heartbeatService != null) {
    173             heartbeatService.stopHeartbeat(this);
    174         }
    175         // Stop the sending thread
    176         mStopped = true;
    177         mSendThread.interrupt();
    178         mConnected = false;
    179     }
    180 
    181     @Override
    182     public void sendPrimitive(Primitive p) {
    183         if (!mConnected || mStopped) {
    184             ImpsLog.log("DataChannel not connected, ignore primitive " + p.getType());
    185             return;
    186         }
    187 
    188         if (ImpsTags.Polling_Request.equals(p.getType())) {
    189             if (!mHasPendingPolling.compareAndSet(false, true)) {
    190                 ImpsLog.log("HttpDataChannel: Ignoring Polling-Request");
    191                 return;
    192             }
    193         } else if (ImpsTags.Logout_Request.equals(p.getType())) {
    194             mStopRetry = true;
    195             synchronized (mRetryLock) {
    196                 mRetryLock.notify();
    197             }
    198         }
    199         if (!mSendQueue.offer(p)) {
    200             // This is almost impossible for a LinkedBlockingQueue. We don't
    201             // even bother to assign an error code for this. ;)
    202             mTxManager.notifyErrorResponse(p.getTransactionID(),
    203                     ImErrorInfo.UNKNOWN_ERROR, "sending queue full");
    204         }
    205     }
    206 
    207     @Override
    208     public Primitive receivePrimitive() throws InterruptedException {
    209         if (!mConnected || mStopped) {
    210             throw new IllegalStateException();
    211         }
    212 
    213         return mReceiveQueue.take();
    214     }
    215 
    216     @Override
    217     public void startKeepAlive(long interval) {
    218         if (!mConnected || mStopped) {
    219             throw new IllegalStateException();
    220         }
    221 
    222         if (interval <= 0) {
    223             interval = mConnection.getConfig().getDefaultKeepAliveInterval();
    224         }
    225 
    226         mKeepAliveMillis = interval * 1000;
    227         if (mKeepAliveMillis < 0) {
    228             ImpsLog.log("Negative keep alive time. Won't send keep-alive");
    229         }
    230         mKeepAlivePrimitive = new Primitive(ImpsTags.KeepAlive_Request);
    231         startHeartbeat();
    232     }
    233 
    234     private void startHeartbeat() {
    235         HeartbeatService heartbeatService
    236             = SystemService.getDefault().getHeartbeatService();
    237         if (heartbeatService != null) {
    238             heartbeatService.startHeartbeat(this, mKeepAliveMillis);
    239         }
    240     }
    241 
    242     public long sendHeartbeat() {
    243         if (mSuspended) {
    244             return 0;
    245         }
    246 
    247         long inactiveTime = SystemClock.elapsedRealtime() - mLastActive;
    248         if (needSendKeepAlive(inactiveTime)) {
    249             sendKeepAlive();
    250             return mKeepAliveMillis;
    251         } else {
    252             return mKeepAliveMillis - inactiveTime;
    253         }
    254     }
    255 
    256     private boolean needSendKeepAlive(long inactiveTime) {
    257         return mKeepAliveMillis - inactiveTime <= 500;
    258     }
    259 
    260     @Override
    261     public long getLastActiveTime() {
    262         return mLastActive;
    263     }
    264 
    265     @Override
    266     public boolean isSendingQueueEmpty() {
    267         if (!mConnected || mStopped) {
    268             throw new IllegalStateException();
    269         }
    270         return mSendQueue.isEmpty();
    271     }
    272 
    273     public void run() {
    274         while (!mStopped) {
    275             try {
    276                 Primitive primitive = mSendQueue.take();
    277                 if (primitive.getType().equals(ImpsTags.Polling_Request)) {
    278                     mHasPendingPolling.set(false);
    279                 }
    280                 doSendPrimitive(primitive);
    281             } catch (InterruptedException e) {
    282             }
    283         }
    284         mHttpClient.close();
    285     }
    286 
    287     private void sendKeepAlive() {
    288         ImpsTransactionManager tm = mConnection.getTransactionManager();
    289         AsyncTransaction tx = new AsyncTransaction(tm) {
    290             @Override
    291             public void onResponseError(ImpsErrorInfo error) {
    292             }
    293 
    294             @Override
    295             public void onResponseOk(Primitive response) {
    296                 // Since we never request a new timeout value, the response
    297                 // can be ignored
    298             }
    299         };
    300         tx.sendRequest(mKeepAlivePrimitive);
    301     }
    302 
    303     /**
    304      * Sends a primitive to the IMPS server through HTTP.
    305      *
    306      * @param p The primitive to send.
    307      */
    308     private void doSendPrimitive(Primitive p) {
    309         String errorInfo = null;
    310         int retryCount = 0;
    311         long retryDelay = INIT_RETRY_DELAY_MS;
    312         while (retryCount < MAX_RETRY_COUNT) {
    313             try {
    314                 trySend(p);
    315                 return;
    316             } catch (IOException e) {
    317                 errorInfo = e.getLocalizedMessage();
    318                 String type = p.getType();
    319                 if (ImpsTags.Login_Request.equals(type)
    320                         || ImpsTags.Logout_Request.equals(type)) {
    321                     // we don't retry to send login/logout request. The request
    322                     // might be sent to the server successfully but we failed to
    323                     // get the response from the server. Retry in this case might
    324                     // cause multiple login which is not allowed by some server.
    325                     break;
    326                 }
    327                 if (p.getTransactionMode() == TransactionMode.Response) {
    328                     // Ignore the failure of sending response to the server since
    329                     // it's only an acknowledgment. When we get here, the
    330                     // primitive might have been sent successfully but failed to
    331                     // get the http response. The server might or might not send
    332                     // the request again if it does not receive the acknowledgment,
    333                     // the client is ok to either case.
    334                     return;
    335                 }
    336                 retryCount++;
    337                 // sleep for a while and retry to send the primitive in a new
    338                 // transaction if we havn't met the max retry count.
    339                 if (retryCount < MAX_RETRY_COUNT) {
    340                    mTxManager.reassignTransactionId(p);
    341                     Log.w(ImpsLog.TAG, "Send primitive failed, retry after " + retryDelay + "ms");
    342                     synchronized (mRetryLock) {
    343                         try {
    344                             mRetryLock.wait(retryDelay);
    345                         } catch (InterruptedException ignore) {
    346                         }
    347                         if (mStopRetry) {
    348                             break;
    349                         }
    350                     }
    351                     retryDelay = retryDelay * 2;
    352                     if (retryDelay > MAX_RETRY_DELAY_MS) {
    353                         retryDelay = MAX_RETRY_DELAY_MS;
    354                     }
    355                 }
    356             }
    357         }
    358         Log.w(ImpsLog.TAG, "Failed to send primitive after " + MAX_RETRY_COUNT + " retries");
    359         mTxManager.notifyErrorResponse(p.getTransactionID(),
    360                 ImErrorInfo.NETWORK_ERROR, errorInfo);
    361     }
    362 
    363     private void trySend(Primitive p) throws IOException {
    364         ByteArrayOutputStream out = new ByteArrayOutputStream();
    365         try {
    366             mSerializer.serialize(p, out);
    367         } catch (SerializerException e) {
    368             mTxManager.notifyErrorResponse(p.getTransactionID(),
    369                     ImErrorInfo.SERIALIZER_ERROR,
    370                     "Internal serializer error, primitive: " + p.getType());
    371             out.close();
    372             return;
    373         }
    374 
    375         HttpPost req = new HttpPost(mPostUri);
    376         req.addHeader(mContentTypeHeader);
    377         if (mMsisdnHeader != null) {
    378             req.addHeader(mMsisdnHeader);
    379         }
    380         ByteArrayEntity entity = new ByteArrayEntity(out.toByteArray());
    381         req.setEntity(entity);
    382 
    383         mLastActive = SystemClock.elapsedRealtime();
    384         if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
    385             long sendBytes = entity.getContentLength() + 176 /* approx. header length */;
    386             ImpsLog.log(mConnection.getLoginUserName() + " >> " + p.getType() + " HTTP payload approx. " + sendBytes + " bytes");
    387         }
    388         if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
    389             ImpsLog.dumpRawPacket(out.toByteArray());
    390             ImpsLog.dumpPrimitive(p);
    391         }
    392 
    393         HttpResponse res = mHttpClient.execute(req);
    394         StatusLine statusLine = res.getStatusLine();
    395         HttpEntity resEntity = res.getEntity();
    396 
    397         InputStream in = resEntity.getContent();
    398 
    399         if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
    400             Log.d(ImpsLog.PACKET_TAG, statusLine.toString());
    401             Header[] headers = res.getAllHeaders();
    402             for (Header h : headers) {
    403                 Log.d(ImpsLog.PACKET_TAG, h.toString());
    404             }
    405             int len = (int) resEntity.getContentLength();
    406             if (len > 0) {
    407                 byte[] content = new byte[len];
    408                 int offset = 0;
    409                 int bytesRead = 0;
    410                 do {
    411                     bytesRead = in.read(content, offset, len);
    412                     offset += bytesRead;
    413                     len -= bytesRead;
    414                 } while (bytesRead > 0);
    415                 in.close();
    416                 ImpsLog.dumpRawPacket(content);
    417                 in = new ByteArrayInputStream(content);
    418             }
    419         }
    420 
    421         try {
    422             if (statusLine.getStatusCode() != HttpURLConnection.HTTP_OK) {
    423                 mTxManager.notifyErrorResponse(p.getTransactionID(), statusLine.getStatusCode(),
    424                         statusLine.getReasonPhrase());
    425                 return;
    426             }
    427             if (resEntity.getContentLength() == 0) {
    428                 // empty responses are only valid for Polling-Request or
    429                 // server initiated transactions
    430                 if ((p.getTransactionMode() != TransactionMode.Response)
    431                         && !p.getType().equals(ImpsTags.Polling_Request)) {
    432                     mTxManager.notifyErrorResponse(p.getTransactionID(),
    433                             ImErrorInfo.ILLEGAL_SERVER_RESPONSE,
    434                             "bad response from server");
    435                 }
    436                 return;
    437             }
    438 
    439             Primitive response = mParser.parse(in);
    440 
    441             if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
    442                 ImpsLog.dumpPrimitive(response);
    443             }
    444 
    445             if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
    446                 long len = 2 + resEntity.getContentLength() + statusLine.toString().length() + 2;
    447                 Header[] headers = res.getAllHeaders();
    448                 for (Header header : headers) {
    449                     len += header.getName().length() + header.getValue().length() + 4;
    450                 }
    451                 ImpsLog.log(mConnection.getLoginUserName() + " << "
    452                         + response.getType() + " HTTP payload approx. " + len + "bytes");
    453             }
    454 
    455             if (!mReceiveQueue.offer(response)) {
    456                 // This is almost impossible for a LinkedBlockingQueue.
    457                 // We don't even bother to assign an error code for it.
    458                 mTxManager.notifyErrorResponse(p.getTransactionID(),
    459                         ImErrorInfo.UNKNOWN_ERROR, "receiving queue full");
    460             }
    461         } catch (ParserException e) {
    462             ImpsLog.logError(e);
    463             mTxManager.notifyErrorResponse(p.getTransactionID(),
    464                     ImErrorInfo.PARSER_ERROR,
    465                     "Parser error, received a bad response from server");
    466         } finally {
    467             //consume all the content so that the connection can be re-used.
    468             resEntity.consumeContent();
    469         }
    470     }
    471 
    472 }
    473