Home | History | Annotate | Download | only in channel_transport
      1 /*
      2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/test/channel_transport/udp_socket2_manager_win.h"
     12 
     13 #include <assert.h>
     14 #include <stdio.h>
     15 
     16 #include "webrtc/system_wrappers/include/aligned_malloc.h"
     17 #include "webrtc/test/channel_transport/udp_socket2_win.h"
     18 
     19 namespace webrtc {
     20 namespace test {
     21 
     22 uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0;
     23 bool UdpSocket2ManagerWindows::_wsaInit = false;
     24 
     25 UdpSocket2ManagerWindows::UdpSocket2ManagerWindows()
     26     : UdpSocketManager(),
     27       _id(-1),
     28       _stopped(false),
     29       _init(false),
     30       _pCrit(CriticalSectionWrapper::CreateCriticalSection()),
     31       _ioCompletionHandle(NULL),
     32       _numActiveSockets(0),
     33       _event(EventWrapper::Create())
     34 {
     35     _managerNumber = _numOfActiveManagers++;
     36 
     37     if(_numOfActiveManagers == 1)
     38     {
     39         WORD wVersionRequested = MAKEWORD(2, 2);
     40         WSADATA wsaData;
     41         _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0;
     42         // TODO (hellner): seems safer to use RAII for this. E.g. what happens
     43         //                 if a UdpSocket2ManagerWindows() created and destroyed
     44         //                 without being initialized.
     45     }
     46 }
     47 
     48 UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows()
     49 {
     50     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
     51                  "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()",
     52                  _managerNumber);
     53 
     54     if(_init)
     55     {
     56         _pCrit->Enter();
     57         if(_numActiveSockets)
     58         {
     59             _pCrit->Leave();
     60             _event->Wait(INFINITE);
     61         }
     62         else
     63         {
     64             _pCrit->Leave();
     65         }
     66         StopWorkerThreads();
     67 
     68         for (WorkerList::iterator iter = _workerThreadsList.begin();
     69              iter != _workerThreadsList.end(); ++iter) {
     70           delete *iter;
     71         }
     72         _workerThreadsList.clear();
     73         _ioContextPool.Free();
     74 
     75         _numOfActiveManagers--;
     76         if(_ioCompletionHandle)
     77         {
     78             CloseHandle(_ioCompletionHandle);
     79         }
     80         if (_numOfActiveManagers == 0)
     81         {
     82             if(_wsaInit)
     83             {
     84                 WSACleanup();
     85             }
     86         }
     87     }
     88     if(_pCrit)
     89     {
     90         delete _pCrit;
     91     }
     92     if(_event)
     93     {
     94         delete _event;
     95     }
     96 }
     97 
     98 bool UdpSocket2ManagerWindows::Init(int32_t id,
     99                                     uint8_t& numOfWorkThreads) {
    100   CriticalSectionScoped cs(_pCrit);
    101   if ((_id != -1) || (_numOfWorkThreads != 0)) {
    102       assert(_id != -1);
    103       assert(_numOfWorkThreads != 0);
    104       return false;
    105   }
    106   _id = id;
    107   _numOfWorkThreads = numOfWorkThreads;
    108   return true;
    109 }
    110 
    111 bool UdpSocket2ManagerWindows::Start()
    112 {
    113     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
    114                  "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber);
    115     if(!_init)
    116     {
    117         StartWorkerThreads();
    118     }
    119 
    120     if(!_init)
    121     {
    122         return false;
    123     }
    124     _pCrit->Enter();
    125     // Start worker threads.
    126     _stopped = false;
    127     int32_t error = 0;
    128     for (WorkerList::iterator iter = _workerThreadsList.begin();
    129          iter != _workerThreadsList.end() && !error; ++iter) {
    130       if(!(*iter)->Start())
    131         error = 1;
    132     }
    133     if(error)
    134     {
    135         WEBRTC_TRACE(
    136             kTraceError,
    137             kTraceTransport,
    138             _id,
    139             "UdpSocket2ManagerWindows(%d)::Start() error starting worker\
    140  threads",
    141             _managerNumber);
    142         _pCrit->Leave();
    143         return false;
    144     }
    145     _pCrit->Leave();
    146     return true;
    147 }
    148 
    149 bool UdpSocket2ManagerWindows::StartWorkerThreads()
    150 {
    151     if(!_init)
    152     {
    153         _pCrit->Enter();
    154 
    155         _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
    156                                                      0, 0);
    157         if(_ioCompletionHandle == NULL)
    158         {
    159             int32_t error = GetLastError();
    160             WEBRTC_TRACE(
    161                 kTraceError,
    162                 kTraceTransport,
    163                 _id,
    164                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()"
    165                 "_ioCompletioHandle == NULL: error:%d",
    166                 _managerNumber,error);
    167             _pCrit->Leave();
    168             return false;
    169         }
    170 
    171         // Create worker threads.
    172         uint32_t i = 0;
    173         bool error = false;
    174         while(i < _numOfWorkThreads && !error)
    175         {
    176             UdpSocket2WorkerWindows* pWorker =
    177                 new UdpSocket2WorkerWindows(_ioCompletionHandle);
    178             if(pWorker->Init() != 0)
    179             {
    180                 error = true;
    181                 delete pWorker;
    182                 break;
    183             }
    184             _workerThreadsList.push_front(pWorker);
    185             i++;
    186         }
    187         if(error)
    188         {
    189             WEBRTC_TRACE(
    190                 kTraceError,
    191                 kTraceTransport,
    192                 _id,
    193                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
    194                 "creating work threads",
    195                 _managerNumber);
    196             // Delete worker threads.
    197             for (WorkerList::iterator iter = _workerThreadsList.begin();
    198                  iter != _workerThreadsList.end(); ++iter) {
    199               delete *iter;
    200             }
    201             _workerThreadsList.clear();
    202             _pCrit->Leave();
    203             return false;
    204         }
    205         if(_ioContextPool.Init())
    206         {
    207             WEBRTC_TRACE(
    208                 kTraceError,
    209                 kTraceTransport,
    210                 _id,
    211                 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
    212                 "initiating _ioContextPool",
    213                 _managerNumber);
    214             _pCrit->Leave();
    215             return false;
    216         }
    217         _init = true;
    218         WEBRTC_TRACE(
    219             kTraceDebug,
    220             kTraceTransport,
    221             _id,
    222             "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work "
    223             "threads created and initialized",
    224             _numOfWorkThreads);
    225         _pCrit->Leave();
    226     }
    227     return true;
    228 }
    229 
    230 bool UdpSocket2ManagerWindows::Stop()
    231 {
    232     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
    233                  "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber);
    234 
    235     if(!_init)
    236     {
    237         return false;
    238     }
    239     _pCrit->Enter();
    240     _stopped = true;
    241     if(_numActiveSockets)
    242     {
    243         WEBRTC_TRACE(
    244             kTraceError,
    245             kTraceTransport,
    246             _id,
    247             "UdpSocket2ManagerWindows(%d)::Stop() there is still active\
    248  sockets",
    249             _managerNumber);
    250         _pCrit->Leave();
    251         return false;
    252     }
    253     // No active sockets. Stop all worker threads.
    254     bool result = StopWorkerThreads();
    255     _pCrit->Leave();
    256     return result;
    257 }
    258 
    259 bool UdpSocket2ManagerWindows::StopWorkerThreads()
    260 {
    261     int32_t error = 0;
    262     WEBRTC_TRACE(
    263         kTraceDebug,
    264         kTraceTransport,
    265         _id,
    266         "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\
    267  threadsStoped, numActicve Sockets=%d",
    268         _managerNumber,
    269         _numActiveSockets);
    270 
    271     // Release all threads waiting for GetQueuedCompletionStatus(..).
    272     if(_ioCompletionHandle)
    273     {
    274         uint32_t i = 0;
    275         for(i = 0; i < _workerThreadsList.size(); i++)
    276         {
    277             PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL);
    278         }
    279     }
    280     for (WorkerList::iterator iter = _workerThreadsList.begin();
    281          iter != _workerThreadsList.end(); ++iter) {
    282         if((*iter)->Stop() == false)
    283         {
    284             error = -1;
    285             WEBRTC_TRACE(kTraceWarning,  kTraceTransport, -1,
    286                          "failed to stop worker thread");
    287         }
    288     }
    289 
    290     if(error)
    291     {
    292         WEBRTC_TRACE(
    293             kTraceError,
    294             kTraceTransport,
    295             _id,
    296             "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\
    297  worker threads",
    298             _managerNumber);
    299         return false;
    300     }
    301     return true;
    302 }
    303 
    304 bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s)
    305 {
    306     WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
    307                  "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber);
    308     if(!_init)
    309     {
    310         WEBRTC_TRACE(
    311             kTraceError,
    312             kTraceTransport,
    313             _id,
    314             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\
    315  initialized",
    316             _managerNumber);
    317         return false;
    318     }
    319     _pCrit->Enter();
    320     if(s == NULL)
    321     {
    322         WEBRTC_TRACE(
    323             kTraceError,
    324             kTraceTransport,
    325             _id,
    326             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL",
    327             _managerNumber);
    328         _pCrit->Leave();
    329         return false;
    330     }
    331     if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET)
    332     {
    333         WEBRTC_TRACE(
    334             kTraceError,
    335             kTraceTransport,
    336             _id,
    337             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\
    338  %d",
    339             _managerNumber,
    340             (int32_t)s->GetFd());
    341         _pCrit->Leave();
    342         return false;
    343 
    344     }
    345     _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(),
    346                                                  _ioCompletionHandle,
    347                                                  (ULONG_PTR)(s), 0);
    348     if(_ioCompletionHandle == NULL)
    349     {
    350         int32_t error = GetLastError();
    351         WEBRTC_TRACE(
    352             kTraceError,
    353             kTraceTransport,
    354             _id,
    355             "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\
    356  completion: %d",
    357             _managerNumber,
    358             error);
    359         _pCrit->Leave();
    360         return false;
    361     }
    362     _numActiveSockets++;
    363     _pCrit->Leave();
    364     return true;
    365 }
    366 bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s)
    367 {
    368     if(!_init)
    369     {
    370         return false;
    371     }
    372     _pCrit->Enter();
    373     _numActiveSockets--;
    374     if(_numActiveSockets == 0)
    375     {
    376         _event->Set();
    377     }
    378     _pCrit->Leave();
    379     return true;
    380 }
    381 
    382 PerIoContext* UdpSocket2ManagerWindows::PopIoContext()
    383 {
    384     if(!_init)
    385     {
    386         return NULL;
    387     }
    388 
    389     PerIoContext* pIoC = NULL;
    390     if(!_stopped)
    391     {
    392         pIoC = _ioContextPool.PopIoContext();
    393     }else
    394     {
    395         WEBRTC_TRACE(
    396             kTraceError,
    397             kTraceTransport,
    398             _id,
    399             "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started",
    400             _managerNumber);
    401     }
    402     return pIoC;
    403 }
    404 
    405 int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext)
    406 {
    407     return _ioContextPool.PushIoContext(pIoContext);
    408 }
    409 
    410 IoContextPool::IoContextPool()
    411     : _pListHead(NULL),
    412       _init(false),
    413       _size(0),
    414       _inUse(0)
    415 {
    416 }
    417 
    418 IoContextPool::~IoContextPool()
    419 {
    420     Free();
    421     assert(_size.Value() == 0);
    422     AlignedFree(_pListHead);
    423 }
    424 
    425 int32_t IoContextPool::Init(uint32_t /*increaseSize*/)
    426 {
    427     if(_init)
    428     {
    429         return 0;
    430     }
    431 
    432     _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER),
    433                                               MEMORY_ALLOCATION_ALIGNMENT);
    434     if(_pListHead == NULL)
    435     {
    436         return -1;
    437     }
    438     InitializeSListHead(_pListHead);
    439     _init = true;
    440     return 0;
    441 }
    442 
    443 PerIoContext* IoContextPool::PopIoContext()
    444 {
    445     if(!_init)
    446     {
    447         return NULL;
    448     }
    449 
    450     PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
    451     if(pListEntry == NULL)
    452     {
    453         IoContextPoolItem* item = (IoContextPoolItem*)
    454             AlignedMalloc(
    455                 sizeof(IoContextPoolItem),
    456                 MEMORY_ALLOCATION_ALIGNMENT);
    457         if(item == NULL)
    458         {
    459             return NULL;
    460         }
    461         memset(&item->payload.ioContext,0,sizeof(PerIoContext));
    462         item->payload.base = item;
    463         pListEntry = &(item->itemEntry);
    464         ++_size;
    465     }
    466     ++_inUse;
    467     return &((IoContextPoolItem*)pListEntry)->payload.ioContext;
    468 }
    469 
    470 int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext)
    471 {
    472     // TODO (hellner): Overlapped IO should be completed at this point. Perhaps
    473     //                 add an assert?
    474     const bool overlappedIOCompleted = HasOverlappedIoCompleted(
    475         (LPOVERLAPPED)pIoContext);
    476 
    477     IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base;
    478 
    479     const int32_t usedItems = --_inUse;
    480     const int32_t totalItems = _size.Value();
    481     const int32_t freeItems = totalItems - usedItems;
    482     if(freeItems < 0)
    483     {
    484         assert(false);
    485         AlignedFree(item);
    486         return -1;
    487     }
    488     if((freeItems >= totalItems>>1) &&
    489         overlappedIOCompleted)
    490     {
    491         AlignedFree(item);
    492         --_size;
    493         return 0;
    494     }
    495     InterlockedPushEntrySList(_pListHead, &(item->itemEntry));
    496     return 0;
    497 }
    498 
    499 int32_t IoContextPool::Free()
    500 {
    501     if(!_init)
    502     {
    503         return 0;
    504     }
    505 
    506     int32_t itemsFreed = 0;
    507     PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
    508     while(pListEntry != NULL)
    509     {
    510         IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry);
    511         AlignedFree(item);
    512         --_size;
    513         itemsFreed++;
    514         pListEntry = InterlockedPopEntrySList(_pListHead);
    515     }
    516     return itemsFreed;
    517 }
    518 
    519 int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0;
    520 
    521 UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)
    522     : _ioCompletionHandle(ioCompletionHandle),
    523       _pThread(Run, this, "UdpSocket2ManagerWindows_thread"),
    524       _init(false) {
    525     _workerNumber = _numOfWorkers++;
    526     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
    527                  "UdpSocket2WorkerWindows created");
    528 }
    529 
    530 UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows()
    531 {
    532     WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
    533                  "UdpSocket2WorkerWindows deleted");
    534 }
    535 
    536 bool UdpSocket2WorkerWindows::Start()
    537 {
    538     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
    539                  "Start UdpSocket2WorkerWindows");
    540     _pThread.Start();
    541 
    542     _pThread.SetPriority(rtc::kRealtimePriority);
    543     return true;
    544 }
    545 
    546 bool UdpSocket2WorkerWindows::Stop()
    547 {
    548     WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
    549                  "Stop UdpSocket2WorkerWindows");
    550     _pThread.Stop();
    551     return true;
    552 }
    553 
    554 int32_t UdpSocket2WorkerWindows::Init()
    555 {
    556   _init = true;
    557   return 0;
    558 }
    559 
    560 bool UdpSocket2WorkerWindows::Run(void* obj)
    561 {
    562     UdpSocket2WorkerWindows* pWorker =
    563         static_cast<UdpSocket2WorkerWindows*>(obj);
    564     return pWorker->Process();
    565 }
    566 
    567 // Process should always return true. Stopping the worker threads is done in
    568 // the UdpSocket2ManagerWindows::StopWorkerThreads() function.
    569 bool UdpSocket2WorkerWindows::Process()
    570 {
    571     int32_t success = 0;
    572     DWORD ioSize = 0;
    573     UdpSocket2Windows* pSocket = NULL;
    574     PerIoContext* pIOContext = 0;
    575     OVERLAPPED* pOverlapped = 0;
    576     success = GetQueuedCompletionStatus(_ioCompletionHandle,
    577                                         &ioSize,
    578                                        (ULONG_PTR*)&pSocket, &pOverlapped, 200);
    579 
    580     uint32_t error = 0;
    581     if(!success)
    582     {
    583         error = GetLastError();
    584         if(error == WAIT_TIMEOUT)
    585         {
    586             return true;
    587         }
    588         // This may happen if e.g. PostQueuedCompletionStatus() has been called.
    589         // The IO context still needs to be reclaimed or re-used which is done
    590         // in UdpSocket2Windows::IOCompleted(..).
    591     }
    592     if(pSocket == NULL)
    593     {
    594         WEBRTC_TRACE(
    595             kTraceDebug,
    596             kTraceTransport,
    597             -1,
    598             "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread",
    599             _workerNumber);
    600         return true;
    601     }
    602     pIOContext = (PerIoContext*)pOverlapped;
    603     pSocket->IOCompleted(pIOContext,ioSize,error);
    604     return true;
    605 }
    606 
    607 }  // namespace test
    608 }  // namespace webrtc
    609