1 /* 2 * Copyright (C) 2007-2008 Esmertec AG. 3 * Copyright (C) 2007-2008 The Android Open Source Project 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package com.android.im.imps; 19 20 import java.io.ByteArrayInputStream; 21 import java.io.ByteArrayOutputStream; 22 import java.io.IOException; 23 import java.io.InputStream; 24 import java.net.HttpURLConnection; 25 import java.net.URI; 26 import java.net.URISyntaxException; 27 import java.util.concurrent.LinkedBlockingQueue; 28 import java.util.concurrent.atomic.AtomicBoolean; 29 30 import org.apache.http.Header; 31 import org.apache.http.HttpEntity; 32 import org.apache.http.HttpResponse; 33 import org.apache.http.StatusLine; 34 import org.apache.http.client.methods.HttpPost; 35 import org.apache.http.entity.ByteArrayEntity; 36 import org.apache.http.message.BasicHeader; 37 import org.apache.http.params.HttpConnectionParams; 38 import org.apache.http.params.HttpParams; 39 40 import android.net.http.AndroidHttpClient; 41 import android.os.SystemClock; 42 import android.util.Log; 43 44 import com.android.im.engine.HeartbeatService; 45 import com.android.im.engine.ImErrorInfo; 46 import com.android.im.engine.ImException; 47 import com.android.im.engine.SystemService; 48 import com.android.im.imps.Primitive.TransactionMode; 49 50 /** 51 * The <code>HttpDataChannel</code> is an implementation of IMPS data channel 52 * in which the protocol binding is HTTP. 53 */ 54 class HttpDataChannel extends DataChannel implements Runnable, HeartbeatService.Callback { 55 56 private static final int MAX_RETRY_COUNT = 10; 57 private static final int INIT_RETRY_DELAY_MS = 5000; 58 private static final int MAX_RETRY_DELAY_MS = 300 * 1000; 59 60 private Thread mSendThread; 61 private boolean mStopped; 62 private boolean mSuspended; 63 private boolean mConnected; 64 private boolean mStopRetry; 65 private Object mRetryLock = new Object(); 66 private LinkedBlockingQueue<Primitive> mSendQueue; 67 private LinkedBlockingQueue<Primitive> mReceiveQueue; 68 69 private long mLastActive; 70 private long mKeepAliveMillis; 71 private Primitive mKeepAlivePrimitive; 72 73 private AtomicBoolean mHasPendingPolling = new AtomicBoolean(false); 74 75 private final AndroidHttpClient mHttpClient; 76 private final Header mContentTypeHeader; 77 private final Header mMsisdnHeader; 78 private URI mPostUri; 79 80 private ImpsTransactionManager mTxManager; 81 82 /** 83 * Constructs a new HttpDataChannel for a connection. 84 * 85 * @param connection the connection which uses the data channel. 86 */ 87 public HttpDataChannel(ImpsConnection connection) throws ImException { 88 super(connection); 89 mTxManager = connection.getTransactionManager(); 90 ImpsConnectionConfig cfg = connection.getConfig(); 91 try { 92 String host = cfg.getHost(); 93 if (host == null || host.length() == 0) { 94 throw new ImException(ImErrorInfo.INVALID_HOST_NAME, 95 "Empty host name."); 96 } 97 mPostUri = new URI(cfg.getHost()); 98 if (mPostUri.getPath() == null || "".equals(mPostUri.getPath())) { 99 mPostUri = new URI(cfg.getHost() + "/"); 100 } 101 if (!"http".equalsIgnoreCase(mPostUri.getScheme()) 102 && !"https".equalsIgnoreCase(mPostUri.getScheme())) { 103 throw new ImException(ImErrorInfo.INVALID_HOST_NAME, 104 "Non HTTP/HTTPS host name."); 105 } 106 107 mHttpClient = AndroidHttpClient.newInstance("Android-Imps/0.1"); 108 109 HttpParams params = mHttpClient.getParams(); 110 HttpConnectionParams.setConnectionTimeout(params, cfg.getReplyTimeout()); 111 HttpConnectionParams.setSoTimeout(params, cfg.getReplyTimeout()); 112 } catch (URISyntaxException e) { 113 throw new ImException(ImErrorInfo.INVALID_HOST_NAME, 114 e.getLocalizedMessage()); 115 } 116 117 mContentTypeHeader = new BasicHeader("Content-Type", cfg.getTransportContentType()); 118 String msisdn = cfg.getMsisdn(); 119 mMsisdnHeader = (msisdn != null) ? new BasicHeader("MSISDN", msisdn) : null; 120 121 mParser = cfg.createPrimitiveParser(); 122 mSerializer = cfg.createPrimitiveSerializer(); 123 } 124 125 @Override 126 public void connect() throws ImException { 127 if (mConnected) { 128 throw new ImException("Already connected"); 129 } 130 mStopped = false; 131 mStopRetry = false; 132 133 mSendQueue = new LinkedBlockingQueue<Primitive>(); 134 mReceiveQueue = new LinkedBlockingQueue<Primitive>(); 135 136 mSendThread = new Thread(this, "HttpDataChannel"); 137 mSendThread.setDaemon(true); 138 mSendThread.start(); 139 140 mConnected = true; 141 } 142 143 @Override 144 public void suspend() { 145 mSuspended = true; 146 } 147 148 @Override 149 public boolean resume() { 150 long now = SystemClock.elapsedRealtime(); 151 if (now - mLastActive > mKeepAliveMillis) { 152 shutdown(); 153 return false; 154 } else { 155 mSuspended = false; 156 157 // Send a polling request after resume in case we missed some 158 // updates while we are suspended. 159 Primitive polling = new Primitive(ImpsTags.Polling_Request); 160 polling.setSession(mConnection.getSession().getID()); 161 sendPrimitive(polling); 162 startHeartbeat(); 163 164 return true; 165 } 166 } 167 168 @Override 169 public void shutdown() { 170 HeartbeatService heartbeatService 171 = SystemService.getDefault().getHeartbeatService(); 172 if (heartbeatService != null) { 173 heartbeatService.stopHeartbeat(this); 174 } 175 // Stop the sending thread 176 mStopped = true; 177 mSendThread.interrupt(); 178 mConnected = false; 179 } 180 181 @Override 182 public void sendPrimitive(Primitive p) { 183 if (!mConnected || mStopped) { 184 ImpsLog.log("DataChannel not connected, ignore primitive " + p.getType()); 185 return; 186 } 187 188 if (ImpsTags.Polling_Request.equals(p.getType())) { 189 if (!mHasPendingPolling.compareAndSet(false, true)) { 190 ImpsLog.log("HttpDataChannel: Ignoring Polling-Request"); 191 return; 192 } 193 } else if (ImpsTags.Logout_Request.equals(p.getType())) { 194 mStopRetry = true; 195 synchronized (mRetryLock) { 196 mRetryLock.notify(); 197 } 198 } 199 if (!mSendQueue.offer(p)) { 200 // This is almost impossible for a LinkedBlockingQueue. We don't 201 // even bother to assign an error code for this. ;) 202 mTxManager.notifyErrorResponse(p.getTransactionID(), 203 ImErrorInfo.UNKNOWN_ERROR, "sending queue full"); 204 } 205 } 206 207 @Override 208 public Primitive receivePrimitive() throws InterruptedException { 209 if (!mConnected || mStopped) { 210 throw new IllegalStateException(); 211 } 212 213 return mReceiveQueue.take(); 214 } 215 216 @Override 217 public void startKeepAlive(long interval) { 218 if (!mConnected || mStopped) { 219 throw new IllegalStateException(); 220 } 221 222 if (interval <= 0) { 223 interval = mConnection.getConfig().getDefaultKeepAliveInterval(); 224 } 225 226 mKeepAliveMillis = interval * 1000; 227 if (mKeepAliveMillis < 0) { 228 ImpsLog.log("Negative keep alive time. Won't send keep-alive"); 229 } 230 mKeepAlivePrimitive = new Primitive(ImpsTags.KeepAlive_Request); 231 startHeartbeat(); 232 } 233 234 private void startHeartbeat() { 235 HeartbeatService heartbeatService 236 = SystemService.getDefault().getHeartbeatService(); 237 if (heartbeatService != null) { 238 heartbeatService.startHeartbeat(this, mKeepAliveMillis); 239 } 240 } 241 242 public long sendHeartbeat() { 243 if (mSuspended) { 244 return 0; 245 } 246 247 long inactiveTime = SystemClock.elapsedRealtime() - mLastActive; 248 if (needSendKeepAlive(inactiveTime)) { 249 sendKeepAlive(); 250 return mKeepAliveMillis; 251 } else { 252 return mKeepAliveMillis - inactiveTime; 253 } 254 } 255 256 private boolean needSendKeepAlive(long inactiveTime) { 257 return mKeepAliveMillis - inactiveTime <= 500; 258 } 259 260 @Override 261 public long getLastActiveTime() { 262 return mLastActive; 263 } 264 265 @Override 266 public boolean isSendingQueueEmpty() { 267 if (!mConnected || mStopped) { 268 throw new IllegalStateException(); 269 } 270 return mSendQueue.isEmpty(); 271 } 272 273 public void run() { 274 while (!mStopped) { 275 try { 276 Primitive primitive = mSendQueue.take(); 277 if (primitive.getType().equals(ImpsTags.Polling_Request)) { 278 mHasPendingPolling.set(false); 279 } 280 doSendPrimitive(primitive); 281 } catch (InterruptedException e) { 282 } 283 } 284 mHttpClient.close(); 285 } 286 287 private void sendKeepAlive() { 288 ImpsTransactionManager tm = mConnection.getTransactionManager(); 289 AsyncTransaction tx = new AsyncTransaction(tm) { 290 @Override 291 public void onResponseError(ImpsErrorInfo error) { 292 } 293 294 @Override 295 public void onResponseOk(Primitive response) { 296 // Since we never request a new timeout value, the response 297 // can be ignored 298 } 299 }; 300 tx.sendRequest(mKeepAlivePrimitive); 301 } 302 303 /** 304 * Sends a primitive to the IMPS server through HTTP. 305 * 306 * @param p The primitive to send. 307 */ 308 private void doSendPrimitive(Primitive p) { 309 String errorInfo = null; 310 int retryCount = 0; 311 long retryDelay = INIT_RETRY_DELAY_MS; 312 while (retryCount < MAX_RETRY_COUNT) { 313 try { 314 trySend(p); 315 return; 316 } catch (IOException e) { 317 errorInfo = e.getLocalizedMessage(); 318 String type = p.getType(); 319 if (ImpsTags.Login_Request.equals(type) 320 || ImpsTags.Logout_Request.equals(type)) { 321 // we don't retry to send login/logout request. The request 322 // might be sent to the server successfully but we failed to 323 // get the response from the server. Retry in this case might 324 // cause multiple login which is not allowed by some server. 325 break; 326 } 327 if (p.getTransactionMode() == TransactionMode.Response) { 328 // Ignore the failure of sending response to the server since 329 // it's only an acknowledgment. When we get here, the 330 // primitive might have been sent successfully but failed to 331 // get the http response. The server might or might not send 332 // the request again if it does not receive the acknowledgment, 333 // the client is ok to either case. 334 return; 335 } 336 retryCount++; 337 // sleep for a while and retry to send the primitive in a new 338 // transaction if we havn't met the max retry count. 339 if (retryCount < MAX_RETRY_COUNT) { 340 mTxManager.reassignTransactionId(p); 341 Log.w(ImpsLog.TAG, "Send primitive failed, retry after " + retryDelay + "ms"); 342 synchronized (mRetryLock) { 343 try { 344 mRetryLock.wait(retryDelay); 345 } catch (InterruptedException ignore) { 346 } 347 if (mStopRetry) { 348 break; 349 } 350 } 351 retryDelay = retryDelay * 2; 352 if (retryDelay > MAX_RETRY_DELAY_MS) { 353 retryDelay = MAX_RETRY_DELAY_MS; 354 } 355 } 356 } 357 } 358 Log.w(ImpsLog.TAG, "Failed to send primitive after " + MAX_RETRY_COUNT + " retries"); 359 mTxManager.notifyErrorResponse(p.getTransactionID(), 360 ImErrorInfo.NETWORK_ERROR, errorInfo); 361 } 362 363 private void trySend(Primitive p) throws IOException { 364 ByteArrayOutputStream out = new ByteArrayOutputStream(); 365 try { 366 mSerializer.serialize(p, out); 367 } catch (SerializerException e) { 368 mTxManager.notifyErrorResponse(p.getTransactionID(), 369 ImErrorInfo.SERIALIZER_ERROR, 370 "Internal serializer error, primitive: " + p.getType()); 371 out.close(); 372 return; 373 } 374 375 HttpPost req = new HttpPost(mPostUri); 376 req.addHeader(mContentTypeHeader); 377 if (mMsisdnHeader != null) { 378 req.addHeader(mMsisdnHeader); 379 } 380 ByteArrayEntity entity = new ByteArrayEntity(out.toByteArray()); 381 req.setEntity(entity); 382 383 mLastActive = SystemClock.elapsedRealtime(); 384 if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) { 385 long sendBytes = entity.getContentLength() + 176 /* approx. header length */; 386 ImpsLog.log(mConnection.getLoginUserName() + " >> " + p.getType() + " HTTP payload approx. " + sendBytes + " bytes"); 387 } 388 if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) { 389 ImpsLog.dumpRawPacket(out.toByteArray()); 390 ImpsLog.dumpPrimitive(p); 391 } 392 393 HttpResponse res = mHttpClient.execute(req); 394 StatusLine statusLine = res.getStatusLine(); 395 HttpEntity resEntity = res.getEntity(); 396 397 InputStream in = resEntity.getContent(); 398 399 if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) { 400 Log.d(ImpsLog.PACKET_TAG, statusLine.toString()); 401 Header[] headers = res.getAllHeaders(); 402 for (Header h : headers) { 403 Log.d(ImpsLog.PACKET_TAG, h.toString()); 404 } 405 int len = (int) resEntity.getContentLength(); 406 if (len > 0) { 407 byte[] content = new byte[len]; 408 int offset = 0; 409 int bytesRead = 0; 410 do { 411 bytesRead = in.read(content, offset, len); 412 offset += bytesRead; 413 len -= bytesRead; 414 } while (bytesRead > 0); 415 in.close(); 416 ImpsLog.dumpRawPacket(content); 417 in = new ByteArrayInputStream(content); 418 } 419 } 420 421 try { 422 if (statusLine.getStatusCode() != HttpURLConnection.HTTP_OK) { 423 mTxManager.notifyErrorResponse(p.getTransactionID(), statusLine.getStatusCode(), 424 statusLine.getReasonPhrase()); 425 return; 426 } 427 if (resEntity.getContentLength() == 0) { 428 // empty responses are only valid for Polling-Request or 429 // server initiated transactions 430 if ((p.getTransactionMode() != TransactionMode.Response) 431 && !p.getType().equals(ImpsTags.Polling_Request)) { 432 mTxManager.notifyErrorResponse(p.getTransactionID(), 433 ImErrorInfo.ILLEGAL_SERVER_RESPONSE, 434 "bad response from server"); 435 } 436 return; 437 } 438 439 Primitive response = mParser.parse(in); 440 441 if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) { 442 ImpsLog.dumpPrimitive(response); 443 } 444 445 if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) { 446 long len = 2 + resEntity.getContentLength() + statusLine.toString().length() + 2; 447 Header[] headers = res.getAllHeaders(); 448 for (Header header : headers) { 449 len += header.getName().length() + header.getValue().length() + 4; 450 } 451 ImpsLog.log(mConnection.getLoginUserName() + " << " 452 + response.getType() + " HTTP payload approx. " + len + "bytes"); 453 } 454 455 if (!mReceiveQueue.offer(response)) { 456 // This is almost impossible for a LinkedBlockingQueue. 457 // We don't even bother to assign an error code for it. 458 mTxManager.notifyErrorResponse(p.getTransactionID(), 459 ImErrorInfo.UNKNOWN_ERROR, "receiving queue full"); 460 } 461 } catch (ParserException e) { 462 ImpsLog.logError(e); 463 mTxManager.notifyErrorResponse(p.getTransactionID(), 464 ImErrorInfo.PARSER_ERROR, 465 "Parser error, received a bad response from server"); 466 } finally { 467 //consume all the content so that the connection can be re-used. 468 resEntity.consumeContent(); 469 } 470 } 471 472 } 473