1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "config.h" 6 #include "core/streams/ReadableStream.h" 7 8 #include "bindings/core/v8/ExceptionState.h" 9 #include "bindings/core/v8/ScriptFunction.h" 10 #include "bindings/core/v8/ScriptPromiseResolver.h" 11 #include "bindings/core/v8/V8Binding.h" 12 #include "core/dom/DOMException.h" 13 #include "core/dom/ExceptionCode.h" 14 #include "core/dom/ExecutionContext.h" 15 #include "core/streams/UnderlyingSource.h" 16 17 namespace blink { 18 19 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source) 20 : m_source(source) 21 , m_isStarted(false) 22 , m_isDraining(false) 23 , m_isPulling(false) 24 , m_isSchedulingPull(false) 25 , m_state(Waiting) 26 , m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready)) 27 , m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed)) 28 { 29 } 30 31 ReadableStream::~ReadableStream() 32 { 33 } 34 35 String ReadableStream::stateString() const 36 { 37 switch (m_state) { 38 case Readable: 39 return "readable"; 40 case Waiting: 41 return "waiting"; 42 case Closed: 43 return "closed"; 44 case Errored: 45 return "errored"; 46 } 47 ASSERT(false); 48 return String(); 49 } 50 51 bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize) 52 { 53 if (m_state == Errored || m_state == Closed || m_isDraining) 54 return false; 55 56 // FIXME: Query strategy. 57 return true; 58 } 59 60 bool ReadableStream::enqueuePostAction(size_t totalQueueSize) 61 { 62 m_isPulling = false; 63 64 // FIXME: Set needsMore correctly. 65 bool needsMore = true; 66 67 if (m_state == Waiting) { 68 m_state = Readable; 69 m_wait->resolve(V8UndefinedType()); 70 } 71 72 return needsMore; 73 } 74 75 void ReadableStream::close() 76 { 77 if (m_state == Waiting) { 78 m_wait->resolve(V8UndefinedType()); 79 m_closed->resolve(V8UndefinedType()); 80 m_state = Closed; 81 } else if (m_state == Readable) { 82 m_isDraining = true; 83 } 84 } 85 86 void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState) 87 { 88 if (m_state == Waiting) { 89 exceptionState.throwTypeError("read is called while state is waiting"); 90 return; 91 } 92 if (m_state == Closed) { 93 exceptionState.throwTypeError("read is called while state is closed"); 94 return; 95 } 96 if (m_state == Errored) { 97 exceptionState.throwDOMException(m_exception->code(), m_exception->message()); 98 return; 99 } 100 } 101 102 void ReadableStream::readPostAction() 103 { 104 ASSERT(m_state == Readable); 105 if (isQueueEmpty()) { 106 if (m_isDraining) { 107 m_state = Closed; 108 m_wait->reset(); 109 m_wait->resolve(V8UndefinedType()); 110 m_closed->resolve(V8UndefinedType()); 111 } else { 112 m_state = Waiting; 113 m_wait->reset(); 114 callOrSchedulePull(); 115 } 116 } 117 } 118 119 ScriptPromise ReadableStream::wait(ScriptState* scriptState) 120 { 121 if (m_state == Waiting) 122 callOrSchedulePull(); 123 return m_wait->promise(scriptState->world()); 124 } 125 126 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reason) 127 { 128 if (m_state == Errored) { 129 RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState); 130 ScriptPromise promise = resolver->promise(); 131 resolver->reject(m_exception); 132 return promise; 133 } 134 if (m_state == Closed) 135 return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate())); 136 137 if (m_state == Waiting) { 138 m_wait->resolve(V8UndefinedType()); 139 } else { 140 ASSERT(m_state == Readable); 141 m_wait->reset(); 142 m_wait->resolve(V8UndefinedType()); 143 } 144 145 clearQueue(); 146 m_state = Closed; 147 m_closed->resolve(V8UndefinedType()); 148 return m_source->cancelSource(scriptState, reason); 149 } 150 151 ScriptPromise ReadableStream::closed(ScriptState* scriptState) 152 { 153 return m_closed->promise(scriptState->world()); 154 } 155 156 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception) 157 { 158 if (m_state == Readable) { 159 clearQueue(); 160 m_wait->reset(); 161 } 162 163 if (m_state == Waiting || m_state == Readable) { 164 m_state = Errored; 165 m_exception = exception; 166 if (m_wait->state() == m_wait->Pending) 167 m_wait->reject(m_exception); 168 m_closed->reject(m_exception); 169 } 170 } 171 172 void ReadableStream::didSourceStart() 173 { 174 m_isStarted = true; 175 if (m_isSchedulingPull) 176 m_source->pullSource(); 177 } 178 179 void ReadableStream::callOrSchedulePull() 180 { 181 if (m_isPulling) 182 return; 183 m_isPulling = true; 184 if (m_isStarted) 185 m_source->pullSource(); 186 else 187 m_isSchedulingPull = true; 188 } 189 190 void ReadableStream::trace(Visitor* visitor) 191 { 192 visitor->trace(m_source); 193 visitor->trace(m_wait); 194 visitor->trace(m_closed); 195 visitor->trace(m_exception); 196 } 197 198 } // namespace blink 199 200