Home | History | Annotate | Download | only in common
      1 /*
      2  * Copyright (C) 2013 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.accessorydisplay.common;
     18 
     19 import android.os.Handler;
     20 import android.os.Looper;
     21 import android.os.Message;
     22 import android.util.SparseArray;
     23 
     24 import java.io.IOException;
     25 import java.nio.ByteBuffer;
     26 
     27 /**
     28  * A simple message transport.
     29  * <p>
     30  * This object's interface is thread-safe, however incoming messages
     31  * are always delivered on the {@link Looper} thread on which the transport
     32  * was created.
     33  * </p>
     34  */
     35 public abstract class Transport {
     36     private static final int MAX_INPUT_BUFFERS = 8;
     37 
     38     private final Logger mLogger;
     39 
     40     // The transport thread looper and handler.
     41     private final TransportHandler mHandler;
     42 
     43     // Lock to guard all mutable state.
     44     private final Object mLock = new Object();
     45 
     46     // The output buffer.  Set to null when the transport is closed.
     47     private ByteBuffer mOutputBuffer;
     48 
     49     // The input buffer pool.
     50     private BufferPool mInputBufferPool;
     51 
     52     // The reader thread.  Initialized when reading starts.
     53     private ReaderThread mThread;
     54 
     55     // The list of callbacks indexed by service id.
     56     private final SparseArray<Callback> mServices = new SparseArray<Callback>();
     57 
     58     public Transport(Logger logger, int maxPacketSize) {
     59         mLogger = logger;
     60         mHandler = new TransportHandler();
     61         mOutputBuffer = ByteBuffer.allocate(maxPacketSize);
     62         mInputBufferPool = new BufferPool(
     63                 maxPacketSize, Protocol.MAX_ENVELOPE_SIZE, MAX_INPUT_BUFFERS);
     64     }
     65 
     66     /**
     67      * Gets the logger for debugging.
     68      */
     69     public Logger getLogger() {
     70         return mLogger;
     71     }
     72 
     73     /**
     74      * Gets the handler on the transport's thread.
     75      */
     76     public Handler getHandler() {
     77         return mHandler;
     78     }
     79 
     80     /**
     81      * Closes the transport.
     82      */
     83     public void close() {
     84         synchronized (mLock) {
     85             if (mOutputBuffer != null) {
     86                 if (mThread == null) {
     87                     ioClose();
     88                 } else {
     89                     // If the thread was started then it will be responsible for
     90                     // closing the stream when it quits because it may currently
     91                     // be in the process of reading from the stream so we can't simply
     92                     // shut it down right now.
     93                     mThread.quit();
     94                 }
     95                 mOutputBuffer = null;
     96             }
     97         }
     98     }
     99 
    100     /**
    101      * Sends a message.
    102      *
    103      * @param service The service to whom the message is addressed.
    104      * @param what The message type.
    105      * @param content The content, or null if there is none.
    106      * @return True if the message was sent successfully, false if an error occurred.
    107      */
    108     public boolean sendMessage(int service, int what, ByteBuffer content) {
    109         checkServiceId(service);
    110         checkMessageId(what);
    111 
    112         try {
    113             synchronized (mLock) {
    114                 if (mOutputBuffer == null) {
    115                     mLogger.logError("Send message failed because transport was closed.");
    116                     return false;
    117                 }
    118 
    119                 final byte[] outputArray = mOutputBuffer.array();
    120                 final int capacity = mOutputBuffer.capacity();
    121                 mOutputBuffer.clear();
    122                 mOutputBuffer.putShort((short)service);
    123                 mOutputBuffer.putShort((short)what);
    124                 if (content == null) {
    125                     mOutputBuffer.putInt(0);
    126                 } else {
    127                     final int contentLimit = content.limit();
    128                     int contentPosition = content.position();
    129                     int contentRemaining = contentLimit - contentPosition;
    130                     if (contentRemaining > Protocol.MAX_CONTENT_SIZE) {
    131                         throw new IllegalArgumentException("Message content too large: "
    132                                 + contentRemaining + " > " + Protocol.MAX_CONTENT_SIZE);
    133                     }
    134                     mOutputBuffer.putInt(contentRemaining);
    135                     while (contentRemaining != 0) {
    136                         final int outputAvailable = capacity - mOutputBuffer.position();
    137                         if (contentRemaining <= outputAvailable) {
    138                             mOutputBuffer.put(content);
    139                             break;
    140                         }
    141                         content.limit(contentPosition + outputAvailable);
    142                         mOutputBuffer.put(content);
    143                         content.limit(contentLimit);
    144                         ioWrite(outputArray, 0, capacity);
    145                         contentPosition += outputAvailable;
    146                         contentRemaining -= outputAvailable;
    147                         mOutputBuffer.clear();
    148                     }
    149                 }
    150                 ioWrite(outputArray, 0, mOutputBuffer.position());
    151                 return true;
    152             }
    153         } catch (IOException ex) {
    154             mLogger.logError("Send message failed: " + ex);
    155             return false;
    156         }
    157     }
    158 
    159     /**
    160      * Starts reading messages on a separate thread.
    161      */
    162     public void startReading() {
    163         synchronized (mLock) {
    164             if (mOutputBuffer == null) {
    165                 throw new IllegalStateException("Transport has been closed");
    166             }
    167 
    168             mThread = new ReaderThread();
    169             mThread.start();
    170         }
    171     }
    172 
    173     /**
    174      * Registers a service and provides a callback to receive messages.
    175      *
    176      * @param service The service id.
    177      * @param callback The callback to use.
    178      */
    179     public void registerService(int service, Callback callback) {
    180         checkServiceId(service);
    181         if (callback == null) {
    182             throw new IllegalArgumentException("callback must not be null");
    183         }
    184 
    185         synchronized (mLock) {
    186             mServices.put(service, callback);
    187         }
    188     }
    189 
    190     /**
    191      * Unregisters a service.
    192      *
    193      * @param service The service to unregister.
    194      */
    195     public void unregisterService(int service) {
    196         checkServiceId(service);
    197 
    198         synchronized (mLock) {
    199             mServices.remove(service);
    200         }
    201     }
    202 
    203     private void dispatchMessageReceived(int service, int what, ByteBuffer content) {
    204         final Callback callback;
    205         synchronized (mLock) {
    206             callback = mServices.get(service);
    207         }
    208         if (callback != null) {
    209             callback.onMessageReceived(service, what, content);
    210         } else {
    211             mLogger.log("Discarding message " + what
    212                     + " for unregistered service " + service);
    213         }
    214     }
    215 
    216     private static void checkServiceId(int service) {
    217         if (service < 0 || service > 0xffff) {
    218             throw new IllegalArgumentException("service id out of range: " + service);
    219         }
    220     }
    221 
    222     private static void checkMessageId(int what) {
    223         if (what < 0 || what > 0xffff) {
    224             throw new IllegalArgumentException("message id out of range: " + what);
    225         }
    226     }
    227 
    228     // The IO methods must be safe to call on any thread.
    229     // They may be called concurrently.
    230     protected abstract void ioClose();
    231     protected abstract int ioRead(byte[] buffer, int offset, int count)
    232             throws IOException;
    233     protected abstract void ioWrite(byte[] buffer, int offset, int count)
    234             throws IOException;
    235 
    236     /**
    237      * Callback for services that handle received messages.
    238      */
    239     public interface Callback {
    240         /**
    241          * Indicates that a message was received.
    242          *
    243          * @param service The service to whom the message is addressed.
    244          * @param what The message type.
    245          * @param content The content, or null if there is none.
    246          */
    247         public void onMessageReceived(int service, int what, ByteBuffer content);
    248     }
    249 
    250     final class TransportHandler extends Handler {
    251         @Override
    252         public void handleMessage(Message msg) {
    253             final ByteBuffer buffer = (ByteBuffer)msg.obj;
    254             try {
    255                 final int limit = buffer.limit();
    256                 while (buffer.position() < limit) {
    257                     final int service = buffer.getShort() & 0xffff;
    258                     final int what = buffer.getShort() & 0xffff;
    259                     final int contentSize = buffer.getInt();
    260                     if (contentSize == 0) {
    261                         dispatchMessageReceived(service, what, null);
    262                     } else {
    263                         final int end = buffer.position() + contentSize;
    264                         buffer.limit(end);
    265                         dispatchMessageReceived(service, what, buffer);
    266                         buffer.limit(limit);
    267                         buffer.position(end);
    268                     }
    269                 }
    270             } finally {
    271                 mInputBufferPool.release(buffer);
    272             }
    273         }
    274     }
    275 
    276     final class ReaderThread extends Thread {
    277         // Set to true when quitting.
    278         private volatile boolean mQuitting;
    279 
    280         public ReaderThread() {
    281             super("Accessory Display Transport");
    282         }
    283 
    284         @Override
    285         public void run() {
    286             loop();
    287             ioClose();
    288         }
    289 
    290         private void loop() {
    291             ByteBuffer buffer = null;
    292             int length = Protocol.HEADER_SIZE;
    293             int contentSize = -1;
    294             outer: while (!mQuitting) {
    295                 // Get a buffer.
    296                 if (buffer == null) {
    297                     buffer = mInputBufferPool.acquire(length);
    298                 } else {
    299                     buffer = mInputBufferPool.grow(buffer, length);
    300                 }
    301 
    302                 // Read more data until needed number of bytes obtained.
    303                 int position = buffer.position();
    304                 int count;
    305                 try {
    306                     count = ioRead(buffer.array(), position, buffer.capacity() - position);
    307                     if (count < 0) {
    308                         break; // end of stream
    309                     }
    310                 } catch (IOException ex) {
    311                     mLogger.logError("Read failed: " + ex);
    312                     break; // error
    313                 }
    314                 position += count;
    315                 buffer.position(position);
    316                 if (contentSize < 0 && position >= Protocol.HEADER_SIZE) {
    317                     contentSize = buffer.getInt(4);
    318                     if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
    319                         mLogger.logError("Encountered invalid content size: " + contentSize);
    320                         break; // malformed stream
    321                     }
    322                     length += contentSize;
    323                 }
    324                 if (position < length) {
    325                     continue; // need more data
    326                 }
    327 
    328                 // There is at least one complete message in the buffer.
    329                 // Find the end of a contiguous chunk of complete messages.
    330                 int next = length;
    331                 int remaining;
    332                 for (;;) {
    333                     length = Protocol.HEADER_SIZE;
    334                     remaining = position - next;
    335                     if (remaining < length) {
    336                         contentSize = -1;
    337                         break; // incomplete header, need more data
    338                     }
    339                     contentSize = buffer.getInt(next + 4);
    340                     if (contentSize < 0 || contentSize > Protocol.MAX_CONTENT_SIZE) {
    341                         mLogger.logError("Encountered invalid content size: " + contentSize);
    342                         break outer; // malformed stream
    343                     }
    344                     length += contentSize;
    345                     if (remaining < length) {
    346                         break; // incomplete content, need more data
    347                     }
    348                     next += length;
    349                 }
    350 
    351                 // Post the buffer then don't modify it anymore.
    352                 // Now this is kind of sneaky.  We know that no other threads will
    353                 // be acquiring buffers from the buffer pool so we can keep on
    354                 // referring to this buffer as long as we don't modify its contents.
    355                 // This allows us to operate in a single-buffered mode if desired.
    356                 buffer.limit(next);
    357                 buffer.rewind();
    358                 mHandler.obtainMessage(0, buffer).sendToTarget();
    359 
    360                 // If there is an incomplete message at the end, then we will need
    361                 // to copy it to a fresh buffer before continuing.  In the single-buffered
    362                 // case, we may acquire the same buffer as before which is fine.
    363                 if (remaining == 0) {
    364                     buffer = null;
    365                 } else {
    366                     final ByteBuffer oldBuffer = buffer;
    367                     buffer = mInputBufferPool.acquire(length);
    368                     System.arraycopy(oldBuffer.array(), next, buffer.array(), 0, remaining);
    369                     buffer.position(remaining);
    370                 }
    371             }
    372 
    373             if (buffer != null) {
    374                 mInputBufferPool.release(buffer);
    375             }
    376         }
    377 
    378         public void quit() {
    379             mQuitting = true;
    380         }
    381     }
    382 }
    383