1 /* 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/test/channel_transport/udp_socket2_manager_win.h" 12 13 #include <assert.h> 14 #include <stdio.h> 15 16 #include "webrtc/system_wrappers/interface/aligned_malloc.h" 17 #include "webrtc/test/channel_transport/udp_socket2_win.h" 18 19 namespace webrtc { 20 namespace test { 21 22 uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0; 23 bool UdpSocket2ManagerWindows::_wsaInit = false; 24 25 UdpSocket2ManagerWindows::UdpSocket2ManagerWindows() 26 : UdpSocketManager(), 27 _id(-1), 28 _stopped(false), 29 _init(false), 30 _pCrit(CriticalSectionWrapper::CreateCriticalSection()), 31 _ioCompletionHandle(NULL), 32 _numActiveSockets(0), 33 _event(EventWrapper::Create()) 34 { 35 _managerNumber = _numOfActiveManagers++; 36 37 if(_numOfActiveManagers == 1) 38 { 39 WORD wVersionRequested = MAKEWORD(2, 2); 40 WSADATA wsaData; 41 _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0; 42 // TODO (hellner): seems safer to use RAII for this. E.g. what happens 43 // if a UdpSocket2ManagerWindows() created and destroyed 44 // without being initialized. 45 } 46 } 47 48 UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows() 49 { 50 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 51 "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()", 52 _managerNumber); 53 54 if(_init) 55 { 56 _pCrit->Enter(); 57 if(_numActiveSockets) 58 { 59 _pCrit->Leave(); 60 _event->Wait(INFINITE); 61 } 62 else 63 { 64 _pCrit->Leave(); 65 } 66 StopWorkerThreads(); 67 68 for (WorkerList::iterator iter = _workerThreadsList.begin(); 69 iter != _workerThreadsList.end(); ++iter) { 70 delete *iter; 71 } 72 _workerThreadsList.clear(); 73 _ioContextPool.Free(); 74 75 _numOfActiveManagers--; 76 if(_ioCompletionHandle) 77 { 78 CloseHandle(_ioCompletionHandle); 79 } 80 if (_numOfActiveManagers == 0) 81 { 82 if(_wsaInit) 83 { 84 WSACleanup(); 85 } 86 } 87 } 88 if(_pCrit) 89 { 90 delete _pCrit; 91 } 92 if(_event) 93 { 94 delete _event; 95 } 96 } 97 98 bool UdpSocket2ManagerWindows::Init(int32_t id, 99 uint8_t& numOfWorkThreads) { 100 CriticalSectionScoped cs(_pCrit); 101 if ((_id != -1) || (_numOfWorkThreads != 0)) { 102 assert(_id != -1); 103 assert(_numOfWorkThreads != 0); 104 return false; 105 } 106 _id = id; 107 _numOfWorkThreads = numOfWorkThreads; 108 return true; 109 } 110 111 int32_t UdpSocket2ManagerWindows::ChangeUniqueId(const int32_t id) 112 { 113 _id = id; 114 return 0; 115 } 116 117 bool UdpSocket2ManagerWindows::Start() 118 { 119 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 120 "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber); 121 if(!_init) 122 { 123 StartWorkerThreads(); 124 } 125 126 if(!_init) 127 { 128 return false; 129 } 130 _pCrit->Enter(); 131 // Start worker threads. 132 _stopped = false; 133 int32_t error = 0; 134 for (WorkerList::iterator iter = _workerThreadsList.begin(); 135 iter != _workerThreadsList.end() && !error; ++iter) { 136 if(!(*iter)->Start()) 137 error = 1; 138 } 139 if(error) 140 { 141 WEBRTC_TRACE( 142 kTraceError, 143 kTraceTransport, 144 _id, 145 "UdpSocket2ManagerWindows(%d)::Start() error starting worker\ 146 threads", 147 _managerNumber); 148 _pCrit->Leave(); 149 return false; 150 } 151 _pCrit->Leave(); 152 return true; 153 } 154 155 bool UdpSocket2ManagerWindows::StartWorkerThreads() 156 { 157 if(!_init) 158 { 159 _pCrit->Enter(); 160 161 _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 162 0, 0); 163 if(_ioCompletionHandle == NULL) 164 { 165 int32_t error = GetLastError(); 166 WEBRTC_TRACE( 167 kTraceError, 168 kTraceTransport, 169 _id, 170 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()" 171 "_ioCompletioHandle == NULL: error:%d", 172 _managerNumber,error); 173 _pCrit->Leave(); 174 return false; 175 } 176 177 // Create worker threads. 178 uint32_t i = 0; 179 bool error = false; 180 while(i < _numOfWorkThreads && !error) 181 { 182 UdpSocket2WorkerWindows* pWorker = 183 new UdpSocket2WorkerWindows(_ioCompletionHandle); 184 if(pWorker->Init() != 0) 185 { 186 error = true; 187 delete pWorker; 188 break; 189 } 190 _workerThreadsList.push_front(pWorker); 191 i++; 192 } 193 if(error) 194 { 195 WEBRTC_TRACE( 196 kTraceError, 197 kTraceTransport, 198 _id, 199 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " 200 "creating work threads", 201 _managerNumber); 202 // Delete worker threads. 203 for (WorkerList::iterator iter = _workerThreadsList.begin(); 204 iter != _workerThreadsList.end(); ++iter) { 205 delete *iter; 206 } 207 _workerThreadsList.clear(); 208 _pCrit->Leave(); 209 return false; 210 } 211 if(_ioContextPool.Init()) 212 { 213 WEBRTC_TRACE( 214 kTraceError, 215 kTraceTransport, 216 _id, 217 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " 218 "initiating _ioContextPool", 219 _managerNumber); 220 _pCrit->Leave(); 221 return false; 222 } 223 _init = true; 224 WEBRTC_TRACE( 225 kTraceDebug, 226 kTraceTransport, 227 _id, 228 "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work " 229 "threads created and initialized", 230 _numOfWorkThreads); 231 _pCrit->Leave(); 232 } 233 return true; 234 } 235 236 bool UdpSocket2ManagerWindows::Stop() 237 { 238 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 239 "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber); 240 241 if(!_init) 242 { 243 return false; 244 } 245 _pCrit->Enter(); 246 _stopped = true; 247 if(_numActiveSockets) 248 { 249 WEBRTC_TRACE( 250 kTraceError, 251 kTraceTransport, 252 _id, 253 "UdpSocket2ManagerWindows(%d)::Stop() there is still active\ 254 sockets", 255 _managerNumber); 256 _pCrit->Leave(); 257 return false; 258 } 259 // No active sockets. Stop all worker threads. 260 bool result = StopWorkerThreads(); 261 _pCrit->Leave(); 262 return result; 263 } 264 265 bool UdpSocket2ManagerWindows::StopWorkerThreads() 266 { 267 int32_t error = 0; 268 WEBRTC_TRACE( 269 kTraceDebug, 270 kTraceTransport, 271 _id, 272 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\ 273 threadsStoped, numActicve Sockets=%d", 274 _managerNumber, 275 _numActiveSockets); 276 277 // Set worker threads to not alive so that they will stop calling 278 // UdpSocket2WorkerWindows::Run(). 279 for (WorkerList::iterator iter = _workerThreadsList.begin(); 280 iter != _workerThreadsList.end(); ++iter) { 281 (*iter)->SetNotAlive(); 282 } 283 // Release all threads waiting for GetQueuedCompletionStatus(..). 284 if(_ioCompletionHandle) 285 { 286 uint32_t i = 0; 287 for(i = 0; i < _workerThreadsList.size(); i++) 288 { 289 PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL); 290 } 291 } 292 for (WorkerList::iterator iter = _workerThreadsList.begin(); 293 iter != _workerThreadsList.end(); ++iter) { 294 if((*iter)->Stop() == false) 295 { 296 error = -1; 297 WEBRTC_TRACE(kTraceWarning, kTraceTransport, -1, 298 "failed to stop worker thread"); 299 } 300 } 301 302 if(error) 303 { 304 WEBRTC_TRACE( 305 kTraceError, 306 kTraceTransport, 307 _id, 308 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\ 309 worker threads", 310 _managerNumber); 311 return false; 312 } 313 return true; 314 } 315 316 bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s) 317 { 318 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, 319 "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber); 320 if(!_init) 321 { 322 WEBRTC_TRACE( 323 kTraceError, 324 kTraceTransport, 325 _id, 326 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\ 327 initialized", 328 _managerNumber); 329 return false; 330 } 331 _pCrit->Enter(); 332 if(s == NULL) 333 { 334 WEBRTC_TRACE( 335 kTraceError, 336 kTraceTransport, 337 _id, 338 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL", 339 _managerNumber); 340 _pCrit->Leave(); 341 return false; 342 } 343 if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET) 344 { 345 WEBRTC_TRACE( 346 kTraceError, 347 kTraceTransport, 348 _id, 349 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\ 350 %d", 351 _managerNumber, 352 (int32_t)s->GetFd()); 353 _pCrit->Leave(); 354 return false; 355 356 } 357 _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(), 358 _ioCompletionHandle, 359 (ULONG_PTR)(s), 0); 360 if(_ioCompletionHandle == NULL) 361 { 362 int32_t error = GetLastError(); 363 WEBRTC_TRACE( 364 kTraceError, 365 kTraceTransport, 366 _id, 367 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\ 368 completion: %d", 369 _managerNumber, 370 error); 371 _pCrit->Leave(); 372 return false; 373 } 374 _numActiveSockets++; 375 _pCrit->Leave(); 376 return true; 377 } 378 bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s) 379 { 380 if(!_init) 381 { 382 return false; 383 } 384 _pCrit->Enter(); 385 _numActiveSockets--; 386 if(_numActiveSockets == 0) 387 { 388 _event->Set(); 389 } 390 _pCrit->Leave(); 391 return true; 392 } 393 394 PerIoContext* UdpSocket2ManagerWindows::PopIoContext() 395 { 396 if(!_init) 397 { 398 return NULL; 399 } 400 401 PerIoContext* pIoC = NULL; 402 if(!_stopped) 403 { 404 pIoC = _ioContextPool.PopIoContext(); 405 }else 406 { 407 WEBRTC_TRACE( 408 kTraceError, 409 kTraceTransport, 410 _id, 411 "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started", 412 _managerNumber); 413 } 414 return pIoC; 415 } 416 417 int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext) 418 { 419 return _ioContextPool.PushIoContext(pIoContext); 420 } 421 422 IoContextPool::IoContextPool() 423 : _pListHead(NULL), 424 _init(false), 425 _size(0), 426 _inUse(0) 427 { 428 } 429 430 IoContextPool::~IoContextPool() 431 { 432 Free(); 433 assert(_size.Value() == 0); 434 AlignedFree(_pListHead); 435 } 436 437 int32_t IoContextPool::Init(uint32_t /*increaseSize*/) 438 { 439 if(_init) 440 { 441 return 0; 442 } 443 444 _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER), 445 MEMORY_ALLOCATION_ALIGNMENT); 446 if(_pListHead == NULL) 447 { 448 return -1; 449 } 450 InitializeSListHead(_pListHead); 451 _init = true; 452 return 0; 453 } 454 455 PerIoContext* IoContextPool::PopIoContext() 456 { 457 if(!_init) 458 { 459 return NULL; 460 } 461 462 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); 463 if(pListEntry == NULL) 464 { 465 IoContextPoolItem* item = (IoContextPoolItem*) 466 AlignedMalloc( 467 sizeof(IoContextPoolItem), 468 MEMORY_ALLOCATION_ALIGNMENT); 469 if(item == NULL) 470 { 471 return NULL; 472 } 473 memset(&item->payload.ioContext,0,sizeof(PerIoContext)); 474 item->payload.base = item; 475 pListEntry = &(item->itemEntry); 476 ++_size; 477 } 478 ++_inUse; 479 return &((IoContextPoolItem*)pListEntry)->payload.ioContext; 480 } 481 482 int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext) 483 { 484 // TODO (hellner): Overlapped IO should be completed at this point. Perhaps 485 // add an assert? 486 const bool overlappedIOCompleted = HasOverlappedIoCompleted( 487 (LPOVERLAPPED)pIoContext); 488 489 IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base; 490 491 const int32_t usedItems = --_inUse; 492 const int32_t totalItems = _size.Value(); 493 const int32_t freeItems = totalItems - usedItems; 494 if(freeItems < 0) 495 { 496 assert(false); 497 AlignedFree(item); 498 return -1; 499 } 500 if((freeItems >= totalItems>>1) && 501 overlappedIOCompleted) 502 { 503 AlignedFree(item); 504 --_size; 505 return 0; 506 } 507 InterlockedPushEntrySList(_pListHead, &(item->itemEntry)); 508 return 0; 509 } 510 511 int32_t IoContextPool::Free() 512 { 513 if(!_init) 514 { 515 return 0; 516 } 517 518 int32_t itemsFreed = 0; 519 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); 520 while(pListEntry != NULL) 521 { 522 IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry); 523 AlignedFree(item); 524 --_size; 525 itemsFreed++; 526 pListEntry = InterlockedPopEntrySList(_pListHead); 527 } 528 return itemsFreed; 529 } 530 531 int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0; 532 533 UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle) 534 : _ioCompletionHandle(ioCompletionHandle), 535 _pThread(NULL), 536 _init(false) 537 { 538 _workerNumber = _numOfWorkers++; 539 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, 540 "UdpSocket2WorkerWindows created"); 541 } 542 543 UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows() 544 { 545 if(_pThread) 546 { 547 delete _pThread; 548 } 549 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, 550 "UdpSocket2WorkerWindows deleted"); 551 } 552 553 bool UdpSocket2WorkerWindows::Start() 554 { 555 unsigned int id = 0; 556 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, 557 "Start UdpSocket2WorkerWindows"); 558 return _pThread->Start(id); 559 } 560 561 bool UdpSocket2WorkerWindows::Stop() 562 { 563 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, 564 "Stop UdpSocket2WorkerWindows"); 565 return _pThread->Stop(); 566 } 567 568 void UdpSocket2WorkerWindows::SetNotAlive() 569 { 570 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, 571 "SetNotAlive UdpSocket2WorkerWindows"); 572 _pThread->SetNotAlive(); 573 } 574 575 int32_t UdpSocket2WorkerWindows::Init() 576 { 577 if(!_init) 578 { 579 const char* threadName = "UdpSocket2ManagerWindows_thread"; 580 _pThread = ThreadWrapper::CreateThread(Run, this, kRealtimePriority, 581 threadName); 582 if(_pThread == NULL) 583 { 584 WEBRTC_TRACE( 585 kTraceError, 586 kTraceTransport, 587 -1, 588 "UdpSocket2WorkerWindows(%d)::Init(), error creating thread!", 589 _workerNumber); 590 return -1; 591 } 592 _init = true; 593 } 594 return 0; 595 } 596 597 bool UdpSocket2WorkerWindows::Run(ThreadObj obj) 598 { 599 UdpSocket2WorkerWindows* pWorker = 600 static_cast<UdpSocket2WorkerWindows*>(obj); 601 return pWorker->Process(); 602 } 603 604 // Process should always return true. Stopping the worker threads is done in 605 // the UdpSocket2ManagerWindows::StopWorkerThreads() function. 606 bool UdpSocket2WorkerWindows::Process() 607 { 608 int32_t success = 0; 609 DWORD ioSize = 0; 610 UdpSocket2Windows* pSocket = NULL; 611 PerIoContext* pIOContext = 0; 612 OVERLAPPED* pOverlapped = 0; 613 success = GetQueuedCompletionStatus(_ioCompletionHandle, 614 &ioSize, 615 (ULONG_PTR*)&pSocket, &pOverlapped, 200); 616 617 uint32_t error = 0; 618 if(!success) 619 { 620 error = GetLastError(); 621 if(error == WAIT_TIMEOUT) 622 { 623 return true; 624 } 625 // This may happen if e.g. PostQueuedCompletionStatus() has been called. 626 // The IO context still needs to be reclaimed or re-used which is done 627 // in UdpSocket2Windows::IOCompleted(..). 628 } 629 if(pSocket == NULL) 630 { 631 WEBRTC_TRACE( 632 kTraceDebug, 633 kTraceTransport, 634 -1, 635 "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread", 636 _workerNumber); 637 return true; 638 } 639 pIOContext = (PerIoContext*)pOverlapped; 640 pSocket->IOCompleted(pIOContext,ioSize,error); 641 return true; 642 } 643 644 } // namespace test 645 } // namespace webrtc 646