Home | History | Annotate | Download | only in linux
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "remoting/host/linux/audio_pipe_reader.h"
      6 
      7 #include <fcntl.h>
      8 #include <sys/stat.h>
      9 #include <sys/types.h>
     10 #include <unistd.h>
     11 
     12 #include "base/logging.h"
     13 #include "base/posix/eintr_wrapper.h"
     14 #include "base/stl_util.h"
     15 
     16 namespace remoting {
     17 
     18 namespace {
     19 
     20 const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate *
     21                                   AudioPipeReader::kChannels *
     22                                   AudioPipeReader::kBytesPerSample;
     23 
     24 // Read data from the pipe every 40ms.
     25 const int kCapturingPeriodMs = 40;
     26 
     27 // Size of the pipe buffer in milliseconds.
     28 const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
     29 
     30 // Size of the pipe buffer in bytes.
     31 const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
     32     base::Time::kMillisecondsPerSecond;
     33 
     34 #if !defined(F_SETPIPE_SZ)
     35 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
     36 // to compile this code on machines with older kernel.
     37 #define F_SETPIPE_SZ 1031
     38 #endif  // defined(F_SETPIPE_SZ)
     39 
     40 }  // namespace
     41 
     42 // static
     43 scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
     44     scoped_refptr<base::SingleThreadTaskRunner> task_runner,
     45     const base::FilePath& pipe_path) {
     46   // Create a reference to the new AudioPipeReader before posting the
     47   // StartOnAudioThread task, otherwise it may be deleted on the audio
     48   // thread before we return.
     49   scoped_refptr<AudioPipeReader> pipe_reader =
     50       new AudioPipeReader(task_runner, pipe_path);
     51   task_runner->PostTask(
     52       FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader));
     53   return pipe_reader;
     54 }
     55 
     56 AudioPipeReader::AudioPipeReader(
     57     scoped_refptr<base::SingleThreadTaskRunner> task_runner,
     58     const base::FilePath& pipe_path)
     59     : task_runner_(task_runner),
     60       pipe_path_(pipe_path),
     61       observers_(new ObserverListThreadSafe<StreamObserver>()) {
     62 }
     63 
     64 AudioPipeReader::~AudioPipeReader() {}
     65 
     66 void AudioPipeReader::AddObserver(StreamObserver* observer) {
     67   observers_->AddObserver(observer);
     68 }
     69 void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
     70   observers_->RemoveObserver(observer);
     71 }
     72 
     73 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
     74   DCHECK_EQ(fd, pipe_.GetPlatformFile());
     75   StartTimer();
     76 }
     77 
     78 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
     79   NOTREACHED();
     80 }
     81 
     82 void AudioPipeReader::StartOnAudioThread() {
     83   DCHECK(task_runner_->BelongsToCurrentThread());
     84 
     85   if (!file_watcher_.Watch(pipe_path_.DirName(), true,
     86                            base::Bind(&AudioPipeReader::OnDirectoryChanged,
     87                                       base::Unretained(this)))) {
     88     LOG(ERROR) << "Failed to watch pulseaudio directory "
     89                << pipe_path_.DirName().value();
     90   }
     91 
     92   TryOpenPipe();
     93 }
     94 
     95 void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path,
     96                                          bool error) {
     97   DCHECK(task_runner_->BelongsToCurrentThread());
     98 
     99   if (error) {
    100     LOG(ERROR) << "File watcher returned an error.";
    101     return;
    102   }
    103 
    104   TryOpenPipe();
    105 }
    106 
    107 void AudioPipeReader::TryOpenPipe() {
    108   DCHECK(task_runner_->BelongsToCurrentThread());
    109 
    110   base::File new_pipe;
    111   new_pipe.Initialize(
    112       pipe_path_,
    113       base::File::FLAG_OPEN | base::File::FLAG_READ | base::File::FLAG_ASYNC);
    114 
    115   // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
    116   // file descriptors. Don't need to do anything if inode hasn't changed.
    117   if (new_pipe.IsValid() && pipe_.IsValid()) {
    118     struct stat old_stat;
    119     struct stat new_stat;
    120     if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 &&
    121         fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 &&
    122         old_stat.st_ino == new_stat.st_ino) {
    123       return;
    124     }
    125   }
    126 
    127   file_descriptor_watcher_.StopWatchingFileDescriptor();
    128   timer_.Stop();
    129 
    130   pipe_ = new_pipe.Pass();
    131 
    132   if (pipe_.IsValid()) {
    133     // Set O_NONBLOCK flag.
    134     if (HANDLE_EINTR(fcntl(pipe_.GetPlatformFile(), F_SETFL, O_NONBLOCK)) < 0) {
    135       PLOG(ERROR) << "fcntl";
    136       pipe_.Close();
    137       return;
    138     }
    139 
    140     // Set buffer size for the pipe.
    141     if (HANDLE_EINTR(fcntl(
    142             pipe_.GetPlatformFile(), F_SETPIPE_SZ, kPipeBufferSizeBytes)) < 0) {
    143       PLOG(ERROR) << "fcntl";
    144     }
    145 
    146     WaitForPipeReadable();
    147   }
    148 }
    149 
    150 void AudioPipeReader::StartTimer() {
    151   DCHECK(task_runner_->BelongsToCurrentThread());
    152   started_time_ = base::TimeTicks::Now();
    153   last_capture_position_ = 0;
    154   timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
    155                this, &AudioPipeReader::DoCapture);
    156 }
    157 
    158 void AudioPipeReader::DoCapture() {
    159   DCHECK(task_runner_->BelongsToCurrentThread());
    160   DCHECK(pipe_.IsValid());
    161 
    162   // Calculate how much we need read from the pipe. Pulseaudio doesn't control
    163   // how much data it writes to the pipe, so we need to pace the stream.
    164   base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
    165   int64 stream_position_bytes = stream_position.InMilliseconds() *
    166       kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
    167   int64 bytes_to_read = stream_position_bytes - last_capture_position_;
    168 
    169   std::string data = left_over_bytes_;
    170   size_t pos = data.size();
    171   left_over_bytes_.clear();
    172   data.resize(pos + bytes_to_read);
    173 
    174   while (pos < data.size()) {
    175     int read_result =
    176         pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos);
    177     if (read_result > 0) {
    178       pos += read_result;
    179     } else {
    180       if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
    181         PLOG(ERROR) << "read";
    182       break;
    183     }
    184   }
    185 
    186   // Stop reading from the pipe if PulseAudio isn't writing anything.
    187   if (pos == 0) {
    188     WaitForPipeReadable();
    189     return;
    190   }
    191 
    192   // Save any incomplete samples we've read for later. Each packet should
    193   // contain integer number of samples.
    194   int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
    195   left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
    196                           incomplete_samples_bytes);
    197   data.resize(pos - incomplete_samples_bytes);
    198 
    199   last_capture_position_ += data.size();
    200   // Normally PulseAudio will keep pipe buffer full, so we should always be able
    201   // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
    202   // sure that |stream_position_bytes| doesn't go out of sync with the current
    203   // stream position.
    204   if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
    205     last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
    206   DCHECK_LE(last_capture_position_, stream_position_bytes);
    207 
    208   // Dispatch asynchronous notification to the stream observers.
    209   scoped_refptr<base::RefCountedString> data_ref =
    210       base::RefCountedString::TakeString(&data);
    211   observers_->Notify(&StreamObserver::OnDataRead, data_ref);
    212 }
    213 
    214 void AudioPipeReader::WaitForPipeReadable() {
    215   timer_.Stop();
    216   base::MessageLoopForIO::current()->WatchFileDescriptor(
    217       pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ,
    218       &file_descriptor_watcher_, this);
    219 }
    220 
    221 // static
    222 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
    223   audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
    224 }
    225 
    226 }  // namespace remoting
    227