1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/basictypes.h" 6 #include "base/bind.h" 7 #include "base/memory/scoped_ptr.h" 8 #include "base/synchronization/waitable_event.h" 9 #include "base/threading/thread.h" 10 #include "chromecast/media/cma/ipc/media_memory_chunk.h" 11 #include "chromecast/media/cma/ipc/media_message.h" 12 #include "chromecast/media/cma/ipc/media_message_fifo.h" 13 #include "chromecast/media/cma/ipc/media_message_type.h" 14 #include "testing/gtest/include/gtest/gtest.h" 15 16 namespace chromecast { 17 namespace media { 18 19 namespace { 20 21 class FifoMemoryChunk : public MediaMemoryChunk { 22 public: 23 FifoMemoryChunk(void* mem, size_t size) 24 : mem_(mem), size_(size) {} 25 virtual ~FifoMemoryChunk() {} 26 27 virtual void* data() const OVERRIDE { return mem_; } 28 virtual size_t size() const OVERRIDE { return size_; } 29 virtual bool valid() const OVERRIDE { return true; } 30 31 private: 32 void* mem_; 33 size_t size_; 34 35 DISALLOW_COPY_AND_ASSIGN(FifoMemoryChunk); 36 }; 37 38 void MsgProducer(scoped_ptr<MediaMessageFifo> fifo, 39 int msg_count, 40 base::WaitableEvent* event) { 41 42 for (int k = 0; k < msg_count; k++) { 43 uint32 msg_type = 0x2 + (k % 5); 44 uint32 max_msg_content_size = k % 64; 45 do { 46 scoped_ptr<MediaMessage> msg1( 47 MediaMessage::CreateMessage( 48 msg_type, 49 base::Bind(&MediaMessageFifo::ReserveMemory, 50 base::Unretained(fifo.get())), 51 max_msg_content_size)); 52 if (msg1) 53 break; 54 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10)); 55 } while(true); 56 } 57 58 fifo.reset(); 59 60 event->Signal(); 61 } 62 63 void MsgConsumer(scoped_ptr<MediaMessageFifo> fifo, 64 int msg_count, 65 base::WaitableEvent* event) { 66 67 int k = 0; 68 while (k < msg_count) { 69 uint32 msg_type = 0x2 + (k % 5); 70 do { 71 scoped_ptr<MediaMessage> msg2(fifo->Pop()); 72 if (msg2) { 73 if (msg2->type() != PaddingMediaMsg) { 74 EXPECT_EQ(msg2->type(), msg_type); 75 k++; 76 } 77 break; 78 } 79 base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10)); 80 } while(true); 81 } 82 83 fifo.reset(); 84 85 event->Signal(); 86 } 87 88 void MsgProducerConsumer( 89 scoped_ptr<MediaMessageFifo> producer_fifo, 90 scoped_ptr<MediaMessageFifo> consumer_fifo, 91 base::WaitableEvent* event) { 92 for (int k = 0; k < 2048; k++) { 93 // Should have enough space to create a message. 94 uint32 msg_type = 0x2 + (k % 5); 95 uint32 max_msg_content_size = k % 64; 96 scoped_ptr<MediaMessage> msg1( 97 MediaMessage::CreateMessage( 98 msg_type, 99 base::Bind(&MediaMessageFifo::ReserveMemory, 100 base::Unretained(producer_fifo.get())), 101 max_msg_content_size)); 102 EXPECT_TRUE(msg1); 103 104 // Make sure the message is commited. 105 msg1.reset(); 106 107 // At this point, we should have a message to read. 108 scoped_ptr<MediaMessage> msg2(consumer_fifo->Pop()); 109 EXPECT_TRUE(msg2); 110 } 111 112 producer_fifo.reset(); 113 consumer_fifo.reset(); 114 115 event->Signal(); 116 } 117 118 } // namespace 119 120 TEST(MediaMessageFifoTest, AlternateWriteRead) { 121 size_t buffer_size = 64 * 1024; 122 scoped_ptr<uint64[]> buffer(new uint64[buffer_size / sizeof(uint64)]); 123 124 scoped_ptr<base::Thread> thread( 125 new base::Thread("FeederConsumerThread")); 126 thread->Start(); 127 128 scoped_ptr<MediaMessageFifo> producer_fifo(new MediaMessageFifo( 129 scoped_ptr<MediaMemoryChunk>( 130 new FifoMemoryChunk(&buffer[0], buffer_size)), 131 true)); 132 scoped_ptr<MediaMessageFifo> consumer_fifo(new MediaMessageFifo( 133 scoped_ptr<MediaMemoryChunk>( 134 new FifoMemoryChunk(&buffer[0], buffer_size)), 135 false)); 136 137 base::WaitableEvent event(false, false); 138 thread->message_loop_proxy()->PostTask( 139 FROM_HERE, 140 base::Bind(&MsgProducerConsumer, 141 base::Passed(&producer_fifo), 142 base::Passed(&consumer_fifo), 143 &event)); 144 event.Wait(); 145 146 thread.reset(); 147 } 148 149 TEST(MediaMessageFifoTest, MultiThreaded) { 150 size_t buffer_size = 64 * 1024; 151 scoped_ptr<uint64[]> buffer(new uint64[buffer_size / sizeof(uint64)]); 152 153 scoped_ptr<base::Thread> producer_thread( 154 new base::Thread("FeederThread")); 155 scoped_ptr<base::Thread> consumer_thread( 156 new base::Thread("ConsumerThread")); 157 producer_thread->Start(); 158 consumer_thread->Start(); 159 160 scoped_ptr<MediaMessageFifo> producer_fifo(new MediaMessageFifo( 161 scoped_ptr<MediaMemoryChunk>( 162 new FifoMemoryChunk(&buffer[0], buffer_size)), 163 true)); 164 scoped_ptr<MediaMessageFifo> consumer_fifo(new MediaMessageFifo( 165 scoped_ptr<MediaMemoryChunk>( 166 new FifoMemoryChunk(&buffer[0], buffer_size)), 167 false)); 168 169 base::WaitableEvent producer_event_done(false, false); 170 base::WaitableEvent consumer_event_done(false, false); 171 172 const int msg_count = 2048; 173 producer_thread->message_loop_proxy()->PostTask( 174 FROM_HERE, 175 base::Bind(&MsgProducer, 176 base::Passed(&producer_fifo), 177 msg_count, 178 &producer_event_done)); 179 consumer_thread->message_loop_proxy()->PostTask( 180 FROM_HERE, 181 base::Bind(&MsgConsumer, 182 base::Passed(&consumer_fifo), 183 msg_count, 184 &consumer_event_done)); 185 186 producer_event_done.Wait(); 187 consumer_event_done.Wait(); 188 189 producer_thread.reset(); 190 consumer_thread.reset(); 191 } 192 193 } // namespace media 194 } // namespace chromecast 195 196