1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #define DEBUG false // STOPSHIP if true 17 #include "Log.h" 18 19 #include "ShellSubscriber.h" 20 21 #include <android-base/file.h> 22 #include "matchers/matcher_util.h" 23 #include "stats_log_util.h" 24 25 using android::util::ProtoOutputStream; 26 27 namespace android { 28 namespace os { 29 namespace statsd { 30 31 const static int FIELD_ID_ATOM = 1; 32 33 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver, 34 int timeoutSec) { 35 VLOG("start new shell subscription"); 36 { 37 std::lock_guard<std::mutex> lock(mMutex); 38 if (mResultReceiver != nullptr) { 39 VLOG("Only one shell subscriber is allowed."); 40 return; 41 } 42 mInput = in; 43 mOutput = out; 44 mResultReceiver = resultReceiver; 45 IInterface::asBinder(mResultReceiver)->linkToDeath(this); 46 } 47 48 // Note that the following is blocking, and it's intended as we cannot return until the shell 49 // cmd exits, otherwise all resources & FDs will be automatically closed. 50 51 // Read config forever until EOF is reached. Clients may send multiple configs -- each new 52 // config replace the previous one. 53 readConfig(in); 54 VLOG("timeout : %d", timeoutSec); 55 56 // Now we have read an EOF we now wait for the semaphore until the client exits. 57 VLOG("Now wait for client to exit"); 58 std::unique_lock<std::mutex> lk(mMutex); 59 60 if (timeoutSec > 0) { 61 mShellDied.wait_for(lk, timeoutSec * 1s, 62 [this, resultReceiver] { return mResultReceiver != resultReceiver; }); 63 } else { 64 mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; }); 65 } 66 } 67 68 void ShellSubscriber::updateConfig(const ShellSubscription& config) { 69 std::lock_guard<std::mutex> lock(mMutex); 70 mPushedMatchers.clear(); 71 mPulledInfo.clear(); 72 73 for (const auto& pushed : config.pushed()) { 74 mPushedMatchers.push_back(pushed); 75 VLOG("adding matcher for atom %d", pushed.atom_id()); 76 } 77 78 int64_t token = getElapsedRealtimeNs(); 79 mPullToken = token; 80 81 int64_t minInterval = -1; 82 for (const auto& pulled : config.pulled()) { 83 // All intervals need to be multiples of the min interval. 84 if (minInterval < 0 || pulled.freq_millis() < minInterval) { 85 minInterval = pulled.freq_millis(); 86 } 87 88 mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); 89 VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); 90 } 91 92 if (mPulledInfo.size() > 0 && minInterval > 0) { 93 // This thread is guaranteed to terminate after it detects the token is different or 94 // cleaned up. 95 std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); 96 puller.detach(); 97 } 98 } 99 100 void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data, 101 const SimpleAtomMatcher& matcher) { 102 if (mOutput == 0) return; 103 int count = 0; 104 mProto.clear(); 105 for (const auto& event : data) { 106 VLOG("%s", event->ToString().c_str()); 107 if (matchesSimple(*mUidMap, matcher, *event)) { 108 VLOG("matched"); 109 count++; 110 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | 111 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); 112 event->ToProto(mProto); 113 mProto.end(atomToken); 114 } 115 } 116 117 if (count > 0) { 118 // First write the payload size. 119 size_t bufferSize = mProto.size(); 120 write(mOutput, &bufferSize, sizeof(bufferSize)); 121 VLOG("%d atoms, proto size: %zu", count, bufferSize); 122 // Then write the payload. 123 mProto.flush(mOutput); 124 } 125 mProto.clear(); 126 } 127 128 void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { 129 while (1) { 130 int64_t nowMillis = getElapsedRealtimeMillis(); 131 { 132 std::lock_guard<std::mutex> lock(mMutex); 133 if (mPulledInfo.size() == 0 || mPullToken != token) { 134 VLOG("Pulling thread %lld done!", (long long)token); 135 return; 136 } 137 for (auto& pullInfo : mPulledInfo) { 138 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { 139 VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); 140 141 vector<std::shared_ptr<LogEvent>> data; 142 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); 143 VLOG("pulled %zu atoms", data.size()); 144 if (data.size() > 0) { 145 writeToOutputLocked(data, pullInfo.mPullerMatcher); 146 } 147 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; 148 } 149 } 150 } 151 VLOG("Pulling thread %lld sleep....", (long long)token); 152 std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); 153 } 154 } 155 156 void ShellSubscriber::readConfig(int in) { 157 if (in <= 0) { 158 return; 159 } 160 161 while (1) { 162 size_t bufferSize = 0; 163 int result = 0; 164 if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) { 165 VLOG("Done reading"); 166 break; 167 } else if (result < 0 || result != sizeof(bufferSize)) { 168 ALOGE("Error reading config size"); 169 break; 170 } 171 172 vector<uint8_t> buffer(bufferSize); 173 if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) { 174 ShellSubscription config; 175 if (config.ParseFromArray(buffer.data(), bufferSize)) { 176 updateConfig(config); 177 } else { 178 ALOGE("error parsing the config"); 179 break; 180 } 181 } else { 182 VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize); 183 break; 184 } 185 } 186 } 187 188 void ShellSubscriber::cleanUpLocked() { 189 // The file descriptors will be closed by binder. 190 mInput = 0; 191 mOutput = 0; 192 mResultReceiver = nullptr; 193 mPushedMatchers.clear(); 194 mPulledInfo.clear(); 195 mPullToken = 0; 196 VLOG("done clean up"); 197 } 198 199 void ShellSubscriber::onLogEvent(const LogEvent& event) { 200 std::lock_guard<std::mutex> lock(mMutex); 201 202 if (mOutput <= 0) { 203 return; 204 } 205 for (const auto& matcher : mPushedMatchers) { 206 if (matchesSimple(*mUidMap, matcher, event)) { 207 VLOG("%s", event.ToString().c_str()); 208 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | 209 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); 210 event.ToProto(mProto); 211 mProto.end(atomToken); 212 // First write the payload size. 213 size_t bufferSize = mProto.size(); 214 write(mOutput, &bufferSize, sizeof(bufferSize)); 215 216 // Then write the payload. 217 mProto.flush(mOutput); 218 mProto.clear(); 219 break; 220 } 221 } 222 } 223 224 void ShellSubscriber::binderDied(const wp<IBinder>& who) { 225 { 226 VLOG("Shell exits"); 227 std::lock_guard<std::mutex> lock(mMutex); 228 cleanUpLocked(); 229 } 230 mShellDied.notify_all(); 231 } 232 233 } // namespace statsd 234 } // namespace os 235 } // namespace android 236