1 // StreamBinder.cpp 2 3 #include "StdAfx.h" 4 5 #include "StreamBinder.h" 6 #include "../../Common/Defs.h" 7 #include "../../Common/MyCom.h" 8 9 using namespace NWindows; 10 using namespace NSynchronization; 11 12 class CSequentialInStreamForBinder: 13 public ISequentialInStream, 14 public CMyUnknownImp 15 { 16 public: 17 MY_UNKNOWN_IMP 18 19 STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize); 20 private: 21 CStreamBinder *m_StreamBinder; 22 public: 23 ~CSequentialInStreamForBinder() { m_StreamBinder->CloseRead(); } 24 void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; } 25 }; 26 27 STDMETHODIMP CSequentialInStreamForBinder::Read(void *data, UInt32 size, UInt32 *processedSize) 28 { return m_StreamBinder->Read(data, size, processedSize); } 29 30 class CSequentialOutStreamForBinder: 31 public ISequentialOutStream, 32 public CMyUnknownImp 33 { 34 public: 35 MY_UNKNOWN_IMP 36 37 STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize); 38 39 private: 40 CStreamBinder *m_StreamBinder; 41 public: 42 ~CSequentialOutStreamForBinder() { m_StreamBinder->CloseWrite(); } 43 void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; } 44 }; 45 46 STDMETHODIMP CSequentialOutStreamForBinder::Write(const void *data, UInt32 size, UInt32 *processedSize) 47 { return m_StreamBinder->Write(data, size, processedSize); } 48 49 50 ////////////////////////// 51 // CStreamBinder 52 // (_thereAreBytesToReadEvent && _bufferSize == 0) means that stream is finished. 53 54 HRes CStreamBinder::CreateEvents() 55 { 56 RINOK(_allBytesAreWritenEvent.Create(true)); 57 RINOK(_thereAreBytesToReadEvent.Create()); 58 return _readStreamIsClosedEvent.Create(); 59 } 60 61 void CStreamBinder::ReInit() 62 { 63 _thereAreBytesToReadEvent.Reset(); 64 _readStreamIsClosedEvent.Reset(); 65 ProcessedSize = 0; 66 } 67 68 69 70 void CStreamBinder::CreateStreams(ISequentialInStream **inStream, 71 ISequentialOutStream **outStream) 72 { 73 CSequentialInStreamForBinder *inStreamSpec = new 74 CSequentialInStreamForBinder; 75 CMyComPtr<ISequentialInStream> inStreamLoc(inStreamSpec); 76 inStreamSpec->SetBinder(this); 77 *inStream = inStreamLoc.Detach(); 78 79 CSequentialOutStreamForBinder *outStreamSpec = new 80 CSequentialOutStreamForBinder; 81 CMyComPtr<ISequentialOutStream> outStreamLoc(outStreamSpec); 82 outStreamSpec->SetBinder(this); 83 *outStream = outStreamLoc.Detach(); 84 85 _buffer = NULL; 86 _bufferSize= 0; 87 ProcessedSize = 0; 88 } 89 90 HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize) 91 { 92 UInt32 sizeToRead = size; 93 if (size > 0) 94 { 95 RINOK(_thereAreBytesToReadEvent.Lock()); 96 sizeToRead = MyMin(_bufferSize, size); 97 if (_bufferSize > 0) 98 { 99 memcpy(data, _buffer, sizeToRead); 100 _buffer = ((const Byte *)_buffer) + sizeToRead; 101 _bufferSize -= sizeToRead; 102 if (_bufferSize == 0) 103 { 104 _thereAreBytesToReadEvent.Reset(); 105 _allBytesAreWritenEvent.Set(); 106 } 107 } 108 } 109 if (processedSize != NULL) 110 *processedSize = sizeToRead; 111 ProcessedSize += sizeToRead; 112 return S_OK; 113 } 114 115 void CStreamBinder::CloseRead() 116 { 117 _readStreamIsClosedEvent.Set(); 118 } 119 120 HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize) 121 { 122 if (size > 0) 123 { 124 _buffer = data; 125 _bufferSize = size; 126 _allBytesAreWritenEvent.Reset(); 127 _thereAreBytesToReadEvent.Set(); 128 129 HANDLE events[2]; 130 events[0] = _allBytesAreWritenEvent; 131 events[1] = _readStreamIsClosedEvent; 132 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); 133 if (waitResult != WAIT_OBJECT_0 + 0) 134 { 135 // ReadingWasClosed = true; 136 return S_FALSE; 137 } 138 // if(!_allBytesAreWritenEvent.Lock()) 139 // return E_FAIL; 140 } 141 if (processedSize != NULL) 142 *processedSize = size; 143 return S_OK; 144 } 145 146 void CStreamBinder::CloseWrite() 147 { 148 // _bufferSize must be = 0 149 _thereAreBytesToReadEvent.Set(); 150 } 151