Home | History | Annotate | Download | only in channel_transport
      1 /*
      2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/test/channel_transport/udp_socket2_manager_win.h"
     12 
     13 #include <assert.h>
     14 #include <stdio.h>
     15 
     16 #include "webrtc/system_wrappers/interface/aligned_malloc.h"
     17 #include "webrtc/test/channel_transport/udp_socket2_win.h"
     18 
     19 namespace webrtc {
     20 namespace test {
     21 
     22 uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0;
     23 bool UdpSocket2ManagerWindows::_wsaInit = false;
     24 
     25 UdpSocket2ManagerWindows::UdpSocket2ManagerWindows()
     26     : UdpSocketManager(),
     27       _id(-1),
     28       _stopped(false),
     29       _init(false),
     30       _pCrit(CriticalSectionWrapper::CreateCriticalSection()),
     31       _ioCompletionHandle(NULL),
     32       _numActiveSockets(0),
     33       _event(EventWrapper::Create())
     34 {
     35     _managerNumber = _numOfActiveManagers++;
     36 
     37     if(_numOfActiveManagers == 1)
     38     {
     39         WORD wVersionRequested = MAKEWORD(2, 2);
     40         WSADATA wsaData;
     41         _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0;
     42         // TODO (hellner): seems safer to use RAII for this. E.g. what happens
     43         //                 if a UdpSocket2ManagerWindows() created and destroyed
     44         //                 without being initialized.
     45     }
     46 }
     47 
     48 UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows()
     49 {
     50     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
     51                  "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()",
     52                  _managerNumber);
     53 
     54     if(_init)
     55     {
     56         _pCrit->Enter();
     57         if(_numActiveSockets)
     58         {
     59             _pCrit->Leave();
     60             _event->Wait(INFINITE);
     61         }
     62         else
     63         {
     64             _pCrit->Leave();
     65         }
     66         StopWorkerThreads();
     67 
     68         for (WorkerList::iterator iter = _workerThreadsList.begin();
     69              iter != _workerThreadsList.end(); ++iter) {
     70           delete *iter;
     71         }
     72         _workerThreadsList.clear();
     73         _ioContextPool.Free();
     74 
     75         _numOfActiveManagers--;
     76         if(_ioCompletionHandle)
     77         {
     78             CloseHandle(_ioCompletionHandle);
     79         }
     80         if (_numOfActiveManagers == 0)
     81         {
     82             if(_wsaInit)
     83             {
     84                 WSACleanup();
     85             }
     86         }
     87     }
     88     if(_pCrit)
     89     {
     90         delete _pCrit;
     91     }
     92     if(_event)
     93     {
     94         delete _event;
     95     }
     96 }
     97 
     98 bool UdpSocket2ManagerWindows::Init(int32_t id,
     99                                     uint8_t& numOfWorkThreads) {
    100   CriticalSectionScoped cs(_pCrit);
    101   if ((_id != -1) || (_numOfWorkThreads != 0)) {
    102       assert(_id != -1);
    103       assert(_numOfWorkThreads != 0);
    104       return false;
    105   }
    106   _id = id;
    107   _numOfWorkThreads = numOfWorkThreads;
    108   return true;
    109 }
    110 
    111 int32_t UdpSocket2ManagerWindows::ChangeUniqueId(const int32_t id)
    112 {
    113     _id = id;
    114     return 0;
    115 }
    116 
    117 bool UdpSocket2ManagerWindows::Start()
    118 {
    119     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
    120                  "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber);
    121     if(!_init)
    122     {
    123         StartWorkerThreads();
    124     }
    125 
    126     if(!_init)
    127     {
    128         return false;
    129     }
    130     _pCrit->Enter();
    131     // Start worker threads.
    132     _stopped = false;
    133     int32_t error = 0;
    134     for (WorkerList::iterator iter = _workerThreadsList.begin();
    135          iter != _workerThreadsList.end() && !error; ++iter) {
    136       if(!(*iter)->Start())
    137         error = 1;
    138     }
    139     if(error)
    140     {
    141         WEBRTC_TRACE(
    142             kTraceError,
    143             kTraceTransport,
    144             _id,
    145             "UdpSocket2ManagerWindows(%d)::Start() error starting worker\
    146  threads",
    147             _managerNumber);
    148         _pCrit->Leave();
    149         return false;
    150     }
    151     _pCrit->Leave();
    152     return true;
    153 }
    154 
    155 bool UdpSocket2ManagerWindows::StartWorkerThreads()
    156 {
    157     if(!_init)
    158     {
    159         _pCrit->Enter();
    160 
    161         _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
    162                                                      0, 0);
    163         if(_ioCompletionHandle == NULL)
    164         {
    165             int32_t error = GetLastError();
    166             WEBRTC_TRACE(
    167                 kTraceError,
    168                 kTraceTransport,
    169                 _id,
    170                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()"
    171                 "_ioCompletioHandle == NULL: error:%d",
    172                 _managerNumber,error);
    173             _pCrit->Leave();
    174             return false;
    175         }
    176 
    177         // Create worker threads.
    178         uint32_t i = 0;
    179         bool error = false;
    180         while(i < _numOfWorkThreads && !error)
    181         {
    182             UdpSocket2WorkerWindows* pWorker =
    183                 new UdpSocket2WorkerWindows(_ioCompletionHandle);
    184             if(pWorker->Init() != 0)
    185             {
    186                 error = true;
    187                 delete pWorker;
    188                 break;
    189             }
    190             _workerThreadsList.push_front(pWorker);
    191             i++;
    192         }
    193         if(error)
    194         {
    195             WEBRTC_TRACE(
    196                 kTraceError,
    197                 kTraceTransport,
    198                 _id,
    199                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
    200                 "creating work threads",
    201                 _managerNumber);
    202             // Delete worker threads.
    203             for (WorkerList::iterator iter = _workerThreadsList.begin();
    204                  iter != _workerThreadsList.end(); ++iter) {
    205               delete *iter;
    206             }
    207             _workerThreadsList.clear();
    208             _pCrit->Leave();
    209             return false;
    210         }
    211         if(_ioContextPool.Init())
    212         {
    213             WEBRTC_TRACE(
    214                 kTraceError,
    215                 kTraceTransport,
    216                 _id,
    217                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
    218                 "initiating _ioContextPool",
    219                 _managerNumber);
    220             _pCrit->Leave();
    221             return false;
    222         }
    223         _init = true;
    224         WEBRTC_TRACE(
    225             kTraceDebug,
    226             kTraceTransport,
    227             _id,
    228             "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work "
    229             "threads created and initialized",
    230             _numOfWorkThreads);
    231         _pCrit->Leave();
    232     }
    233     return true;
    234 }
    235 
    236 bool UdpSocket2ManagerWindows::Stop()
    237 {
    238     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
    239                  "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber);
    240 
    241     if(!_init)
    242     {
    243         return false;
    244     }
    245     _pCrit->Enter();
    246     _stopped = true;
    247     if(_numActiveSockets)
    248     {
    249         WEBRTC_TRACE(
    250             kTraceError,
    251             kTraceTransport,
    252             _id,
    253             "UdpSocket2ManagerWindows(%d)::Stop() there is still active\
    254  sockets",
    255             _managerNumber);
    256         _pCrit->Leave();
    257         return false;
    258     }
    259     // No active sockets. Stop all worker threads.
    260     bool result = StopWorkerThreads();
    261     _pCrit->Leave();
    262     return result;
    263 }
    264 
    265 bool UdpSocket2ManagerWindows::StopWorkerThreads()
    266 {
    267     int32_t error = 0;
    268     WEBRTC_TRACE(
    269         kTraceDebug,
    270         kTraceTransport,
    271         _id,
    272         "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\
    273  threadsStoped, numActicve Sockets=%d",
    274         _managerNumber,
    275         _numActiveSockets);
    276 
    277     // Set worker threads to not alive so that they will stop calling
    278     // UdpSocket2WorkerWindows::Run().
    279     for (WorkerList::iterator iter = _workerThreadsList.begin();
    280          iter != _workerThreadsList.end(); ++iter) {
    281         (*iter)->SetNotAlive();
    282     }
    283     // Release all threads waiting for GetQueuedCompletionStatus(..).
    284     if(_ioCompletionHandle)
    285     {
    286         uint32_t i = 0;
    287         for(i = 0; i < _workerThreadsList.size(); i++)
    288         {
    289             PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL);
    290         }
    291     }
    292     for (WorkerList::iterator iter = _workerThreadsList.begin();
    293          iter != _workerThreadsList.end(); ++iter) {
    294         if((*iter)->Stop() == false)
    295         {
    296             error = -1;
    297             WEBRTC_TRACE(kTraceWarning,  kTraceTransport, -1,
    298                          "failed to stop worker thread");
    299         }
    300     }
    301 
    302     if(error)
    303     {
    304         WEBRTC_TRACE(
    305             kTraceError,
    306             kTraceTransport,
    307             _id,
    308             "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\
    309  worker threads",
    310             _managerNumber);
    311         return false;
    312     }
    313     return true;
    314 }
    315 
    316 bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s)
    317 {
    318     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
    319                  "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber);
    320     if(!_init)
    321     {
    322         WEBRTC_TRACE(
    323             kTraceError,
    324             kTraceTransport,
    325             _id,
    326             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\
    327  initialized",
    328             _managerNumber);
    329         return false;
    330     }
    331     _pCrit->Enter();
    332     if(s == NULL)
    333     {
    334         WEBRTC_TRACE(
    335             kTraceError,
    336             kTraceTransport,
    337             _id,
    338             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL",
    339             _managerNumber);
    340         _pCrit->Leave();
    341         return false;
    342     }
    343     if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET)
    344     {
    345         WEBRTC_TRACE(
    346             kTraceError,
    347             kTraceTransport,
    348             _id,
    349             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\
    350  %d",
    351             _managerNumber,
    352             (int32_t)s->GetFd());
    353         _pCrit->Leave();
    354         return false;
    355 
    356     }
    357     _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(),
    358                                                  _ioCompletionHandle,
    359                                                  (ULONG_PTR)(s), 0);
    360     if(_ioCompletionHandle == NULL)
    361     {
    362         int32_t error = GetLastError();
    363         WEBRTC_TRACE(
    364             kTraceError,
    365             kTraceTransport,
    366             _id,
    367             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\
    368  completion: %d",
    369             _managerNumber,
    370             error);
    371         _pCrit->Leave();
    372         return false;
    373     }
    374     _numActiveSockets++;
    375     _pCrit->Leave();
    376     return true;
    377 }
    378 bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s)
    379 {
    380     if(!_init)
    381     {
    382         return false;
    383     }
    384     _pCrit->Enter();
    385     _numActiveSockets--;
    386     if(_numActiveSockets == 0)
    387     {
    388         _event->Set();
    389     }
    390     _pCrit->Leave();
    391     return true;
    392 }
    393 
    394 PerIoContext* UdpSocket2ManagerWindows::PopIoContext()
    395 {
    396     if(!_init)
    397     {
    398         return NULL;
    399     }
    400 
    401     PerIoContext* pIoC = NULL;
    402     if(!_stopped)
    403     {
    404         pIoC = _ioContextPool.PopIoContext();
    405     }else
    406     {
    407         WEBRTC_TRACE(
    408             kTraceError,
    409             kTraceTransport,
    410             _id,
    411             "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started",
    412             _managerNumber);
    413     }
    414     return pIoC;
    415 }
    416 
    417 int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext)
    418 {
    419     return _ioContextPool.PushIoContext(pIoContext);
    420 }
    421 
    422 IoContextPool::IoContextPool()
    423     : _pListHead(NULL),
    424       _init(false),
    425       _size(0),
    426       _inUse(0)
    427 {
    428 }
    429 
    430 IoContextPool::~IoContextPool()
    431 {
    432     Free();
    433     assert(_size.Value() == 0);
    434     AlignedFree(_pListHead);
    435 }
    436 
    437 int32_t IoContextPool::Init(uint32_t /*increaseSize*/)
    438 {
    439     if(_init)
    440     {
    441         return 0;
    442     }
    443 
    444     _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER),
    445                                               MEMORY_ALLOCATION_ALIGNMENT);
    446     if(_pListHead == NULL)
    447     {
    448         return -1;
    449     }
    450     InitializeSListHead(_pListHead);
    451     _init = true;
    452     return 0;
    453 }
    454 
    455 PerIoContext* IoContextPool::PopIoContext()
    456 {
    457     if(!_init)
    458     {
    459         return NULL;
    460     }
    461 
    462     PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
    463     if(pListEntry == NULL)
    464     {
    465         IoContextPoolItem* item = (IoContextPoolItem*)
    466             AlignedMalloc(
    467                 sizeof(IoContextPoolItem),
    468                 MEMORY_ALLOCATION_ALIGNMENT);
    469         if(item == NULL)
    470         {
    471             return NULL;
    472         }
    473         memset(&item->payload.ioContext,0,sizeof(PerIoContext));
    474         item->payload.base = item;
    475         pListEntry = &(item->itemEntry);
    476         ++_size;
    477     }
    478     ++_inUse;
    479     return &((IoContextPoolItem*)pListEntry)->payload.ioContext;
    480 }
    481 
    482 int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext)
    483 {
    484     // TODO (hellner): Overlapped IO should be completed at this point. Perhaps
    485     //                 add an assert?
    486     const bool overlappedIOCompleted = HasOverlappedIoCompleted(
    487         (LPOVERLAPPED)pIoContext);
    488 
    489     IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base;
    490 
    491     const int32_t usedItems = --_inUse;
    492     const int32_t totalItems = _size.Value();
    493     const int32_t freeItems = totalItems - usedItems;
    494     if(freeItems < 0)
    495     {
    496         assert(false);
    497         AlignedFree(item);
    498         return -1;
    499     }
    500     if((freeItems >= totalItems>>1) &&
    501         overlappedIOCompleted)
    502     {
    503         AlignedFree(item);
    504         --_size;
    505         return 0;
    506     }
    507     InterlockedPushEntrySList(_pListHead, &(item->itemEntry));
    508     return 0;
    509 }
    510 
    511 int32_t IoContextPool::Free()
    512 {
    513     if(!_init)
    514     {
    515         return 0;
    516     }
    517 
    518     int32_t itemsFreed = 0;
    519     PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
    520     while(pListEntry != NULL)
    521     {
    522         IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry);
    523         AlignedFree(item);
    524         --_size;
    525         itemsFreed++;
    526         pListEntry = InterlockedPopEntrySList(_pListHead);
    527     }
    528     return itemsFreed;
    529 }
    530 
    531 int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0;
    532 
    533 UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)
    534     : _ioCompletionHandle(ioCompletionHandle),
    535       _pThread(NULL),
    536       _init(false)
    537 {
    538     _workerNumber = _numOfWorkers++;
    539     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
    540                  "UdpSocket2WorkerWindows created");
    541 }
    542 
    543 UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows()
    544 {
    545     if(_pThread)
    546     {
    547         delete _pThread;
    548     }
    549     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
    550                  "UdpSocket2WorkerWindows deleted");
    551 }
    552 
    553 bool UdpSocket2WorkerWindows::Start()
    554 {
    555     unsigned int id = 0;
    556     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
    557                  "Start UdpSocket2WorkerWindows");
    558     return _pThread->Start(id);
    559 }
    560 
    561 bool UdpSocket2WorkerWindows::Stop()
    562 {
    563     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
    564                  "Stop UdpSocket2WorkerWindows");
    565     return _pThread->Stop();
    566 }
    567 
    568 void UdpSocket2WorkerWindows::SetNotAlive()
    569 {
    570     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
    571                  "SetNotAlive UdpSocket2WorkerWindows");
    572     _pThread->SetNotAlive();
    573 }
    574 
    575 int32_t UdpSocket2WorkerWindows::Init()
    576 {
    577     if(!_init)
    578     {
    579         const char* threadName = "UdpSocket2ManagerWindows_thread";
    580         _pThread = ThreadWrapper::CreateThread(Run, this, kRealtimePriority,
    581                                                threadName);
    582         if(_pThread == NULL)
    583         {
    584             WEBRTC_TRACE(
    585                 kTraceError,
    586                 kTraceTransport,
    587                 -1,
    588                 "UdpSocket2WorkerWindows(%d)::Init(), error creating thread!",
    589                 _workerNumber);
    590             return -1;
    591         }
    592         _init = true;
    593     }
    594     return 0;
    595 }
    596 
    597 bool UdpSocket2WorkerWindows::Run(ThreadObj obj)
    598 {
    599     UdpSocket2WorkerWindows* pWorker =
    600         static_cast<UdpSocket2WorkerWindows*>(obj);
    601     return pWorker->Process();
    602 }
    603 
    604 // Process should always return true. Stopping the worker threads is done in
    605 // the UdpSocket2ManagerWindows::StopWorkerThreads() function.
    606 bool UdpSocket2WorkerWindows::Process()
    607 {
    608     int32_t success = 0;
    609     DWORD ioSize = 0;
    610     UdpSocket2Windows* pSocket = NULL;
    611     PerIoContext* pIOContext = 0;
    612     OVERLAPPED* pOverlapped = 0;
    613     success = GetQueuedCompletionStatus(_ioCompletionHandle,
    614                                         &ioSize,
    615                                        (ULONG_PTR*)&pSocket, &pOverlapped, 200);
    616 
    617     uint32_t error = 0;
    618     if(!success)
    619     {
    620         error = GetLastError();
    621         if(error == WAIT_TIMEOUT)
    622         {
    623             return true;
    624         }
    625         // This may happen if e.g. PostQueuedCompletionStatus() has been called.
    626         // The IO context still needs to be reclaimed or re-used which is done
    627         // in UdpSocket2Windows::IOCompleted(..).
    628     }
    629     if(pSocket == NULL)
    630     {
    631         WEBRTC_TRACE(
    632             kTraceDebug,
    633             kTraceTransport,
    634             -1,
    635             "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread",
    636             _workerNumber);
    637         return true;
    638     }
    639     pIOContext = (PerIoContext*)pOverlapped;
    640     pSocket->IOCompleted(pIOContext,ioSize,error);
    641     return true;
    642 }
    643 
    644 }  // namespace test
    645 }  // namespace webrtc
    646