1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not 5 * use this file except in compliance with the License. You may obtain a copy of 6 * the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations under 14 * the License. 15 */ 16 17 #include <sys/types.h> 18 #include <sys/wait.h> 19 #include <unistd.h> 20 #include <errno.h> 21 22 #include <utils/StrongPointer.h> 23 24 #include "Log.h" 25 #include "audio/Buffer.h" 26 #include "StringUtil.h" 27 #include "SimpleScriptExec.h" 28 #include "SignalProcessingImpl.h" 29 #include "task/TaskCase.h" 30 31 enum ToPythonCommandType { 32 EHeader = 0x0, 33 ETerminate = 0x1, 34 EFunctionName = 0x2, 35 EAudioMono = 0x4, 36 EAudioStereo = 0x5, 37 EValue64Int = 0x8, 38 EValueDouble = 0x9, 39 EExecutionResult = 0x10 40 }; 41 42 const android::String8 \ 43 SignalProcessingImpl::MAIN_PROCESSING_SCRIPT("test_description/processing_main.py"); 44 45 SignalProcessingImpl::SignalProcessingImpl() 46 : mChildRunning(false), 47 mBuffer(1024) 48 { 49 50 } 51 52 SignalProcessingImpl::~SignalProcessingImpl() 53 { 54 if (mSocket.get() != NULL) { 55 int terminationCommand [] = {ETerminate, 0}; 56 send((char*)terminationCommand, sizeof(terminationCommand)); 57 mSocket->release(); 58 } 59 if (mChildRunning) { 60 waitpid(mChildPid, NULL, 0); 61 } 62 } 63 64 #define CHILD_LOGE(x...) do { fprintf(stderr, x); \ 65 fprintf(stderr, " %s - %d\n", __FILE__, __LINE__); } while(0) 66 67 const int CHILD_WAIT_TIME_US = 100000; 68 69 bool SignalProcessingImpl::init(const android::String8& script) 70 { 71 pid_t pid; 72 if ((pid = fork()) < 0) { 73 LOGE("SignalProcessingImpl::init fork failed %d", errno); 74 return false; 75 } else if (pid == 0) { // child 76 if (execl(SimpleScriptExec::PYTHON_PATH, SimpleScriptExec::PYTHON_PATH, 77 script.string(), NULL) < 0) { 78 CHILD_LOGE("execl %s %s failed %d", SimpleScriptExec::PYTHON_PATH, 79 script.string(), errno); 80 exit(EXIT_FAILURE); 81 } 82 } else { // parent 83 mChildPid = pid; 84 mChildRunning = true; 85 int result = false; 86 int retryCount = 0; 87 // not that clean, but it takes some time for python side to have socket ready 88 const int MAX_RETRY = 20; 89 while (retryCount < MAX_RETRY) { 90 usleep(CHILD_WAIT_TIME_US); 91 mSocket.reset(new ClientSocket()); 92 if (mSocket.get() == NULL) { 93 result = false; 94 break; 95 } 96 if (mSocket->init("127.0.0.1", SCRIPT_PORT, true)) { 97 result = true; 98 break; 99 } 100 retryCount++; 101 } 102 if (!result) { 103 LOGE("cannot connect to child"); 104 mSocket.reset(NULL); 105 return result; 106 } 107 } 108 return true; 109 } 110 111 112 TaskGeneric::ExecutionResult SignalProcessingImpl::run( const android::String8& functionScript, 113 int nInputs, bool* inputTypes, void** inputs, 114 int nOutputs, bool* outputTypes, void** outputs) 115 { 116 mBuffer.reset(); 117 mBuffer.write <int32_t>((int32_t)EHeader); 118 mBuffer.write<int32_t>(nInputs + 1); 119 mBuffer.write<int32_t>((int32_t)EFunctionName); 120 mBuffer.write<int32_t>((int32_t)functionScript.length()); 121 mBuffer.writeStr(functionScript); 122 if (!send(mBuffer.getBuffer(), mBuffer.getSizeWritten())) { 123 LOGE("send failed"); 124 return TaskGeneric::EResultError; 125 } 126 for (int i = 0; i < nInputs; i++) { 127 mBuffer.reset(); 128 if (inputTypes[i]) { // android::sp<Buffer>* 129 android::sp<Buffer>* buffer = reinterpret_cast<android::sp<Buffer>*>(inputs[i]); 130 mBuffer.write<int32_t>((int32_t)((*buffer)->isStereo() ? EAudioStereo : EAudioMono)); 131 int dataLen = (*buffer)->getSize(); 132 mBuffer.write<int32_t>(dataLen); 133 if (!send(mBuffer.getBuffer(), mBuffer.getSizeWritten())) { 134 LOGE("send failed"); 135 return TaskGeneric::EResultError; 136 } 137 if (!send((*buffer)->getData(), dataLen)) { 138 LOGE("send failed"); 139 return TaskGeneric::EResultError; 140 } 141 LOGD("%d-th param buffer %d, stereo:%d", i, dataLen, (*buffer)->isStereo()); 142 } else { //TaskCase::Value* 143 TaskCase::Value* val = reinterpret_cast<TaskCase::Value*>(inputs[i]); 144 bool isI64 = (val->getType() == TaskCase::Value::ETypeI64); 145 mBuffer.write<int32_t>((int32_t)(isI64 ? EValue64Int : EValueDouble)); 146 if (isI64) { 147 mBuffer.write<int64_t>(val->getInt64()); 148 } else { 149 mBuffer.write<double>(val->getDouble()); 150 } 151 if (!send(mBuffer.getBuffer(), mBuffer.getSizeWritten())) { 152 LOGE("send failed"); 153 return TaskGeneric::EResultError; 154 } 155 LOGD("%d-th param Value", i); 156 } 157 } 158 int32_t header[4]; // id 0 - no of types - id 0x10 - ExecutionResult 159 if (!read((char*)header, sizeof(header))) { 160 LOGE("read failed"); 161 return TaskGeneric::EResultError; 162 } 163 if (header[0] != 0) { 164 LOGE("wrong data"); 165 return TaskGeneric::EResultError; 166 } 167 if (header[2] != EExecutionResult) { 168 LOGE("wrong data"); 169 return TaskGeneric::EResultError; 170 } 171 if (header[3] == TaskGeneric::EResultError) { 172 LOGE("script returned error %d", header[3]); 173 return (TaskGeneric::ExecutionResult)header[3]; 174 } 175 if ((header[1] - 1) != nOutputs) { 176 LOGE("wrong data"); 177 return TaskGeneric::EResultError; 178 } 179 for (int i = 0; i < nOutputs; i++) { 180 int32_t type; 181 if (!read((char*)&type, sizeof(type))) { 182 LOGE("read failed"); 183 return TaskGeneric::EResultError; 184 } 185 if (outputTypes[i]) { // android::sp<Buffer>* 186 int32_t dataLen; 187 if (!read((char*)&dataLen, sizeof(dataLen))) { 188 LOGE("read failed"); 189 return TaskGeneric::EResultError; 190 } 191 android::sp<Buffer>* buffer = reinterpret_cast<android::sp<Buffer>*>(outputs[i]); 192 if (buffer->get() == NULL) { // data not allocated, this can happen for unknown-length output 193 *buffer = new Buffer(dataLen, dataLen, (type == EAudioStereo) ? true: false); 194 if (buffer->get() == NULL) { 195 LOGE("alloc failed"); 196 return TaskGeneric::EResultError; 197 } 198 } 199 bool isStereo = (*buffer)->isStereo(); 200 201 if (((type == EAudioStereo) && isStereo) || ((type == EAudioMono) && !isStereo)) { 202 // valid 203 } else { 204 LOGE("%d-th output wrong type %d stereo: %d", i, type, isStereo); 205 return TaskGeneric::EResultError; 206 } 207 208 if (dataLen > (int)(*buffer)->getSize()) { 209 LOGE("%d-th output data too long %d while buffer size %d", i, dataLen, 210 (*buffer)->getSize()); 211 return TaskGeneric::EResultError; 212 } 213 if (!read((*buffer)->getData(), dataLen)) { 214 LOGE("read failed"); 215 return TaskGeneric::EResultError; 216 } 217 LOGD("received buffer %x %x", ((*buffer)->getData())[0], ((*buffer)->getData())[1]); 218 (*buffer)->setHandled(dataLen); 219 (*buffer)->setSize(dataLen); 220 } else { //TaskCase::Value* 221 TaskCase::Value* val = reinterpret_cast<TaskCase::Value*>(outputs[i]); 222 if ((type == EValue64Int) || (type == EValueDouble)) { 223 if (!read((char*)val->getPtr(), sizeof(int64_t))) { 224 LOGE("read failed"); 225 return TaskGeneric::EResultError; 226 } 227 if (type == EValue64Int) { 228 val->setType(TaskCase::Value::ETypeI64); 229 } else { 230 val->setType(TaskCase::Value::ETypeDouble); 231 } 232 } else { 233 LOGE("wrong type %d", type); 234 return TaskGeneric::EResultError; 235 } 236 } 237 } 238 return (TaskGeneric::ExecutionResult)header[3]; 239 } 240 241 bool SignalProcessingImpl::send(const char* data, int len) 242 { 243 //LOGD("send %d", len); 244 return mSocket->sendData(data, len); 245 } 246 247 bool SignalProcessingImpl::read(char* data, int len) 248 { 249 const int READ_TIMEOUT_MS = 60000 * 2; // as some calculation like calc_delay takes almost 20 secs 250 //LOGD("read %d", len); 251 return mSocket->readData(data, len, READ_TIMEOUT_MS); 252 } 253 254