1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "chre_host/socket_client.h" 18 19 #include <inttypes.h> 20 21 #include <string.h> 22 #include <unistd.h> 23 24 #include <chrono> 25 26 #include <cutils/sockets.h> 27 #include <sys/socket.h> 28 #include <utils/RefBase.h> 29 #include <utils/StrongPointer.h> 30 31 #include "chre_host/log.h" 32 33 namespace android { 34 namespace chre { 35 36 SocketClient::SocketClient() { 37 std::atomic_init(&mSockFd, INVALID_SOCKET); 38 } 39 40 SocketClient::~SocketClient() { 41 disconnect(); 42 } 43 44 bool SocketClient::connect(const char *socketName, 45 const sp<ICallbacks>& callbacks) { 46 return doConnect(socketName, callbacks, false /* connectInBackground */); 47 } 48 49 bool SocketClient::connectInBackground(const char *socketName, 50 const sp<ICallbacks>& callbacks) { 51 return doConnect(socketName, callbacks, true /* connectInBackground */); 52 } 53 54 void SocketClient::disconnect() { 55 if (inReceiveThread()) { 56 LOGE("disconnect() can't be called from a receive thread callback"); 57 } else if (receiveThreadRunning()) { 58 // Inform the RX thread that we're requesting a shutdown, breaking it out of 59 // the retry wait if it's currently blocked there 60 { 61 std::lock_guard<std::mutex> lock(mShutdownMutex); 62 mGracefulShutdown = true; 63 } 64 mShutdownCond.notify_all(); 65 66 // Invalidate the socket (will kick the RX thread out of recv if it's 67 // currently blocked there) 68 if (mSockFd != INVALID_SOCKET && shutdown(mSockFd, SHUT_RDWR) != 0) { 69 LOG_ERROR("Couldn't shut down socket", errno); 70 } 71 72 if (mRxThread.joinable()) { 73 LOGD("Waiting for RX thread to exit"); 74 mRxThread.join(); 75 } 76 } 77 } 78 79 bool SocketClient::isConnected() const { 80 return (mSockFd != INVALID_SOCKET); 81 } 82 83 bool SocketClient::sendMessage(const void *data, size_t length) { 84 bool success = false; 85 86 if (mSockFd == INVALID_SOCKET) { 87 LOGW("Tried sending a message, but don't have a valid socket handle"); 88 } else { 89 ssize_t bytesSent = send(mSockFd, data, length, 0); 90 if (bytesSent < 0) { 91 LOGE("Failed to send %zu bytes of data: %s", length, strerror(errno)); 92 } else if (bytesSent == 0) { 93 LOGW("Failed to send data; remote side disconnected"); 94 } else if (static_cast<size_t>(bytesSent) != length) { 95 LOGW("Truncated packet, tried sending %zu bytes, only %zd went through", 96 length, bytesSent); 97 } else { 98 success = true; 99 } 100 } 101 102 return success; 103 } 104 105 bool SocketClient::doConnect(const char *socketName, 106 const sp<ICallbacks>& callbacks, 107 bool connectInBackground) { 108 bool success = false; 109 if (inReceiveThread()) { 110 LOGE("Can't attempt to connect from a receive thread callback"); 111 } else { 112 if (receiveThreadRunning()) { 113 LOGW("Re-connecting socket with implicit disconnect"); 114 disconnect(); 115 } 116 117 size_t socketNameLen = strlcpy(mSocketName, socketName, 118 sizeof(mSocketName)); 119 if (socketNameLen >= sizeof(mSocketName)) { 120 LOGE("Socket name length parameter is too long (%zu, max %zu)", 121 socketNameLen, sizeof(mSocketName)); 122 } else if (callbacks == nullptr) { 123 LOGE("Callbacks parameter must be provided"); 124 } else if (connectInBackground || tryConnect()) { 125 mGracefulShutdown = false; 126 mCallbacks = callbacks; 127 mRxThread = std::thread([this]() { 128 receiveThread(); 129 }); 130 success = true; 131 } 132 } 133 134 return success; 135 } 136 137 bool SocketClient::inReceiveThread() const { 138 return (std::this_thread::get_id() == mRxThread.get_id()); 139 } 140 141 void SocketClient::receiveThread() { 142 constexpr size_t kReceiveBufferSize = 4096; 143 uint8_t buffer[kReceiveBufferSize]; 144 145 LOGV("Receive thread started"); 146 while (!mGracefulShutdown && (mSockFd != INVALID_SOCKET || reconnect())) { 147 while (!mGracefulShutdown) { 148 ssize_t bytesReceived = recv(mSockFd, buffer, sizeof(buffer), 0); 149 if (bytesReceived < 0) { 150 LOG_ERROR("Exiting RX thread", errno); 151 break; 152 } else if (bytesReceived == 0) { 153 if (!mGracefulShutdown) { 154 LOGI("Socket disconnected on remote end"); 155 mCallbacks->onDisconnected(); 156 } 157 break; 158 } 159 160 mCallbacks->onMessageReceived(buffer, bytesReceived); 161 } 162 163 if (close(mSockFd) != 0) { 164 LOG_ERROR("Couldn't close socket", errno); 165 } 166 mSockFd = INVALID_SOCKET; 167 } 168 169 if (!mGracefulShutdown) { 170 mCallbacks->onConnectionAborted(); 171 } 172 173 mCallbacks.clear(); 174 LOGV("Exiting receive thread"); 175 } 176 177 bool SocketClient::receiveThreadRunning() const { 178 return mRxThread.joinable(); 179 } 180 181 bool SocketClient::reconnect() { 182 constexpr auto kMinDelay = std::chrono::duration<int32_t, std::milli>(250); 183 constexpr auto kMaxDelay = std::chrono::minutes(5); 184 // Try reconnecting at initial delay this many times before backing off 185 constexpr unsigned int kExponentialBackoffDelay = 186 std::chrono::seconds(10) / kMinDelay; 187 // Give up after this many tries (~2.5 hours) 188 constexpr unsigned int kRetryLimit = kExponentialBackoffDelay + 40; 189 auto delay = kMinDelay; 190 unsigned int retryCount = 0; 191 192 while (retryCount++ < kRetryLimit) { 193 { 194 std::unique_lock<std::mutex> lock(mShutdownMutex); 195 mShutdownCond.wait_for(lock, delay, 196 [this]() { return mGracefulShutdown.load(); }); 197 if (mGracefulShutdown) { 198 break; 199 } 200 } 201 202 bool suppressErrorLogs = (delay == kMinDelay); 203 if (!tryConnect(suppressErrorLogs)) { 204 if (!suppressErrorLogs) { 205 LOGW("Failed to (re)connect, next try in %" PRId32 " ms", 206 delay.count()); 207 } 208 if (retryCount > kExponentialBackoffDelay) { 209 delay *= 2; 210 } 211 if (delay > kMaxDelay) { 212 delay = kMaxDelay; 213 } 214 } else { 215 LOGD("Successfully (re)connected"); 216 mCallbacks->onConnected(); 217 return true; 218 } 219 } 220 221 return false; 222 } 223 224 bool SocketClient::tryConnect(bool suppressErrorLogs) { 225 bool success = false; 226 227 errno = 0; 228 int sockFd = socket(AF_LOCAL, SOCK_SEQPACKET, 0); 229 if (sockFd >= 0) { 230 // Set the send buffer size to 2MB to allow plenty of room for nanoapp 231 // loading 232 int sndbuf = 2 * 1024 * 1024; 233 int ret = setsockopt( 234 sockFd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)); 235 if (ret == 0) { 236 mSockFd = socket_local_client_connect( 237 sockFd, mSocketName, ANDROID_SOCKET_NAMESPACE_RESERVED, 238 SOCK_SEQPACKET); 239 if (mSockFd != INVALID_SOCKET) { 240 success = true; 241 } else if (!suppressErrorLogs) { 242 LOGE("Couldn't connect client socket to '%s': %s", 243 mSocketName, strerror(errno)); 244 } 245 } else if (!suppressErrorLogs) { 246 LOGE("Failed to set SO_SNDBUF to %d: %s", sndbuf, strerror(errno)); 247 } 248 249 if (!success) { 250 close(sockFd); 251 } 252 } else if (!suppressErrorLogs) { 253 LOGE("Couldn't create local socket: %s", strerror(errno)); 254 } 255 256 return success; 257 } 258 259 } // namespace chre 260 } // namespace android 261