Home | History | Annotate | Download | only in source
      1 /*
      2  * Copyright (C) 2015 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.tv.tuner.source;
     18 
     19 import android.content.Context;
     20 import android.util.Log;
     21 import android.util.Pair;
     22 
     23 import com.google.android.exoplayer.C;
     24 import com.google.android.exoplayer.upstream.DataSpec;
     25 import com.android.tv.common.SoftPreconditions;
     26 import com.android.tv.tuner.ChannelScanFileParser;
     27 import com.android.tv.tuner.TunerHal;
     28 import com.android.tv.tuner.TunerPreferences;
     29 import com.android.tv.tuner.data.TunerChannel;
     30 import com.android.tv.tuner.tvinput.EventDetector;
     31 import com.android.tv.tuner.tvinput.EventDetector.EventListener;
     32 
     33 import java.io.IOException;
     34 import java.util.ArrayList;
     35 import java.util.List;
     36 import java.util.concurrent.atomic.AtomicLong;
     37 
     38 /**
     39  * Provides MPEG-2 TS stream sources for channel playing from an underlying tuner device.
     40  */
     41 public class TunerTsStreamer implements TsStreamer {
     42     private static final String TAG = "TunerTsStreamer";
     43 
     44     private static final int MIN_READ_UNIT = 1500;
     45     private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~15KB
     46     private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 20000;  // ~ 30MB
     47     private static final int TS_PACKET_SIZE = 188;
     48 
     49     private static final int READ_TIMEOUT_MS = 5000; // 5 secs.
     50     private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
     51     private static final int READ_ERROR_STREAMING_ENDED = -1;
     52     private static final int READ_ERROR_BUFFER_OVERWRITTEN = -2;
     53 
     54     private final Object mCircularBufferMonitor = new Object();
     55     private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
     56     private long mBytesFetched;
     57     private final AtomicLong mLastReadPosition = new AtomicLong();
     58     private boolean mStreaming;
     59 
     60     private final TunerHal mTunerHal;
     61     private TunerChannel mChannel;
     62     private Thread mStreamingThread;
     63     private final EventDetector mEventDetector;
     64     private final List<Pair<EventListener, Boolean>> mEventListenerActions = new ArrayList<>();
     65 
     66     private final TsStreamWriter mTsStreamWriter;
     67     private String mChannelNumber;
     68 
     69     public static class TunerDataSource extends TsDataSource {
     70         private final TunerTsStreamer mTsStreamer;
     71         private final AtomicLong mLastReadPosition = new AtomicLong(0);
     72         private long mStartBufferedPosition;
     73 
     74         private TunerDataSource(TunerTsStreamer tsStreamer) {
     75             mTsStreamer = tsStreamer;
     76             mStartBufferedPosition = tsStreamer.getBufferedPosition();
     77         }
     78 
     79         @Override
     80         public long getBufferedPosition() {
     81             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
     82         }
     83 
     84         @Override
     85         public long getLastReadPosition() {
     86             return mLastReadPosition.get();
     87         }
     88 
     89         @Override
     90         public void shiftStartPosition(long offset) {
     91             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
     92             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
     93             mStartBufferedPosition += offset;
     94         }
     95 
     96         @Override
     97         public long open(DataSpec dataSpec) throws IOException {
     98             mLastReadPosition.set(0);
     99             return C.LENGTH_UNBOUNDED;
    100         }
    101 
    102         @Override
    103         public void close() {
    104         }
    105 
    106         @Override
    107         public int read(byte[] buffer, int offset, int readLength) throws IOException {
    108             int ret = mTsStreamer.readAt(mStartBufferedPosition + mLastReadPosition.get(), buffer,
    109                     offset, readLength);
    110             if (ret > 0) {
    111                 mLastReadPosition.addAndGet(ret);
    112             } else if (ret == READ_ERROR_BUFFER_OVERWRITTEN) {
    113                 long currentPosition = mStartBufferedPosition + mLastReadPosition.get();
    114                 long endPosition = mTsStreamer.getBufferedPosition();
    115                 long diff = ((endPosition - currentPosition + TS_PACKET_SIZE - 1) / TS_PACKET_SIZE)
    116                         * TS_PACKET_SIZE;
    117                 Log.w(TAG, "Demux position jump by overwritten buffer: " + diff);
    118                 mStartBufferedPosition = currentPosition + diff;
    119                 mLastReadPosition.set(0);
    120                 return 0;
    121             }
    122             return ret;
    123         }
    124     }
    125     /**
    126      * Creates {@link TsStreamer} for playing or recording the specified channel.
    127      * @param tunerHal the HAL for tuner device
    128      * @param eventListener the listener for channel & program information
    129      */
    130     public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener, Context context) {
    131         mTunerHal = tunerHal;
    132         mEventDetector = new EventDetector(mTunerHal);
    133         if (eventListener != null) {
    134             mEventDetector.registerListener(eventListener);
    135         }
    136         mTsStreamWriter = context != null && TunerPreferences.getStoreTsStream(context) ?
    137                 new TsStreamWriter(context) : null;
    138     }
    139 
    140     public TunerTsStreamer(TunerHal tunerHal, EventListener eventListener) {
    141         this(tunerHal, eventListener, null);
    142     }
    143 
    144     @Override
    145     public boolean startStream(TunerChannel channel) {
    146         if (mTunerHal.tune(channel.getFrequency(), channel.getModulation(),
    147                 channel.getDisplayNumber(false))) {
    148             if (channel.hasVideo()) {
    149                 mTunerHal.addPidFilter(channel.getVideoPid(),
    150                         TunerHal.FILTER_TYPE_VIDEO);
    151             }
    152             boolean audioFilterSet = false;
    153             for (Integer audioPid : channel.getAudioPids()) {
    154                 if (!audioFilterSet) {
    155                     mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_AUDIO);
    156                     audioFilterSet = true;
    157                 } else {
    158                     // FILTER_TYPE_AUDIO overrides the previous filter for audio. We use
    159                     // FILTER_TYPE_OTHER from the secondary one to get the all audio tracks.
    160                     mTunerHal.addPidFilter(audioPid, TunerHal.FILTER_TYPE_OTHER);
    161                 }
    162             }
    163             mTunerHal.addPidFilter(channel.getPcrPid(),
    164                     TunerHal.FILTER_TYPE_PCR);
    165             if (mEventDetector != null) {
    166                 mEventDetector.startDetecting(channel.getFrequency(), channel.getModulation(),
    167                         channel.getProgramNumber());
    168             }
    169             mChannel = channel;
    170             mChannelNumber = channel.getDisplayNumber();
    171             synchronized (mCircularBufferMonitor) {
    172                 if (mStreaming) {
    173                     Log.w(TAG, "Streaming should be stopped before start streaming");
    174                     return true;
    175                 }
    176                 mStreaming = true;
    177                 mBytesFetched = 0;
    178                 mLastReadPosition.set(0L);
    179             }
    180             if (mTsStreamWriter != null) {
    181                 mTsStreamWriter.setChannel(mChannel);
    182                 mTsStreamWriter.openFile();
    183             }
    184             mStreamingThread = new StreamingThread();
    185             mStreamingThread.start();
    186             Log.i(TAG, "Streaming started");
    187             return true;
    188         }
    189         return false;
    190     }
    191 
    192     @Override
    193     public boolean startStream(ChannelScanFileParser.ScanChannel channel) {
    194         if (mTunerHal.tune(channel.frequency, channel.modulation, null)) {
    195             mEventDetector.startDetecting(
    196                     channel.frequency, channel.modulation, EventDetector.ALL_PROGRAM_NUMBERS);
    197             synchronized (mCircularBufferMonitor) {
    198                 if (mStreaming) {
    199                     Log.w(TAG, "Streaming should be stopped before start streaming");
    200                     return true;
    201                 }
    202                 mStreaming = true;
    203                 mBytesFetched = 0;
    204                 mLastReadPosition.set(0L);
    205             }
    206             mStreamingThread = new StreamingThread();
    207             mStreamingThread.start();
    208             Log.i(TAG, "Streaming started");
    209             return true;
    210         }
    211         return false;
    212     }
    213 
    214     /**
    215      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
    216      * device is overloaded this can take a while, but usually it returns pretty quickly.
    217      */
    218     @Override
    219     public void stopStream() {
    220         mChannel = null;
    221         synchronized (mCircularBufferMonitor) {
    222             mStreaming = false;
    223             mCircularBufferMonitor.notifyAll();
    224         }
    225 
    226         try {
    227             if (mStreamingThread != null) {
    228                 mStreamingThread.join();
    229             }
    230         } catch (InterruptedException e) {
    231             Thread.currentThread().interrupt();
    232         }
    233         if (mTsStreamWriter != null) {
    234             mTsStreamWriter.closeFile(true);
    235             mTsStreamWriter.setChannel(null);
    236         }
    237     }
    238 
    239     @Override
    240     public TsDataSource createDataSource() {
    241         return new TunerDataSource(this);
    242     }
    243 
    244     /**
    245      * Returns incomplete channel lists which was scanned so far. Incomplete channel means
    246      * the channel whose channel information is not complete or is not well-formed.
    247      * @return {@link List} of {@link TunerChannel}
    248      */
    249     public List<TunerChannel> getMalFormedChannels() {
    250         return mEventDetector.getMalFormedChannels();
    251     }
    252 
    253     /**
    254      * Returns the current {@link TunerHal} which provides MPEG-TS stream for TunerTsStreamer.
    255      * @return {@link TunerHal}
    256      */
    257     public TunerHal getTunerHal() {
    258         return mTunerHal;
    259     }
    260 
    261     /**
    262      * Returns the current tuned channel for TunerTsStreamer.
    263      * @return {@link TunerChannel}
    264      */
    265     public TunerChannel getChannel() {
    266         return mChannel;
    267     }
    268 
    269     /**
    270      * Returns the current buffered position from tuner.
    271      * @return the current buffered position
    272      */
    273     public long getBufferedPosition() {
    274         synchronized (mCircularBufferMonitor) {
    275             return mBytesFetched;
    276         }
    277     }
    278 
    279     public String getStreamerInfo() {
    280         return "Channel: " + mChannelNumber + ", Streaming: " + mStreaming;
    281     }
    282 
    283     public void registerListener(EventListener listener) {
    284         if (mEventDetector != null && listener != null) {
    285             synchronized (mEventListenerActions) {
    286                 mEventListenerActions.add(new Pair<>(listener, true));
    287             }
    288         }
    289     }
    290 
    291     public void unregisterListener(EventListener listener) {
    292         if (mEventDetector != null) {
    293             synchronized (mEventListenerActions) {
    294                 mEventListenerActions.add(new Pair(listener, false));
    295             }
    296         }
    297     }
    298 
    299     private class StreamingThread extends Thread {
    300         @Override
    301         public void run() {
    302             // Buffers for streaming data from the tuner and the internal buffer.
    303             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
    304 
    305             while (true) {
    306                 synchronized (mCircularBufferMonitor) {
    307                     if (!mStreaming) {
    308                         break;
    309                     }
    310                 }
    311 
    312                 if (mEventDetector != null) {
    313                     synchronized (mEventListenerActions) {
    314                         for (Pair listenerAction : mEventListenerActions) {
    315                             EventListener listener = (EventListener) listenerAction.first;
    316                             if ((boolean) listenerAction.second) {
    317                                 mEventDetector.registerListener(listener);
    318                             } else {
    319                                 mEventDetector.unregisterListener(listener);
    320                             }
    321                         }
    322                         mEventListenerActions.clear();
    323                     }
    324                 }
    325 
    326                 int bytesWritten = mTunerHal.readTsStream(dataBuffer, dataBuffer.length);
    327                 if (bytesWritten <= 0) {
    328                     try {
    329                         // When buffer is underrun, we sleep for short time to prevent
    330                         // unnecessary CPU draining.
    331                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
    332                     } catch (InterruptedException e) {
    333                         Thread.currentThread().interrupt();
    334                     }
    335                     continue;
    336                 }
    337 
    338                 if (mTsStreamWriter != null) {
    339                     mTsStreamWriter.writeToFile(dataBuffer, bytesWritten);
    340                 }
    341 
    342                 if (mEventDetector != null) {
    343                     mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
    344                 }
    345                 synchronized (mCircularBufferMonitor) {
    346                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
    347                     int bytesToCopyInFirstPass = bytesWritten;
    348                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
    349                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
    350                     }
    351                     System.arraycopy(dataBuffer, 0, mCircularBuffer, posInBuffer,
    352                             bytesToCopyInFirstPass);
    353                     if (bytesToCopyInFirstPass < bytesWritten) {
    354                         System.arraycopy(dataBuffer, bytesToCopyInFirstPass, mCircularBuffer, 0,
    355                                 bytesWritten - bytesToCopyInFirstPass);
    356                     }
    357                     mBytesFetched += bytesWritten;
    358                     mCircularBufferMonitor.notifyAll();
    359                 }
    360             }
    361 
    362             Log.i(TAG, "Streaming stopped");
    363         }
    364     }
    365 
    366     /**
    367      * Reads data from internal buffer.
    368      * @param pos the position to read from
    369      * @param buffer to read
    370      * @param offset start position of the read buffer
    371      * @param amount number of bytes to read
    372      * @return number of read bytes when successful, {@code -1} otherwise
    373      * @throws IOException
    374      */
    375     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
    376         while (true) {
    377             synchronized (mCircularBufferMonitor) {
    378                 if (!mStreaming) {
    379                     return READ_ERROR_STREAMING_ENDED;
    380                 }
    381                 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
    382                     Log.w(TAG, "Demux is requesting the data which is already overwritten.");
    383                     return READ_ERROR_BUFFER_OVERWRITTEN;
    384                 }
    385                 if (mBytesFetched < pos + amount) {
    386                     try {
    387                         mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
    388                     } catch (InterruptedException e) {
    389                         Thread.currentThread().interrupt();
    390                     }
    391                     // Try again to prevent starvation.
    392                     // Give chances to read from other threads.
    393                     continue;
    394                 }
    395                 int startPos = (int) (pos % CIRCULAR_BUFFER_SIZE);
    396                 int endPos = (int) ((pos + amount) % CIRCULAR_BUFFER_SIZE);
    397                 int firstLength = (startPos > endPos ? CIRCULAR_BUFFER_SIZE : endPos) - startPos;
    398                 System.arraycopy(mCircularBuffer, startPos, buffer, offset, firstLength);
    399                 if (firstLength < amount) {
    400                     System.arraycopy(mCircularBuffer, 0, buffer, offset + firstLength,
    401                             amount - firstLength);
    402                 }
    403                 mCircularBufferMonitor.notifyAll();
    404                 return amount;
    405             }
    406         }
    407     }
    408 }
    409