Home | History | Annotate | Download | only in media
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "chrome/browser/media/webrtc_rtp_dump_writer.h"
      6 
      7 #include "base/big_endian.h"
      8 #include "base/file_util.h"
      9 #include "base/logging.h"
     10 #include "content/public/browser/browser_thread.h"
     11 #include "third_party/zlib/zlib.h"
     12 
     13 using content::BrowserThread;
     14 
     15 namespace {
     16 
     17 static const size_t kMinimumGzipOutputBufferSize = 256;  // In bytes.
     18 
     19 const unsigned char kRtpDumpFileHeaderFirstLine[] = "#!rtpplay1.0 0.0.0.0/0\n";
     20 static const size_t kRtpDumpFileHeaderSize = 16;  // In bytes.
     21 
     22 // A helper for writing the header of the dump file.
     23 void WriteRtpDumpFileHeaderBigEndian(base::TimeTicks start,
     24                                      std::vector<uint8>* output) {
     25   size_t buffer_start_pos = output->size();
     26   output->resize(output->size() + kRtpDumpFileHeaderSize);
     27 
     28   char* buffer = reinterpret_cast<char*>(&(*output)[buffer_start_pos]);
     29 
     30   base::TimeDelta delta = start - base::TimeTicks();
     31   uint32 start_sec = delta.InSeconds();
     32   base::WriteBigEndian(buffer, start_sec);
     33   buffer += sizeof(start_sec);
     34 
     35   uint32 start_usec =
     36       delta.InMilliseconds() * base::Time::kMicrosecondsPerMillisecond;
     37   base::WriteBigEndian(buffer, start_usec);
     38   buffer += sizeof(start_usec);
     39 
     40   // Network source, always 0.
     41   base::WriteBigEndian(buffer, uint32(0));
     42   buffer += sizeof(uint32);
     43 
     44   // UDP port, always 0.
     45   base::WriteBigEndian(buffer, uint16(0));
     46   buffer += sizeof(uint16);
     47 
     48   // 2 bytes padding.
     49   base::WriteBigEndian(buffer, uint16(0));
     50 }
     51 
     52 // The header size for each packet dump.
     53 static const size_t kPacketDumpHeaderSize = 8;  // In bytes.
     54 
     55 // A helper for writing the header for each packet dump.
     56 // |start| is the time when the recording is started.
     57 // |dump_length| is the length of the packet dump including this header.
     58 // |packet_length| is the length of the RTP packet header.
     59 void WritePacketDumpHeaderBigEndian(const base::TimeTicks& start,
     60                                     uint16 dump_length,
     61                                     uint16 packet_length,
     62                                     std::vector<uint8>* output) {
     63   size_t buffer_start_pos = output->size();
     64   output->resize(output->size() + kPacketDumpHeaderSize);
     65 
     66   char* buffer = reinterpret_cast<char*>(&(*output)[buffer_start_pos]);
     67 
     68   base::WriteBigEndian(buffer, dump_length);
     69   buffer += sizeof(dump_length);
     70 
     71   base::WriteBigEndian(buffer, packet_length);
     72   buffer += sizeof(packet_length);
     73 
     74   uint32 elapsed =
     75       static_cast<uint32>((base::TimeTicks::Now() - start).InMilliseconds());
     76   base::WriteBigEndian(buffer, elapsed);
     77 }
     78 
     79 // Append |src_len| bytes from |src| to |dest|.
     80 void AppendToBuffer(const uint8* src,
     81                     size_t src_len,
     82                     std::vector<uint8>* dest) {
     83   size_t old_dest_size = dest->size();
     84   dest->resize(old_dest_size + src_len);
     85   memcpy(&(*dest)[old_dest_size], src, src_len);
     86 }
     87 
     88 }  // namespace
     89 
     90 // This class is running on the FILE thread for compressing and writing the
     91 // dump buffer to disk.
     92 class WebRtcRtpDumpWriter::FileThreadWorker {
     93  public:
     94   explicit FileThreadWorker(const base::FilePath& dump_path)
     95       : dump_path_(dump_path) {
     96     thread_checker_.DetachFromThread();
     97 
     98     memset(&stream_, 0, sizeof(stream_));
     99     int result = deflateInit2(&stream_,
    100                               Z_DEFAULT_COMPRESSION,
    101                               Z_DEFLATED,
    102                               // windowBits = 15 is default, 16 is added to
    103                               // produce a gzip header + trailer.
    104                               15 + 16,
    105                               8,  // memLevel = 8 is default.
    106                               Z_DEFAULT_STRATEGY);
    107     DCHECK_EQ(Z_OK, result);
    108   }
    109 
    110   ~FileThreadWorker() {
    111     DCHECK(thread_checker_.CalledOnValidThread());
    112 
    113     // Makes sure all allocations are freed.
    114     deflateEnd(&stream_);
    115   }
    116 
    117   // Compresses the data in |buffer| and write to the dump file. If |end_stream|
    118   // is true, the compression stream will be ended and the dump file cannot be
    119   // written to any more.
    120   void CompressAndWriteToFileOnFileThread(
    121       scoped_ptr<std::vector<uint8> > buffer,
    122       bool end_stream,
    123       FlushResult* result,
    124       size_t* bytes_written) {
    125     DCHECK(thread_checker_.CalledOnValidThread());
    126 
    127     // This is called either when the in-memory buffer is full or the dump
    128     // should be ended.
    129     DCHECK(!buffer->empty() || end_stream);
    130 
    131     *result = FLUSH_RESULT_SUCCESS;
    132     *bytes_written = 0;
    133 
    134     // There may be nothing to compress/write if there is no RTP packet since
    135     // the last flush.
    136     if (!buffer->empty()) {
    137       *bytes_written = CompressAndWriteBufferToFile(buffer.get(), result);
    138     } else if (!base::PathExists(dump_path_)) {
    139       // If the dump does not exist, it means there is no RTP packet recorded.
    140       // Return FLUSH_RESULT_NO_DATA to indicate no dump file created.
    141       *result = FLUSH_RESULT_NO_DATA;
    142     }
    143 
    144     if (end_stream && !EndDumpFile())
    145       *result = FLUSH_RESULT_FAILURE;
    146   }
    147 
    148  private:
    149   // Helper for CompressAndWriteToFileOnFileThread to compress and write one
    150   // dump.
    151   size_t CompressAndWriteBufferToFile(std::vector<uint8>* buffer,
    152                                       FlushResult* result) {
    153     DCHECK(thread_checker_.CalledOnValidThread());
    154     DCHECK(buffer->size());
    155 
    156     *result = FLUSH_RESULT_SUCCESS;
    157 
    158     std::vector<uint8> compressed_buffer;
    159     if (!Compress(buffer, &compressed_buffer)) {
    160       DVLOG(2) << "Compressing buffer failed.";
    161       *result = FLUSH_RESULT_FAILURE;
    162       return 0;
    163     }
    164 
    165     int bytes_written = -1;
    166 
    167     if (base::PathExists(dump_path_)) {
    168       bytes_written = base::AppendToFile(
    169           dump_path_,
    170           reinterpret_cast<const char*>(&compressed_buffer[0]),
    171           compressed_buffer.size());
    172     } else {
    173       bytes_written = base::WriteFile(
    174           dump_path_,
    175           reinterpret_cast<const char*>(&compressed_buffer[0]),
    176           compressed_buffer.size());
    177     }
    178 
    179     if (bytes_written == -1) {
    180       DVLOG(2) << "Writing file failed: " << dump_path_.value();
    181       *result = FLUSH_RESULT_FAILURE;
    182       return 0;
    183     }
    184 
    185     DCHECK_EQ(static_cast<size_t>(bytes_written), compressed_buffer.size());
    186     return bytes_written;
    187   }
    188 
    189   // Compresses |input| into |output|.
    190   bool Compress(std::vector<uint8>* input, std::vector<uint8>* output) {
    191     DCHECK(thread_checker_.CalledOnValidThread());
    192     int result = Z_OK;
    193 
    194     output->resize(std::max(kMinimumGzipOutputBufferSize, input->size()));
    195 
    196     stream_.next_in = &(*input)[0];
    197     stream_.avail_in = input->size();
    198     stream_.next_out = &(*output)[0];
    199     stream_.avail_out = output->size();
    200 
    201     result = deflate(&stream_, Z_SYNC_FLUSH);
    202     DCHECK_EQ(Z_OK, result);
    203     DCHECK_EQ(0U, stream_.avail_in);
    204 
    205     output->resize(output->size() - stream_.avail_out);
    206 
    207     stream_.next_in = NULL;
    208     stream_.next_out = NULL;
    209     stream_.avail_out = 0;
    210     return true;
    211   }
    212 
    213   // Ends the compression stream and completes the dump file.
    214   bool EndDumpFile() {
    215     DCHECK(thread_checker_.CalledOnValidThread());
    216 
    217     std::vector<uint8> output_buffer;
    218     output_buffer.resize(kMinimumGzipOutputBufferSize);
    219 
    220     stream_.next_in = NULL;
    221     stream_.avail_in = 0;
    222     stream_.next_out = &output_buffer[0];
    223     stream_.avail_out = output_buffer.size();
    224 
    225     int result = deflate(&stream_, Z_FINISH);
    226     DCHECK_EQ(Z_STREAM_END, result);
    227 
    228     result = deflateEnd(&stream_);
    229     DCHECK_EQ(Z_OK, result);
    230 
    231     output_buffer.resize(output_buffer.size() - stream_.avail_out);
    232 
    233     memset(&stream_, 0, sizeof(z_stream));
    234 
    235     DCHECK(!output_buffer.empty());
    236     int bytes_written =
    237         base::AppendToFile(dump_path_,
    238                            reinterpret_cast<const char*>(&output_buffer[0]),
    239                            output_buffer.size());
    240 
    241     return bytes_written > 0;
    242   }
    243 
    244   const base::FilePath dump_path_;
    245 
    246   z_stream stream_;
    247 
    248   base::ThreadChecker thread_checker_;
    249 
    250   DISALLOW_COPY_AND_ASSIGN(FileThreadWorker);
    251 };
    252 
    253 WebRtcRtpDumpWriter::WebRtcRtpDumpWriter(
    254     const base::FilePath& incoming_dump_path,
    255     const base::FilePath& outgoing_dump_path,
    256     size_t max_dump_size,
    257     const base::Closure& max_dump_size_reached_callback)
    258     : max_dump_size_(max_dump_size),
    259       max_dump_size_reached_callback_(max_dump_size_reached_callback),
    260       total_dump_size_on_disk_(0),
    261       incoming_file_thread_worker_(new FileThreadWorker(incoming_dump_path)),
    262       outgoing_file_thread_worker_(new FileThreadWorker(outgoing_dump_path)),
    263       weak_ptr_factory_(this) {
    264 }
    265 
    266 WebRtcRtpDumpWriter::~WebRtcRtpDumpWriter() {
    267   DCHECK(thread_checker_.CalledOnValidThread());
    268 
    269   bool success = BrowserThread::DeleteSoon(
    270       BrowserThread::FILE, FROM_HERE, incoming_file_thread_worker_.release());
    271   DCHECK(success);
    272 
    273   success = BrowserThread::DeleteSoon(
    274       BrowserThread::FILE, FROM_HERE, outgoing_file_thread_worker_.release());
    275   DCHECK(success);
    276 }
    277 
    278 void WebRtcRtpDumpWriter::WriteRtpPacket(const uint8* packet_header,
    279                                          size_t header_length,
    280                                          size_t packet_length,
    281                                          bool incoming) {
    282   DCHECK(thread_checker_.CalledOnValidThread());
    283 
    284   static const size_t kMaxInMemoryBufferSize = 65536;
    285 
    286   std::vector<uint8>* dest_buffer =
    287       incoming ? &incoming_buffer_ : &outgoing_buffer_;
    288 
    289   // We use the capacity of the buffer to indicate if the buffer has been
    290   // initialized and if the dump file header has been created.
    291   if (!dest_buffer->capacity()) {
    292     dest_buffer->reserve(std::min(kMaxInMemoryBufferSize, max_dump_size_));
    293 
    294     start_time_ = base::TimeTicks::Now();
    295 
    296     // Writes the dump file header.
    297     AppendToBuffer(kRtpDumpFileHeaderFirstLine,
    298                    arraysize(kRtpDumpFileHeaderFirstLine) - 1,
    299                    dest_buffer);
    300     WriteRtpDumpFileHeaderBigEndian(start_time_, dest_buffer);
    301   }
    302 
    303   size_t packet_dump_length = kPacketDumpHeaderSize + header_length;
    304 
    305   // Flushes the buffer to disk if the buffer is full.
    306   if (dest_buffer->size() + packet_dump_length > dest_buffer->capacity())
    307     FlushBuffer(incoming, false, FlushDoneCallback());
    308 
    309   WritePacketDumpHeaderBigEndian(
    310       start_time_, packet_dump_length, packet_length, dest_buffer);
    311 
    312   // Writes the actual RTP packet header.
    313   AppendToBuffer(packet_header, header_length, dest_buffer);
    314 }
    315 
    316 void WebRtcRtpDumpWriter::EndDump(RtpDumpType type,
    317                                   const EndDumpCallback& finished_callback) {
    318   DCHECK(thread_checker_.CalledOnValidThread());
    319   DCHECK(type == RTP_DUMP_OUTGOING || incoming_file_thread_worker_ != NULL);
    320   DCHECK(type == RTP_DUMP_INCOMING || outgoing_file_thread_worker_ != NULL);
    321 
    322   bool incoming = (type == RTP_DUMP_BOTH || type == RTP_DUMP_INCOMING);
    323   EndDumpContext context(type, finished_callback);
    324 
    325   // End the incoming dump first if required. OnDumpEnded will continue to end
    326   // the outgoing dump if necessary.
    327   FlushBuffer(incoming,
    328               true,
    329               base::Bind(&WebRtcRtpDumpWriter::OnDumpEnded,
    330                          weak_ptr_factory_.GetWeakPtr(),
    331                          context,
    332                          incoming));
    333 }
    334 
    335 size_t WebRtcRtpDumpWriter::max_dump_size() const {
    336   DCHECK(thread_checker_.CalledOnValidThread());
    337   return max_dump_size_;
    338 }
    339 
    340 WebRtcRtpDumpWriter::EndDumpContext::EndDumpContext(
    341     RtpDumpType type,
    342     const EndDumpCallback& callback)
    343     : type(type),
    344       incoming_succeeded(false),
    345       outgoing_succeeded(false),
    346       callback(callback) {
    347 }
    348 
    349 WebRtcRtpDumpWriter::EndDumpContext::~EndDumpContext() {
    350 }
    351 
    352 void WebRtcRtpDumpWriter::FlushBuffer(bool incoming,
    353                                       bool end_stream,
    354                                       const FlushDoneCallback& callback) {
    355   DCHECK(thread_checker_.CalledOnValidThread());
    356 
    357   scoped_ptr<std::vector<uint8> > new_buffer(new std::vector<uint8>());
    358 
    359   if (incoming) {
    360     new_buffer->reserve(incoming_buffer_.capacity());
    361     new_buffer->swap(incoming_buffer_);
    362   } else {
    363     new_buffer->reserve(outgoing_buffer_.capacity());
    364     new_buffer->swap(outgoing_buffer_);
    365   }
    366 
    367   scoped_ptr<FlushResult> result(new FlushResult(FLUSH_RESULT_FAILURE));
    368 
    369   scoped_ptr<size_t> bytes_written(new size_t(0));
    370 
    371   FileThreadWorker* worker = incoming ? incoming_file_thread_worker_.get()
    372                                       : outgoing_file_thread_worker_.get();
    373 
    374   // Using "Unretained(worker)" because |worker| is owner by this object and it
    375   // guaranteed to be deleted on the FILE thread before this object goes away.
    376   base::Closure task =
    377       base::Bind(&FileThreadWorker::CompressAndWriteToFileOnFileThread,
    378                  base::Unretained(worker),
    379                  Passed(&new_buffer),
    380                  end_stream,
    381                  result.get(),
    382                  bytes_written.get());
    383 
    384   // OnFlushDone is necessary to avoid running the callback after this
    385   // object is gone.
    386   base::Closure reply = base::Bind(&WebRtcRtpDumpWriter::OnFlushDone,
    387                                    weak_ptr_factory_.GetWeakPtr(),
    388                                    callback,
    389                                    Passed(&result),
    390                                    Passed(&bytes_written));
    391 
    392   // Define the task and reply outside the method call so that getting and
    393   // passing the scoped_ptr does not depend on the argument evaluation order.
    394   BrowserThread::PostTaskAndReply(BrowserThread::FILE, FROM_HERE, task, reply);
    395 
    396   if (end_stream) {
    397     bool success = BrowserThread::DeleteSoon(
    398         BrowserThread::FILE,
    399         FROM_HERE,
    400         incoming ? incoming_file_thread_worker_.release()
    401                  : outgoing_file_thread_worker_.release());
    402     DCHECK(success);
    403   }
    404 }
    405 
    406 void WebRtcRtpDumpWriter::OnFlushDone(const FlushDoneCallback& callback,
    407                                       const scoped_ptr<FlushResult>& result,
    408                                       const scoped_ptr<size_t>& bytes_written) {
    409   DCHECK(thread_checker_.CalledOnValidThread());
    410 
    411   total_dump_size_on_disk_ += *bytes_written;
    412 
    413   if (total_dump_size_on_disk_ >= max_dump_size_ &&
    414       !max_dump_size_reached_callback_.is_null()) {
    415     max_dump_size_reached_callback_.Run();
    416   }
    417 
    418   // Returns success for FLUSH_RESULT_MAX_SIZE_REACHED since the dump is still
    419   // valid.
    420   if (!callback.is_null()) {
    421     callback.Run(*result != FLUSH_RESULT_FAILURE &&
    422                  *result != FLUSH_RESULT_NO_DATA);
    423   }
    424 }
    425 
    426 void WebRtcRtpDumpWriter::OnDumpEnded(EndDumpContext context,
    427                                       bool incoming,
    428                                       bool success) {
    429   DCHECK(thread_checker_.CalledOnValidThread());
    430 
    431   DVLOG(2) << "Dump ended, incoming = " << incoming
    432            << ", succeeded = " << success;
    433 
    434   if (incoming)
    435     context.incoming_succeeded = success;
    436   else
    437     context.outgoing_succeeded = success;
    438 
    439   // End the outgoing dump if needed.
    440   if (incoming && context.type == RTP_DUMP_BOTH) {
    441     FlushBuffer(false,
    442                 true,
    443                 base::Bind(&WebRtcRtpDumpWriter::OnDumpEnded,
    444                            weak_ptr_factory_.GetWeakPtr(),
    445                            context,
    446                            false));
    447     return;
    448   }
    449 
    450   // This object might be deleted after running the callback.
    451   context.callback.Run(context.incoming_succeeded, context.outgoing_succeeded);
    452 }
    453