Home | History | Annotate | Download | only in source
      1 /*
      2  * Copyright (C) 2015 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.tv.tuner.source;
     18 
import android.content.Context;
import android.os.Environment;
import android.util.Log;
import android.util.SparseBooleanArray;
import com.android.tv.common.SoftPreconditions;
import com.android.tv.tuner.ChannelScanFileParser.ScanChannel;
import com.android.tv.tuner.TunerFeatures;
import com.android.tv.tuner.data.TunerChannel;
import com.android.tv.tuner.ts.TsParser;
import com.android.tv.tuner.tvinput.EventDetector;
import com.android.tv.tuner.tvinput.FileSourceEventDetector;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.upstream.DataSpec;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
     38 
     39 /**
     40  * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file
     41  * generated by capturing TV signal.
     42  */
     43 public class FileTsStreamer implements TsStreamer {
    /** Tag used for every log message emitted by this class. */
    private static final String TAG = "FileTsStreamer";

    /** Stride, in bytes, used to step through the file one TS packet at a time. */
    private static final int TS_PACKET_SIZE = 188;
    /** Value expected in the first byte of each TS packet. */
    private static final int TS_SYNC_BYTE = 0x47;
    /** Base unit for all buffer sizes below: ten TS packets. */
    private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10;
    /** Size of the per-read staging buffers (file pre-buffer and streaming thread buffer). */
    private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB
    /** Size of the circular buffer shared by the streaming thread and readers. */
    private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB
    /**
     * Head room used by the streaming thread: it stops fetching while the unread amount plus
     * this padding exceeds {@link #CIRCULAR_BUFFER_SIZE}.
     */
    private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB
    /** Timeout passed to the monitor wait in {@code readAt}. */
    private static final int READ_TIMEOUT_MS = 10000; // 10 secs.
    /** Sleep applied by the streaming thread when a source read yields no bytes. */
    private static final int BUFFER_UNDERRUN_SLEEP_MS = 10;
    /** Absolute path of the "Streams" directory under the external storage directory. */
    private static final String FILE_DIR =
            new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath();

    // Virtual frequency base used for file-based source
    public static final int FREQ_BASE = 100;

    /**
     * Monitor under which {@code mBytesFetched}, {@code mLastReadPosition}, {@code mStreaming} and
     * the contents of {@code mCircularBuffer} are accessed; also used for wait/notify signalling.
     */
    private final Object mCircularBufferMonitor = new Object();
    /** Backing storage of the circular buffer; positions are mapped into it modulo its size. */
    private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE];
    /** Receives every chunk of filtered TS data produced by the streaming thread. */
    private final FileSourceEventDetector mEventDetector;
    /** Context used to query the {@code ENABLE_FILE_DVB} feature flag. */
    private final Context mContext;

    /** Total number of bytes written into the circular buffer since the streaming thread started. */
    private long mBytesFetched;
    /** End position (exclusive) of the most recent successful {@code readAt} call. */
    private long mLastReadPosition;
    /** Whether streaming is currently requested; cleared by {@code stopStream}. */
    private boolean mStreaming;

    /** The thread filling the circular buffer, or {@code null} before the first start. */
    private Thread mStreamingThread;
    /** The file source the streaming thread reads from. */
    private StreamProvider mSource;
     71 
     72     public static class FileDataSource extends TsDataSource {
     73         private final FileTsStreamer mTsStreamer;
     74         private final AtomicLong mLastReadPosition = new AtomicLong(0);
     75         private long mStartBufferedPosition;
     76 
     77         private FileDataSource(FileTsStreamer tsStreamer) {
     78             mTsStreamer = tsStreamer;
     79             mStartBufferedPosition = tsStreamer.getBufferedPosition();
     80         }
     81 
     82         @Override
     83         public long getBufferedPosition() {
     84             return mTsStreamer.getBufferedPosition() - mStartBufferedPosition;
     85         }
     86 
     87         @Override
     88         public long getLastReadPosition() {
     89             return mLastReadPosition.get();
     90         }
     91 
     92         @Override
     93         public void shiftStartPosition(long offset) {
     94             SoftPreconditions.checkState(mLastReadPosition.get() == 0);
     95             SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition());
     96             mStartBufferedPosition += offset;
     97         }
     98 
     99         @Override
    100         public long open(DataSpec dataSpec) throws IOException {
    101             mLastReadPosition.set(0);
    102             return C.LENGTH_UNBOUNDED;
    103         }
    104 
    105         @Override
    106         public void close() {}
    107 
    108         @Override
    109         public int read(byte[] buffer, int offset, int readLength) throws IOException {
    110             int ret =
    111                     mTsStreamer.readAt(
    112                             mStartBufferedPosition + mLastReadPosition.get(),
    113                             buffer,
    114                             offset,
    115                             readLength);
    116             if (ret > 0) {
    117                 mLastReadPosition.addAndGet(ret);
    118             }
    119             return ret;
    120         }
    121     }
    122 
    123     /**
    124      * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file.
    125      *
    126      * @param eventListener the listener for channel & program information
    127      */
    128     public FileTsStreamer(EventDetector.EventListener eventListener, Context context) {
    129         mEventDetector =
    130                 new FileSourceEventDetector(
    131                         eventListener, TunerFeatures.ENABLE_FILE_DVB.isEnabled(context));
    132         mContext = context;
    133     }
    134 
    135     @Override
    136     public boolean startStream(ScanChannel channel) {
    137         String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath();
    138         mSource = new StreamProvider(filepath);
    139         if (!mSource.isReady()) {
    140             return false;
    141         }
    142         mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS);
    143         mSource.addPidFilter(TsParser.PAT_PID);
    144         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
    145         if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) {
    146             mSource.addPidFilter(TsParser.DVB_EIT_PID);
    147             mSource.addPidFilter(TsParser.DVB_SDT_PID);
    148         }
    149         synchronized (mCircularBufferMonitor) {
    150             if (mStreaming) {
    151                 return true;
    152             }
    153             mStreaming = true;
    154         }
    155 
    156         mStreamingThread = new StreamingThread();
    157         mStreamingThread.start();
    158         Log.i(TAG, "Streaming started");
    159         return true;
    160     }
    161 
    162     @Override
    163     public boolean startStream(TunerChannel channel) {
    164         Log.i(TAG, "tuneToChannel with: " + channel.getFilepath());
    165         mSource = new StreamProvider(channel.getFilepath());
    166         if (!mSource.isReady()) {
    167             return false;
    168         }
    169         mEventDetector.start(mSource, channel.getProgramNumber());
    170         mSource.addPidFilter(channel.getVideoPid());
    171         for (Integer i : channel.getAudioPids()) {
    172             mSource.addPidFilter(i);
    173         }
    174         mSource.addPidFilter(channel.getPcrPid());
    175         mSource.addPidFilter(TsParser.PAT_PID);
    176         mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID);
    177         if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) {
    178             mSource.addPidFilter(TsParser.DVB_EIT_PID);
    179             mSource.addPidFilter(TsParser.DVB_SDT_PID);
    180         }
    181         synchronized (mCircularBufferMonitor) {
    182             if (mStreaming) {
    183                 return true;
    184             }
    185             mStreaming = true;
    186         }
    187 
    188         mStreamingThread = new StreamingThread();
    189         mStreamingThread.start();
    190         Log.i(TAG, "Streaming started");
    191         return true;
    192     }
    193 
    194     /**
    195      * Blocks the current thread until the streaming thread stops. In rare cases when the tuner
    196      * device is overloaded this can take a while, but usually it returns pretty quickly.
    197      */
    198     @Override
    199     public void stopStream() {
    200         synchronized (mCircularBufferMonitor) {
    201             mStreaming = false;
    202             mCircularBufferMonitor.notify();
    203         }
    204 
    205         try {
    206             if (mStreamingThread != null) {
    207                 mStreamingThread.join();
    208             }
    209         } catch (InterruptedException e) {
    210             Thread.currentThread().interrupt();
    211         }
    212     }
    213 
    214     @Override
    215     public TsDataSource createDataSource() {
    216         return new FileDataSource(this);
    217     }
    218 
    219     /**
    220      * Returns the current buffered position from the file.
    221      *
    222      * @return the current buffered position
    223      */
    224     public long getBufferedPosition() {
    225         synchronized (mCircularBufferMonitor) {
    226             return mBytesFetched;
    227         }
    228     }
    229 
    230     /** Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. */
    231     public static class StreamProvider {
    232         private final String mFilepath;
    233         private final SparseBooleanArray mPids = new SparseBooleanArray();
    234         private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE];
    235 
    236         private BufferedInputStream mInputStream;
    237 
    238         private StreamProvider(String filepath) {
    239             mFilepath = filepath;
    240             open(filepath);
    241         }
    242 
    243         private void open(String filepath) {
    244             try {
    245                 mInputStream = new BufferedInputStream(new FileInputStream(filepath));
    246             } catch (IOException e) {
    247                 Log.e(TAG, "Error opening input stream", e);
    248                 mInputStream = null;
    249             }
    250         }
    251 
    252         private boolean isReady() {
    253             return mInputStream != null;
    254         }
    255 
    256         /** Returns the file path of the MPEG-2 TS file. */
    257         public String getFilepath() {
    258             return mFilepath;
    259         }
    260 
    261         /** Adds a pid for filtering from the MPEG-2 TS file. */
    262         public void addPidFilter(int pid) {
    263             mPids.put(pid, true);
    264         }
    265 
    266         /** Returns whether the current pid filter is empty or not. */
    267         public boolean isFilterEmpty() {
    268             return mPids.size() == 0;
    269         }
    270 
    271         /** Clears the current pid filter. */
    272         public void clearPidFilter() {
    273             mPids.clear();
    274         }
    275 
    276         /**
    277          * Returns whether a pid is in the pid filter or not.
    278          *
    279          * @param pid the pid to check
    280          */
    281         public boolean isInFilter(int pid) {
    282             return mPids.get(pid);
    283         }
    284 
    285         /**
    286          * Reads from the MPEG-2 TS file to buffer.
    287          *
    288          * @param inputBuffer to read
    289          * @return the number of read bytes
    290          */
    291         private int read(byte[] inputBuffer) {
    292             int readSize = readInternal();
    293             if (readSize <= 0) {
    294                 // Reached the end of stream. Restart from the beginning.
    295                 close();
    296                 open(mFilepath);
    297                 if (mInputStream == null) {
    298                     return -1;
    299                 }
    300                 readSize = readInternal();
    301             }
    302 
    303             if (mPreBuffer[0] != TS_SYNC_BYTE) {
    304                 Log.e(TAG, "Error reading input stream - no TS sync found");
    305                 return -1;
    306             }
    307             int filteredSize = 0;
    308             for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) {
    309                 if (mPreBuffer[i] == TS_SYNC_BYTE) {
    310                     int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff);
    311                     if (mPids.get(pid)) {
    312                         System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE);
    313                         destPos += TS_PACKET_SIZE;
    314                         filteredSize += TS_PACKET_SIZE;
    315                     }
    316                 }
    317             }
    318             return filteredSize;
    319         }
    320 
    321         private int readInternal() {
    322             int readSize;
    323             try {
    324                 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length);
    325             } catch (IOException e) {
    326                 Log.e(TAG, "Error reading input stream", e);
    327                 return -1;
    328             }
    329             return readSize;
    330         }
    331 
    332         private void close() {
    333             try {
    334                 mInputStream.close();
    335             } catch (IOException e) {
    336                 Log.e(TAG, "Error closing input stream:", e);
    337             }
    338             mInputStream = null;
    339         }
    340     }
    341 
    342     /**
    343      * Reads data from internal buffer.
    344      *
    345      * @param pos the position to read from
    346      * @param buffer to read
    347      * @param offset start position of the read buffer
    348      * @param amount number of bytes to read
    349      * @return number of read bytes when successful, {@code -1} otherwise
    350      * @throws IOException
    351      */
    352     public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException {
    353         synchronized (mCircularBufferMonitor) {
    354             long initialBytesFetched = mBytesFetched;
    355             while (mBytesFetched < pos + amount && mStreaming) {
    356                 try {
    357                     mCircularBufferMonitor.wait(READ_TIMEOUT_MS);
    358                 } catch (InterruptedException e) {
    359                     // Wait again.
    360                     Thread.currentThread().interrupt();
    361                 }
    362                 if (initialBytesFetched == mBytesFetched) {
    363                     Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1.");
    364 
    365                     // Returning -1 will make demux report EOS so that the input service can retry
    366                     // the playback.
    367                     return -1;
    368                 }
    369             }
    370             if (!mStreaming) {
    371                 Log.w(TAG, "Stream is already stopped.");
    372                 return -1;
    373             }
    374             if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) {
    375                 Log.e(TAG, "Demux is requesting the data which is already overwritten.");
    376                 return -1;
    377             }
    378             int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE);
    379             int bytesToCopyInFirstPass = amount;
    380             if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
    381                 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
    382             }
    383             System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass);
    384             if (bytesToCopyInFirstPass < amount) {
    385                 System.arraycopy(
    386                         mCircularBuffer,
    387                         0,
    388                         buffer,
    389                         offset + bytesToCopyInFirstPass,
    390                         amount - bytesToCopyInFirstPass);
    391             }
    392             mLastReadPosition = pos + amount;
    393             mCircularBufferMonitor.notify();
    394             return amount;
    395         }
    396     }
    397 
    398     /**
    399      * Adds {@link ScanChannel} instance for local files.
    400      *
    401      * @param output a list of channels where the results will be placed in
    402      */
    403     public static void addLocalStreamFiles(List<ScanChannel> output) {
    404         File dir = new File(FILE_DIR);
    405         if (!dir.exists()) return;
    406 
    407         File[] tsFiles = dir.listFiles();
    408         if (tsFiles == null) return;
    409         int freq = FileTsStreamer.FREQ_BASE;
    410         for (File file : tsFiles) {
    411             if (!file.isFile()) continue;
    412             output.add(ScanChannel.forFile(freq, file.getName()));
    413             freq += 100;
    414         }
    415     }
    416 
    417     /**
    418      * A thread managing a circular buffer that holds stream data to be consumed by player. Keeps
    419      * reading data in from a {@link StreamProvider} to hold enough amount for buffering. Started
    420      * and stopped by {@link #startStream()} and {@link #stopStream()}, respectively.
    421      */
    422     private class StreamingThread extends Thread {
    423         @Override
    424         public void run() {
    425             byte[] dataBuffer = new byte[READ_BUFFER_SIZE];
    426 
    427             synchronized (mCircularBufferMonitor) {
    428                 mBytesFetched = 0;
    429                 mLastReadPosition = 0;
    430             }
    431 
    432             while (true) {
    433                 synchronized (mCircularBufferMonitor) {
    434                     while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE
    435                             && mStreaming) {
    436                         try {
    437                             mCircularBufferMonitor.wait();
    438                         } catch (InterruptedException e) {
    439                             // Wait again.
    440                             Thread.currentThread().interrupt();
    441                         }
    442                     }
    443                     if (!mStreaming) {
    444                         break;
    445                     }
    446                 }
    447 
    448                 int bytesWritten = mSource.read(dataBuffer);
    449                 if (bytesWritten <= 0) {
    450                     try {
    451                         // When buffer is underrun, we sleep for short time to prevent
    452                         // unnecessary CPU draining.
    453                         sleep(BUFFER_UNDERRUN_SLEEP_MS);
    454                     } catch (InterruptedException e) {
    455                         Thread.currentThread().interrupt();
    456                     }
    457                     continue;
    458                 }
    459 
    460                 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten);
    461 
    462                 synchronized (mCircularBufferMonitor) {
    463                     int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE);
    464                     int bytesToCopyInFirstPass = bytesWritten;
    465                     if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) {
    466                         bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer;
    467                     }
    468                     System.arraycopy(
    469                             dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass);
    470                     if (bytesToCopyInFirstPass < bytesWritten) {
    471                         System.arraycopy(
    472                                 dataBuffer,
    473                                 bytesToCopyInFirstPass,
    474                                 mCircularBuffer,
    475                                 0,
    476                                 bytesWritten - bytesToCopyInFirstPass);
    477                     }
    478                     mBytesFetched += bytesWritten;
    479                     mCircularBufferMonitor.notify();
    480                 }
    481             }
    482 
    483             Log.i(TAG, "Streaming stopped");
    484             mSource.close();
    485         }
    486     }
    487 }
    488