1 /* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tv.tuner.source; 18 19 import android.content.Context; 20 import android.os.Environment; 21 import android.util.Log; 22 import android.util.SparseBooleanArray; 23 import com.android.tv.common.SoftPreconditions; 24 import com.android.tv.tuner.ChannelScanFileParser.ScanChannel; 25 import com.android.tv.tuner.TunerFeatures; 26 import com.android.tv.tuner.data.TunerChannel; 27 import com.android.tv.tuner.ts.TsParser; 28 import com.android.tv.tuner.tvinput.EventDetector; 29 import com.android.tv.tuner.tvinput.FileSourceEventDetector; 30 import com.google.android.exoplayer.C; 31 import com.google.android.exoplayer.upstream.DataSpec; 32 import java.io.BufferedInputStream; 33 import java.io.File; 34 import java.io.FileInputStream; 35 import java.io.IOException; 36 import java.util.List; 37 import java.util.concurrent.atomic.AtomicLong; 38 39 /** 40 * Provides MPEG-2 TS stream sources for both channel scanning and channel playing from a local file 41 * generated by capturing TV signal. 42 */ 43 public class FileTsStreamer implements TsStreamer { 44 private static final String TAG = "FileTsStreamer"; 45 46 private static final int TS_PACKET_SIZE = 188; 47 private static final int TS_SYNC_BYTE = 0x47; 48 private static final int MIN_READ_UNIT = TS_PACKET_SIZE * 10; 49 private static final int READ_BUFFER_SIZE = MIN_READ_UNIT * 10; // ~20KB 50 private static final int CIRCULAR_BUFFER_SIZE = MIN_READ_UNIT * 4000; // ~ 8MB 51 private static final int PADDING_SIZE = MIN_READ_UNIT * 1000; // ~2MB 52 private static final int READ_TIMEOUT_MS = 10000; // 10 secs. 53 private static final int BUFFER_UNDERRUN_SLEEP_MS = 10; 54 private static final String FILE_DIR = 55 new File(Environment.getExternalStorageDirectory(), "Streams").getAbsolutePath(); 56 57 // Virtual frequency base used for file-based source 58 public static final int FREQ_BASE = 100; 59 60 private final Object mCircularBufferMonitor = new Object(); 61 private final byte[] mCircularBuffer = new byte[CIRCULAR_BUFFER_SIZE]; 62 private final FileSourceEventDetector mEventDetector; 63 private final Context mContext; 64 65 private long mBytesFetched; 66 private long mLastReadPosition; 67 private boolean mStreaming; 68 69 private Thread mStreamingThread; 70 private StreamProvider mSource; 71 72 public static class FileDataSource extends TsDataSource { 73 private final FileTsStreamer mTsStreamer; 74 private final AtomicLong mLastReadPosition = new AtomicLong(0); 75 private long mStartBufferedPosition; 76 77 private FileDataSource(FileTsStreamer tsStreamer) { 78 mTsStreamer = tsStreamer; 79 mStartBufferedPosition = tsStreamer.getBufferedPosition(); 80 } 81 82 @Override 83 public long getBufferedPosition() { 84 return mTsStreamer.getBufferedPosition() - mStartBufferedPosition; 85 } 86 87 @Override 88 public long getLastReadPosition() { 89 return mLastReadPosition.get(); 90 } 91 92 @Override 93 public void shiftStartPosition(long offset) { 94 SoftPreconditions.checkState(mLastReadPosition.get() == 0); 95 SoftPreconditions.checkArgument(0 <= offset && offset <= getBufferedPosition()); 96 mStartBufferedPosition += offset; 97 } 98 99 @Override 100 public long open(DataSpec dataSpec) throws IOException { 101 mLastReadPosition.set(0); 102 return C.LENGTH_UNBOUNDED; 103 } 104 105 @Override 106 public void close() {} 107 108 @Override 109 public int read(byte[] buffer, int offset, int readLength) throws IOException { 110 int ret = 111 mTsStreamer.readAt( 112 mStartBufferedPosition + mLastReadPosition.get(), 113 buffer, 114 offset, 115 readLength); 116 if (ret > 0) { 117 mLastReadPosition.addAndGet(ret); 118 } 119 return ret; 120 } 121 } 122 123 /** 124 * Creates {@link TsStreamer} for scanning & playing MPEG-2 TS file. 125 * 126 * @param eventListener the listener for channel & program information 127 */ 128 public FileTsStreamer(EventDetector.EventListener eventListener, Context context) { 129 mEventDetector = 130 new FileSourceEventDetector( 131 eventListener, TunerFeatures.ENABLE_FILE_DVB.isEnabled(context)); 132 mContext = context; 133 } 134 135 @Override 136 public boolean startStream(ScanChannel channel) { 137 String filepath = new File(FILE_DIR, channel.filename).getAbsolutePath(); 138 mSource = new StreamProvider(filepath); 139 if (!mSource.isReady()) { 140 return false; 141 } 142 mEventDetector.start(mSource, FileSourceEventDetector.ALL_PROGRAM_NUMBERS); 143 mSource.addPidFilter(TsParser.PAT_PID); 144 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 145 if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) { 146 mSource.addPidFilter(TsParser.DVB_EIT_PID); 147 mSource.addPidFilter(TsParser.DVB_SDT_PID); 148 } 149 synchronized (mCircularBufferMonitor) { 150 if (mStreaming) { 151 return true; 152 } 153 mStreaming = true; 154 } 155 156 mStreamingThread = new StreamingThread(); 157 mStreamingThread.start(); 158 Log.i(TAG, "Streaming started"); 159 return true; 160 } 161 162 @Override 163 public boolean startStream(TunerChannel channel) { 164 Log.i(TAG, "tuneToChannel with: " + channel.getFilepath()); 165 mSource = new StreamProvider(channel.getFilepath()); 166 if (!mSource.isReady()) { 167 return false; 168 } 169 mEventDetector.start(mSource, channel.getProgramNumber()); 170 mSource.addPidFilter(channel.getVideoPid()); 171 for (Integer i : channel.getAudioPids()) { 172 mSource.addPidFilter(i); 173 } 174 mSource.addPidFilter(channel.getPcrPid()); 175 mSource.addPidFilter(TsParser.PAT_PID); 176 mSource.addPidFilter(TsParser.ATSC_SI_BASE_PID); 177 if (TunerFeatures.ENABLE_FILE_DVB.isEnabled(mContext)) { 178 mSource.addPidFilter(TsParser.DVB_EIT_PID); 179 mSource.addPidFilter(TsParser.DVB_SDT_PID); 180 } 181 synchronized (mCircularBufferMonitor) { 182 if (mStreaming) { 183 return true; 184 } 185 mStreaming = true; 186 } 187 188 mStreamingThread = new StreamingThread(); 189 mStreamingThread.start(); 190 Log.i(TAG, "Streaming started"); 191 return true; 192 } 193 194 /** 195 * Blocks the current thread until the streaming thread stops. In rare cases when the tuner 196 * device is overloaded this can take a while, but usually it returns pretty quickly. 197 */ 198 @Override 199 public void stopStream() { 200 synchronized (mCircularBufferMonitor) { 201 mStreaming = false; 202 mCircularBufferMonitor.notify(); 203 } 204 205 try { 206 if (mStreamingThread != null) { 207 mStreamingThread.join(); 208 } 209 } catch (InterruptedException e) { 210 Thread.currentThread().interrupt(); 211 } 212 } 213 214 @Override 215 public TsDataSource createDataSource() { 216 return new FileDataSource(this); 217 } 218 219 /** 220 * Returns the current buffered position from the file. 221 * 222 * @return the current buffered position 223 */ 224 public long getBufferedPosition() { 225 synchronized (mCircularBufferMonitor) { 226 return mBytesFetched; 227 } 228 } 229 230 /** Provides MPEG-2 transport stream from a local file. Stream can be filtered by PID. */ 231 public static class StreamProvider { 232 private final String mFilepath; 233 private final SparseBooleanArray mPids = new SparseBooleanArray(); 234 private final byte[] mPreBuffer = new byte[READ_BUFFER_SIZE]; 235 236 private BufferedInputStream mInputStream; 237 238 private StreamProvider(String filepath) { 239 mFilepath = filepath; 240 open(filepath); 241 } 242 243 private void open(String filepath) { 244 try { 245 mInputStream = new BufferedInputStream(new FileInputStream(filepath)); 246 } catch (IOException e) { 247 Log.e(TAG, "Error opening input stream", e); 248 mInputStream = null; 249 } 250 } 251 252 private boolean isReady() { 253 return mInputStream != null; 254 } 255 256 /** Returns the file path of the MPEG-2 TS file. */ 257 public String getFilepath() { 258 return mFilepath; 259 } 260 261 /** Adds a pid for filtering from the MPEG-2 TS file. */ 262 public void addPidFilter(int pid) { 263 mPids.put(pid, true); 264 } 265 266 /** Returns whether the current pid filter is empty or not. */ 267 public boolean isFilterEmpty() { 268 return mPids.size() == 0; 269 } 270 271 /** Clears the current pid filter. */ 272 public void clearPidFilter() { 273 mPids.clear(); 274 } 275 276 /** 277 * Returns whether a pid is in the pid filter or not. 278 * 279 * @param pid the pid to check 280 */ 281 public boolean isInFilter(int pid) { 282 return mPids.get(pid); 283 } 284 285 /** 286 * Reads from the MPEG-2 TS file to buffer. 287 * 288 * @param inputBuffer to read 289 * @return the number of read bytes 290 */ 291 private int read(byte[] inputBuffer) { 292 int readSize = readInternal(); 293 if (readSize <= 0) { 294 // Reached the end of stream. Restart from the beginning. 295 close(); 296 open(mFilepath); 297 if (mInputStream == null) { 298 return -1; 299 } 300 readSize = readInternal(); 301 } 302 303 if (mPreBuffer[0] != TS_SYNC_BYTE) { 304 Log.e(TAG, "Error reading input stream - no TS sync found"); 305 return -1; 306 } 307 int filteredSize = 0; 308 for (int i = 0, destPos = 0; i < readSize; i += TS_PACKET_SIZE) { 309 if (mPreBuffer[i] == TS_SYNC_BYTE) { 310 int pid = ((mPreBuffer[i + 1] & 0x1f) << 8) + (mPreBuffer[i + 2] & 0xff); 311 if (mPids.get(pid)) { 312 System.arraycopy(mPreBuffer, i, inputBuffer, destPos, TS_PACKET_SIZE); 313 destPos += TS_PACKET_SIZE; 314 filteredSize += TS_PACKET_SIZE; 315 } 316 } 317 } 318 return filteredSize; 319 } 320 321 private int readInternal() { 322 int readSize; 323 try { 324 readSize = mInputStream.read(mPreBuffer, 0, mPreBuffer.length); 325 } catch (IOException e) { 326 Log.e(TAG, "Error reading input stream", e); 327 return -1; 328 } 329 return readSize; 330 } 331 332 private void close() { 333 try { 334 mInputStream.close(); 335 } catch (IOException e) { 336 Log.e(TAG, "Error closing input stream:", e); 337 } 338 mInputStream = null; 339 } 340 } 341 342 /** 343 * Reads data from internal buffer. 344 * 345 * @param pos the position to read from 346 * @param buffer to read 347 * @param offset start position of the read buffer 348 * @param amount number of bytes to read 349 * @return number of read bytes when successful, {@code -1} otherwise 350 * @throws IOException 351 */ 352 public int readAt(long pos, byte[] buffer, int offset, int amount) throws IOException { 353 synchronized (mCircularBufferMonitor) { 354 long initialBytesFetched = mBytesFetched; 355 while (mBytesFetched < pos + amount && mStreaming) { 356 try { 357 mCircularBufferMonitor.wait(READ_TIMEOUT_MS); 358 } catch (InterruptedException e) { 359 // Wait again. 360 Thread.currentThread().interrupt(); 361 } 362 if (initialBytesFetched == mBytesFetched) { 363 Log.w(TAG, "No data update for " + READ_TIMEOUT_MS + "ms. returning -1."); 364 365 // Returning -1 will make demux report EOS so that the input service can retry 366 // the playback. 367 return -1; 368 } 369 } 370 if (!mStreaming) { 371 Log.w(TAG, "Stream is already stopped."); 372 return -1; 373 } 374 if (mBytesFetched - CIRCULAR_BUFFER_SIZE > pos) { 375 Log.e(TAG, "Demux is requesting the data which is already overwritten."); 376 return -1; 377 } 378 int posInBuffer = (int) (pos % CIRCULAR_BUFFER_SIZE); 379 int bytesToCopyInFirstPass = amount; 380 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 381 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 382 } 383 System.arraycopy(mCircularBuffer, posInBuffer, buffer, offset, bytesToCopyInFirstPass); 384 if (bytesToCopyInFirstPass < amount) { 385 System.arraycopy( 386 mCircularBuffer, 387 0, 388 buffer, 389 offset + bytesToCopyInFirstPass, 390 amount - bytesToCopyInFirstPass); 391 } 392 mLastReadPosition = pos + amount; 393 mCircularBufferMonitor.notify(); 394 return amount; 395 } 396 } 397 398 /** 399 * Adds {@link ScanChannel} instance for local files. 400 * 401 * @param output a list of channels where the results will be placed in 402 */ 403 public static void addLocalStreamFiles(List<ScanChannel> output) { 404 File dir = new File(FILE_DIR); 405 if (!dir.exists()) return; 406 407 File[] tsFiles = dir.listFiles(); 408 if (tsFiles == null) return; 409 int freq = FileTsStreamer.FREQ_BASE; 410 for (File file : tsFiles) { 411 if (!file.isFile()) continue; 412 output.add(ScanChannel.forFile(freq, file.getName())); 413 freq += 100; 414 } 415 } 416 417 /** 418 * A thread managing a circular buffer that holds stream data to be consumed by player. Keeps 419 * reading data in from a {@link StreamProvider} to hold enough amount for buffering. Started 420 * and stopped by {@link #startStream()} and {@link #stopStream()}, respectively. 421 */ 422 private class StreamingThread extends Thread { 423 @Override 424 public void run() { 425 byte[] dataBuffer = new byte[READ_BUFFER_SIZE]; 426 427 synchronized (mCircularBufferMonitor) { 428 mBytesFetched = 0; 429 mLastReadPosition = 0; 430 } 431 432 while (true) { 433 synchronized (mCircularBufferMonitor) { 434 while ((mBytesFetched - mLastReadPosition + PADDING_SIZE) > CIRCULAR_BUFFER_SIZE 435 && mStreaming) { 436 try { 437 mCircularBufferMonitor.wait(); 438 } catch (InterruptedException e) { 439 // Wait again. 440 Thread.currentThread().interrupt(); 441 } 442 } 443 if (!mStreaming) { 444 break; 445 } 446 } 447 448 int bytesWritten = mSource.read(dataBuffer); 449 if (bytesWritten <= 0) { 450 try { 451 // When buffer is underrun, we sleep for short time to prevent 452 // unnecessary CPU draining. 453 sleep(BUFFER_UNDERRUN_SLEEP_MS); 454 } catch (InterruptedException e) { 455 Thread.currentThread().interrupt(); 456 } 457 continue; 458 } 459 460 mEventDetector.feedTSStream(dataBuffer, 0, bytesWritten); 461 462 synchronized (mCircularBufferMonitor) { 463 int posInBuffer = (int) (mBytesFetched % CIRCULAR_BUFFER_SIZE); 464 int bytesToCopyInFirstPass = bytesWritten; 465 if (posInBuffer + bytesToCopyInFirstPass > mCircularBuffer.length) { 466 bytesToCopyInFirstPass = mCircularBuffer.length - posInBuffer; 467 } 468 System.arraycopy( 469 dataBuffer, 0, mCircularBuffer, posInBuffer, bytesToCopyInFirstPass); 470 if (bytesToCopyInFirstPass < bytesWritten) { 471 System.arraycopy( 472 dataBuffer, 473 bytesToCopyInFirstPass, 474 mCircularBuffer, 475 0, 476 bytesWritten - bytesToCopyInFirstPass); 477 } 478 mBytesFetched += bytesWritten; 479 mCircularBufferMonitor.notify(); 480 } 481 } 482 483 Log.i(TAG, "Streaming stopped"); 484 mSource.close(); 485 } 486 } 487 } 488