Home | History | Annotate | Download | only in shell
      1 /*
      2  * Copyright (C) 2018 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 #define DEBUG false  // STOPSHIP if true
     17 #include "Log.h"
     18 
     19 #include "ShellSubscriber.h"
     20 
     21 #include <android-base/file.h>
     22 #include "matchers/matcher_util.h"
     23 #include "stats_log_util.h"
     24 
     25 using android::util::ProtoOutputStream;
     26 
     27 namespace android {
     28 namespace os {
     29 namespace statsd {
     30 
     31 const static int FIELD_ID_ATOM = 1;
     32 
     33 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
     34                                            int timeoutSec) {
     35     VLOG("start new shell subscription");
     36     {
     37         std::lock_guard<std::mutex> lock(mMutex);
     38         if (mResultReceiver != nullptr) {
     39             VLOG("Only one shell subscriber is allowed.");
     40             return;
     41         }
     42         mInput = in;
     43         mOutput = out;
     44         mResultReceiver = resultReceiver;
     45         IInterface::asBinder(mResultReceiver)->linkToDeath(this);
     46     }
     47 
     48     // Note that the following is blocking, and it's intended as we cannot return until the shell
     49     // cmd exits, otherwise all resources & FDs will be automatically closed.
     50 
     51     // Read config forever until EOF is reached. Clients may send multiple configs -- each new
     52     // config replace the previous one.
     53     readConfig(in);
     54     VLOG("timeout : %d", timeoutSec);
     55 
     56     // Now we have read an EOF we now wait for the semaphore until the client exits.
     57     VLOG("Now wait for client to exit");
     58     std::unique_lock<std::mutex> lk(mMutex);
     59 
     60     if (timeoutSec > 0) {
     61         mShellDied.wait_for(lk, timeoutSec * 1s,
     62                             [this, resultReceiver] { return mResultReceiver != resultReceiver; });
     63     } else {
     64         mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
     65     }
     66 }
     67 
     68 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
     69     std::lock_guard<std::mutex> lock(mMutex);
     70     mPushedMatchers.clear();
     71     mPulledInfo.clear();
     72 
     73     for (const auto& pushed : config.pushed()) {
     74         mPushedMatchers.push_back(pushed);
     75         VLOG("adding matcher for atom %d", pushed.atom_id());
     76     }
     77 
     78     int64_t token = getElapsedRealtimeNs();
     79     mPullToken = token;
     80 
     81     int64_t minInterval = -1;
     82     for (const auto& pulled : config.pulled()) {
     83         // All intervals need to be multiples of the min interval.
     84         if (minInterval < 0 || pulled.freq_millis() < minInterval) {
     85             minInterval = pulled.freq_millis();
     86         }
     87 
     88         mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
     89         VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
     90     }
     91 
     92     if (mPulledInfo.size() > 0 && minInterval > 0) {
     93         // This thread is guaranteed to terminate after it detects the token is different or
     94         // cleaned up.
     95         std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
     96         puller.detach();
     97     }
     98 }
     99 
    100 void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
    101                                           const SimpleAtomMatcher& matcher) {
    102     if (mOutput == 0) return;
    103     int count = 0;
    104     mProto.clear();
    105     for (const auto& event : data) {
    106         VLOG("%s", event->ToString().c_str());
    107         if (matchesSimple(*mUidMap, matcher, *event)) {
    108             VLOG("matched");
    109             count++;
    110             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
    111                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
    112             event->ToProto(mProto);
    113             mProto.end(atomToken);
    114         }
    115     }
    116 
    117     if (count > 0) {
    118         // First write the payload size.
    119         size_t bufferSize = mProto.size();
    120         write(mOutput, &bufferSize, sizeof(bufferSize));
    121         VLOG("%d atoms, proto size: %zu", count, bufferSize);
    122         // Then write the payload.
    123         mProto.flush(mOutput);
    124     }
    125     mProto.clear();
    126 }
    127 
    128 void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
    129     while (1) {
    130         int64_t nowMillis = getElapsedRealtimeMillis();
    131         {
    132             std::lock_guard<std::mutex> lock(mMutex);
    133             if (mPulledInfo.size() == 0 || mPullToken != token) {
    134                 VLOG("Pulling thread %lld done!", (long long)token);
    135                 return;
    136             }
    137             for (auto& pullInfo : mPulledInfo) {
    138                 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
    139                     VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
    140 
    141                     vector<std::shared_ptr<LogEvent>> data;
    142                     mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
    143                     VLOG("pulled %zu atoms", data.size());
    144                     if (data.size() > 0) {
    145                         writeToOutputLocked(data, pullInfo.mPullerMatcher);
    146                     }
    147                     pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
    148                 }
    149             }
    150         }
    151         VLOG("Pulling thread %lld sleep....", (long long)token);
    152         std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
    153     }
    154 }
    155 
    156 void ShellSubscriber::readConfig(int in) {
    157     if (in <= 0) {
    158         return;
    159     }
    160 
    161     while (1) {
    162         size_t bufferSize = 0;
    163         int result = 0;
    164         if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
    165             VLOG("Done reading");
    166             break;
    167         } else if (result < 0 || result != sizeof(bufferSize)) {
    168             ALOGE("Error reading config size");
    169             break;
    170         }
    171 
    172         vector<uint8_t> buffer(bufferSize);
    173         if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
    174             ShellSubscription config;
    175             if (config.ParseFromArray(buffer.data(), bufferSize)) {
    176                 updateConfig(config);
    177             } else {
    178                 ALOGE("error parsing the config");
    179                 break;
    180             }
    181         } else {
    182             VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
    183             break;
    184         }
    185     }
    186 }
    187 
    188 void ShellSubscriber::cleanUpLocked() {
    189     // The file descriptors will be closed by binder.
    190     mInput = 0;
    191     mOutput = 0;
    192     mResultReceiver = nullptr;
    193     mPushedMatchers.clear();
    194     mPulledInfo.clear();
    195     mPullToken = 0;
    196     VLOG("done clean up");
    197 }
    198 
    199 void ShellSubscriber::onLogEvent(const LogEvent& event) {
    200     std::lock_guard<std::mutex> lock(mMutex);
    201 
    202     if (mOutput <= 0) {
    203         return;
    204     }
    205     for (const auto& matcher : mPushedMatchers) {
    206         if (matchesSimple(*mUidMap, matcher, event)) {
    207             VLOG("%s", event.ToString().c_str());
    208             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
    209                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
    210             event.ToProto(mProto);
    211             mProto.end(atomToken);
    212             // First write the payload size.
    213             size_t bufferSize = mProto.size();
    214             write(mOutput, &bufferSize, sizeof(bufferSize));
    215 
    216             // Then write the payload.
    217             mProto.flush(mOutput);
    218             mProto.clear();
    219             break;
    220         }
    221     }
    222 }
    223 
    224 void ShellSubscriber::binderDied(const wp<IBinder>& who) {
    225     {
    226         VLOG("Shell exits");
    227         std::lock_guard<std::mutex> lock(mMutex);
    228         cleanUpLocked();
    229     }
    230     mShellDied.notify_all();
    231 }
    232 
    233 }  // namespace statsd
    234 }  // namespace os
    235 }  // namespace android
    236