1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "remoting/host/linux/audio_pipe_reader.h" 6 7 #include <fcntl.h> 8 #include <sys/stat.h> 9 #include <sys/types.h> 10 #include <unistd.h> 11 12 #include "base/logging.h" 13 #include "base/posix/eintr_wrapper.h" 14 #include "base/stl_util.h" 15 16 namespace remoting { 17 18 namespace { 19 20 const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate * 21 AudioPipeReader::kChannels * 22 AudioPipeReader::kBytesPerSample; 23 24 // Read data from the pipe every 40ms. 25 const int kCapturingPeriodMs = 40; 26 27 // Size of the pipe buffer in milliseconds. 28 const int kPipeBufferSizeMs = kCapturingPeriodMs * 2; 29 30 // Size of the pipe buffer in bytes. 31 const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond / 32 base::Time::kMillisecondsPerSecond; 33 34 #if !defined(F_SETPIPE_SZ) 35 // F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able 36 // to compile this code on machines with older kernel. 37 #define F_SETPIPE_SZ 1031 38 #endif // defined(F_SETPIPE_SZ) 39 40 } // namespace 41 42 // static 43 scoped_refptr<AudioPipeReader> AudioPipeReader::Create( 44 scoped_refptr<base::SingleThreadTaskRunner> task_runner, 45 const base::FilePath& pipe_path) { 46 // Create a reference to the new AudioPipeReader before posting the 47 // StartOnAudioThread task, otherwise it may be deleted on the audio 48 // thread before we return. 49 scoped_refptr<AudioPipeReader> pipe_reader = 50 new AudioPipeReader(task_runner, pipe_path); 51 task_runner->PostTask( 52 FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader)); 53 return pipe_reader; 54 } 55 56 AudioPipeReader::AudioPipeReader( 57 scoped_refptr<base::SingleThreadTaskRunner> task_runner, 58 const base::FilePath& pipe_path) 59 : task_runner_(task_runner), 60 pipe_path_(pipe_path), 61 observers_(new ObserverListThreadSafe<StreamObserver>()) { 62 } 63 64 AudioPipeReader::~AudioPipeReader() {} 65 66 void AudioPipeReader::AddObserver(StreamObserver* observer) { 67 observers_->AddObserver(observer); 68 } 69 void AudioPipeReader::RemoveObserver(StreamObserver* observer) { 70 observers_->RemoveObserver(observer); 71 } 72 73 void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) { 74 DCHECK_EQ(fd, pipe_.GetPlatformFile()); 75 StartTimer(); 76 } 77 78 void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) { 79 NOTREACHED(); 80 } 81 82 void AudioPipeReader::StartOnAudioThread() { 83 DCHECK(task_runner_->BelongsToCurrentThread()); 84 85 if (!file_watcher_.Watch(pipe_path_.DirName(), true, 86 base::Bind(&AudioPipeReader::OnDirectoryChanged, 87 base::Unretained(this)))) { 88 LOG(ERROR) << "Failed to watch pulseaudio directory " 89 << pipe_path_.DirName().value(); 90 } 91 92 TryOpenPipe(); 93 } 94 95 void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path, 96 bool error) { 97 DCHECK(task_runner_->BelongsToCurrentThread()); 98 99 if (error) { 100 LOG(ERROR) << "File watcher returned an error."; 101 return; 102 } 103 104 TryOpenPipe(); 105 } 106 107 void AudioPipeReader::TryOpenPipe() { 108 DCHECK(task_runner_->BelongsToCurrentThread()); 109 110 base::File new_pipe; 111 new_pipe.Initialize( 112 pipe_path_, 113 base::File::FLAG_OPEN | base::File::FLAG_READ | base::File::FLAG_ASYNC); 114 115 // If both |pipe_| and |new_pipe| are valid then compare inodes for the two 116 // file descriptors. Don't need to do anything if inode hasn't changed. 117 if (new_pipe.IsValid() && pipe_.IsValid()) { 118 struct stat old_stat; 119 struct stat new_stat; 120 if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 && 121 fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 && 122 old_stat.st_ino == new_stat.st_ino) { 123 return; 124 } 125 } 126 127 file_descriptor_watcher_.StopWatchingFileDescriptor(); 128 timer_.Stop(); 129 130 pipe_ = new_pipe.Pass(); 131 132 if (pipe_.IsValid()) { 133 // Set O_NONBLOCK flag. 134 if (HANDLE_EINTR(fcntl(pipe_.GetPlatformFile(), F_SETFL, O_NONBLOCK)) < 0) { 135 PLOG(ERROR) << "fcntl"; 136 pipe_.Close(); 137 return; 138 } 139 140 // Set buffer size for the pipe. 141 if (HANDLE_EINTR(fcntl( 142 pipe_.GetPlatformFile(), F_SETPIPE_SZ, kPipeBufferSizeBytes)) < 0) { 143 PLOG(ERROR) << "fcntl"; 144 } 145 146 WaitForPipeReadable(); 147 } 148 } 149 150 void AudioPipeReader::StartTimer() { 151 DCHECK(task_runner_->BelongsToCurrentThread()); 152 started_time_ = base::TimeTicks::Now(); 153 last_capture_position_ = 0; 154 timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs), 155 this, &AudioPipeReader::DoCapture); 156 } 157 158 void AudioPipeReader::DoCapture() { 159 DCHECK(task_runner_->BelongsToCurrentThread()); 160 DCHECK(pipe_.IsValid()); 161 162 // Calculate how much we need read from the pipe. Pulseaudio doesn't control 163 // how much data it writes to the pipe, so we need to pace the stream. 164 base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_; 165 int64 stream_position_bytes = stream_position.InMilliseconds() * 166 kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond; 167 int64 bytes_to_read = stream_position_bytes - last_capture_position_; 168 169 std::string data = left_over_bytes_; 170 size_t pos = data.size(); 171 left_over_bytes_.clear(); 172 data.resize(pos + bytes_to_read); 173 174 while (pos < data.size()) { 175 int read_result = 176 pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos); 177 if (read_result > 0) { 178 pos += read_result; 179 } else { 180 if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN) 181 PLOG(ERROR) << "read"; 182 break; 183 } 184 } 185 186 // Stop reading from the pipe if PulseAudio isn't writing anything. 187 if (pos == 0) { 188 WaitForPipeReadable(); 189 return; 190 } 191 192 // Save any incomplete samples we've read for later. Each packet should 193 // contain integer number of samples. 194 int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample); 195 left_over_bytes_.assign(data, pos - incomplete_samples_bytes, 196 incomplete_samples_bytes); 197 data.resize(pos - incomplete_samples_bytes); 198 199 last_capture_position_ += data.size(); 200 // Normally PulseAudio will keep pipe buffer full, so we should always be able 201 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make 202 // sure that |stream_position_bytes| doesn't go out of sync with the current 203 // stream position. 204 if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes) 205 last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes; 206 DCHECK_LE(last_capture_position_, stream_position_bytes); 207 208 // Dispatch asynchronous notification to the stream observers. 209 scoped_refptr<base::RefCountedString> data_ref = 210 base::RefCountedString::TakeString(&data); 211 observers_->Notify(&StreamObserver::OnDataRead, data_ref); 212 } 213 214 void AudioPipeReader::WaitForPipeReadable() { 215 timer_.Stop(); 216 base::MessageLoopForIO::current()->WatchFileDescriptor( 217 pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ, 218 &file_descriptor_watcher_, this); 219 } 220 221 // static 222 void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) { 223 audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader); 224 } 225 226 } // namespace remoting 227