1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "remoting/host/linux/audio_pipe_reader.h" 6 7 #include <fcntl.h> 8 #include <sys/stat.h> 9 #include <sys/types.h> 10 #include <unistd.h> 11 12 #include "base/files/file_path.h" 13 #include "base/logging.h" 14 #include "base/posix/eintr_wrapper.h" 15 #include "base/stl_util.h" 16 17 namespace remoting { 18 19 namespace { 20 21 // PulseAudio's module-pipe-sink must be configured to use the following 22 // parameters for the sink we read from. 23 const int kSamplesPerSecond = 48000; 24 const int kChannels = 2; 25 const int kBytesPerSample = 2; 26 const int kSampleBytesPerSecond = 27 kSamplesPerSecond * kChannels * kBytesPerSample; 28 29 // Read data from the pipe every 40ms. 30 const int kCapturingPeriodMs = 40; 31 32 // Size of the pipe buffer in milliseconds. 33 const int kPipeBufferSizeMs = kCapturingPeriodMs * 2; 34 35 // Size of the pipe buffer in bytes. 36 const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond / 37 base::Time::kMillisecondsPerSecond; 38 39 #if !defined(F_SETPIPE_SZ) 40 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able 41 // to compile this code on machines with older kernel. 42 #define F_SETPIPE_SZ 1031 43 #endif // defined(F_SETPIPE_SZ) 44 45 } // namespace 46 47 // static 48 scoped_refptr<AudioPipeReader> AudioPipeReader::Create( 49 scoped_refptr<base::SingleThreadTaskRunner> task_runner, 50 const base::FilePath& pipe_name) { 51 // Create a reference to the new AudioPipeReader before posting the 52 // StartOnAudioThread task, otherwise it may be deleted on the audio 53 // thread before we return. 54 scoped_refptr<AudioPipeReader> pipe_reader = 55 new AudioPipeReader(task_runner); 56 task_runner->PostTask(FROM_HERE, base::Bind( 57 &AudioPipeReader::StartOnAudioThread, pipe_reader, pipe_name)); 58 return pipe_reader; 59 } 60 61 void AudioPipeReader::StartOnAudioThread(const base::FilePath& pipe_name) { 62 DCHECK(task_runner_->BelongsToCurrentThread()); 63 64 pipe_fd_ = HANDLE_EINTR(open( 65 pipe_name.value().c_str(), O_RDONLY | O_NONBLOCK)); 66 if (pipe_fd_ < 0) { 67 LOG(ERROR) << "Failed to open " << pipe_name.value(); 68 return; 69 } 70 71 // Set buffer size for the pipe. 72 int result = HANDLE_EINTR( 73 fcntl(pipe_fd_, F_SETPIPE_SZ, kPipeBufferSizeBytes)); 74 if (result < 0) { 75 PLOG(ERROR) << "fcntl"; 76 } 77 78 WaitForPipeReadable(); 79 } 80 81 AudioPipeReader::AudioPipeReader( 82 scoped_refptr<base::SingleThreadTaskRunner> task_runner) 83 : task_runner_(task_runner), 84 observers_(new ObserverListThreadSafe<StreamObserver>()) { 85 } 86 87 AudioPipeReader::~AudioPipeReader() { 88 } 89 90 void AudioPipeReader::AddObserver(StreamObserver* observer) { 91 observers_->AddObserver(observer); 92 } 93 void AudioPipeReader::RemoveObserver(StreamObserver* observer) { 94 observers_->RemoveObserver(observer); 95 } 96 97 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) { 98 DCHECK_EQ(fd, pipe_fd_); 99 StartTimer(); 100 } 101 102 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) { 103 NOTREACHED(); 104 } 105 106 void AudioPipeReader::StartTimer() { 107 DCHECK(task_runner_->BelongsToCurrentThread()); 108 started_time_ = base::TimeTicks::Now(); 109 last_capture_position_ = 0; 110 timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs), 111 this, &AudioPipeReader::DoCapture); 112 } 113 114 void AudioPipeReader::DoCapture() { 115 DCHECK(task_runner_->BelongsToCurrentThread()); 116 DCHECK_GT(pipe_fd_, 0); 117 118 // Calculate how much we need read from the pipe. Pulseaudio doesn't control 119 // how much data it writes to the pipe, so we need to pace the stream, so 120 // that we read the exact number of the samples per second we need. 121 base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_; 122 int64 stream_position_bytes = stream_position.InMilliseconds() * 123 kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond; 124 int64 bytes_to_read = stream_position_bytes - last_capture_position_; 125 126 std::string data = left_over_bytes_; 127 size_t pos = data.size(); 128 left_over_bytes_.clear(); 129 data.resize(pos + bytes_to_read); 130 131 while (pos < data.size()) { 132 int read_result = HANDLE_EINTR( 133 read(pipe_fd_, string_as_array(&data) + pos, data.size() - pos)); 134 if (read_result > 0) { 135 pos += read_result; 136 } else { 137 if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN) 138 PLOG(ERROR) << "read"; 139 break; 140 } 141 } 142 143 // Stop reading from the pipe if PulseAudio isn't writing anything. 144 if (pos == 0) { 145 WaitForPipeReadable(); 146 return; 147 } 148 149 // Save any incomplete samples we've read for later. Each packet should 150 // contain integer number of samples. 151 int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample); 152 left_over_bytes_.assign(data, pos - incomplete_samples_bytes, 153 incomplete_samples_bytes); 154 data.resize(pos - incomplete_samples_bytes); 155 156 last_capture_position_ += data.size(); 157 // Normally PulseAudio will keep pipe buffer full, so we should always be able 158 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make 159 // sure that |stream_position_bytes| doesn't go out of sync with the current 160 // stream position. 161 if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes) 162 last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes; 163 DCHECK_LE(last_capture_position_, stream_position_bytes); 164 165 // Dispatch asynchronous notification to the stream observers. 166 scoped_refptr<base::RefCountedString> data_ref = 167 base::RefCountedString::TakeString(&data); 168 observers_->Notify(&StreamObserver::OnDataRead, data_ref); 169 } 170 171 void AudioPipeReader::WaitForPipeReadable() { 172 timer_.Stop(); 173 base::MessageLoopForIO::current()->WatchFileDescriptor( 174 pipe_fd_, 175 false, 176 base::MessageLoopForIO::WATCH_READ, 177 &file_descriptor_watcher_, 178 this); 179 } 180 181 // static 182 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) { 183 audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader); 184 } 185 186 } // namespace remoting 187