Home | History | Annotate | Download | only in linux
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "remoting/host/linux/audio_pipe_reader.h"
      6 
      7 #include <fcntl.h>
      8 #include <sys/stat.h>
      9 #include <sys/types.h>
     10 #include <unistd.h>
     11 
     12 #include "base/files/file_path.h"
     13 #include "base/logging.h"
     14 #include "base/posix/eintr_wrapper.h"
     15 #include "base/stl_util.h"
     16 
     17 namespace remoting {
     18 
     19 namespace {
     20 
     21 // PulseAudio's module-pipe-sink must be configured to use the following
     22 // parameters for the sink we read from.
     23 const int kSamplesPerSecond = 48000;
     24 const int kChannels = 2;
     25 const int kBytesPerSample = 2;
     26 const int kSampleBytesPerSecond =
     27     kSamplesPerSecond * kChannels * kBytesPerSample;
     28 
     29 // Read data from the pipe every 40ms.
     30 const int kCapturingPeriodMs = 40;
     31 
     32 // Size of the pipe buffer in milliseconds.
     33 const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
     34 
     35 // Size of the pipe buffer in bytes.
     36 const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
     37     base::Time::kMillisecondsPerSecond;
     38 
     39 #if !defined(F_SETPIPE_SZ)
     40 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
     41 // to compile this code on machines with older kernel.
     42 #define F_SETPIPE_SZ 1031
     43 #endif  // defined(F_SETPIPE_SZ)
     44 
     45 }  // namespace
     46 
     47 // static
     48 scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
     49     scoped_refptr<base::SingleThreadTaskRunner> task_runner,
     50     const base::FilePath& pipe_name) {
     51   // Create a reference to the new AudioPipeReader before posting the
     52   // StartOnAudioThread task, otherwise it may be deleted on the audio
     53   // thread before we return.
     54   scoped_refptr<AudioPipeReader> pipe_reader =
     55       new AudioPipeReader(task_runner);
     56   task_runner->PostTask(FROM_HERE, base::Bind(
     57       &AudioPipeReader::StartOnAudioThread, pipe_reader, pipe_name));
     58   return pipe_reader;
     59 }
     60 
     61 void AudioPipeReader::StartOnAudioThread(const base::FilePath& pipe_name) {
     62   DCHECK(task_runner_->BelongsToCurrentThread());
     63 
     64   pipe_fd_ = HANDLE_EINTR(open(
     65       pipe_name.value().c_str(), O_RDONLY | O_NONBLOCK));
     66   if (pipe_fd_ < 0) {
     67     LOG(ERROR) << "Failed to open " << pipe_name.value();
     68     return;
     69   }
     70 
     71   // Set buffer size for the pipe.
     72   int result = HANDLE_EINTR(
     73       fcntl(pipe_fd_, F_SETPIPE_SZ, kPipeBufferSizeBytes));
     74   if (result < 0) {
     75     PLOG(ERROR) << "fcntl";
     76   }
     77 
     78   WaitForPipeReadable();
     79 }
     80 
     81 AudioPipeReader::AudioPipeReader(
     82     scoped_refptr<base::SingleThreadTaskRunner> task_runner)
     83     : task_runner_(task_runner),
     84       observers_(new ObserverListThreadSafe<StreamObserver>()) {
     85 }
     86 
     87 AudioPipeReader::~AudioPipeReader() {
     88 }
     89 
     90 void AudioPipeReader::AddObserver(StreamObserver* observer) {
     91   observers_->AddObserver(observer);
     92 }
     93 void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
     94   observers_->RemoveObserver(observer);
     95 }
     96 
     97 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
     98   DCHECK_EQ(fd, pipe_fd_);
     99   StartTimer();
    100 }
    101 
    102 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
    103   NOTREACHED();
    104 }
    105 
    106 void AudioPipeReader::StartTimer() {
    107   DCHECK(task_runner_->BelongsToCurrentThread());
    108   started_time_ = base::TimeTicks::Now();
    109   last_capture_position_ = 0;
    110   timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
    111                this, &AudioPipeReader::DoCapture);
    112 }
    113 
    114 void AudioPipeReader::DoCapture() {
    115   DCHECK(task_runner_->BelongsToCurrentThread());
    116   DCHECK_GT(pipe_fd_, 0);
    117 
    118   // Calculate how much we need read from the pipe. Pulseaudio doesn't control
    119   // how much data it writes to the pipe, so we need to pace the stream, so
    120   // that we read the exact number of the samples per second we need.
    121   base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
    122   int64 stream_position_bytes = stream_position.InMilliseconds() *
    123       kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
    124   int64 bytes_to_read = stream_position_bytes - last_capture_position_;
    125 
    126   std::string data = left_over_bytes_;
    127   size_t pos = data.size();
    128   left_over_bytes_.clear();
    129   data.resize(pos + bytes_to_read);
    130 
    131   while (pos < data.size()) {
    132     int read_result = HANDLE_EINTR(
    133        read(pipe_fd_, string_as_array(&data) + pos, data.size() - pos));
    134     if (read_result > 0) {
    135       pos += read_result;
    136     } else {
    137       if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
    138         PLOG(ERROR) << "read";
    139       break;
    140     }
    141   }
    142 
    143   // Stop reading from the pipe if PulseAudio isn't writing anything.
    144   if (pos == 0) {
    145     WaitForPipeReadable();
    146     return;
    147   }
    148 
    149   // Save any incomplete samples we've read for later. Each packet should
    150   // contain integer number of samples.
    151   int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
    152   left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
    153                           incomplete_samples_bytes);
    154   data.resize(pos - incomplete_samples_bytes);
    155 
    156   last_capture_position_ += data.size();
    157   // Normally PulseAudio will keep pipe buffer full, so we should always be able
    158   // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
    159   // sure that |stream_position_bytes| doesn't go out of sync with the current
    160   // stream position.
    161   if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
    162     last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
    163   DCHECK_LE(last_capture_position_, stream_position_bytes);
    164 
    165   // Dispatch asynchronous notification to the stream observers.
    166   scoped_refptr<base::RefCountedString> data_ref =
    167       base::RefCountedString::TakeString(&data);
    168   observers_->Notify(&StreamObserver::OnDataRead, data_ref);
    169 }
    170 
    171 void AudioPipeReader::WaitForPipeReadable() {
    172   timer_.Stop();
    173   base::MessageLoopForIO::current()->WatchFileDescriptor(
    174       pipe_fd_,
    175       false,
    176       base::MessageLoopForIO::WATCH_READ,
    177       &file_descriptor_watcher_,
    178       this);
    179 }
    180 
    181 // static
    182 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
    183   audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
    184 }
    185 
    186 }  // namespace remoting
    187