Home | History | Annotate | Download | only in streams
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "config.h"
      6 #include "core/streams/ReadableStream.h"
      7 
      8 #include "bindings/core/v8/ExceptionState.h"
      9 #include "bindings/core/v8/ScriptFunction.h"
     10 #include "bindings/core/v8/ScriptPromiseResolver.h"
     11 #include "bindings/core/v8/V8Binding.h"
     12 #include "core/dom/DOMException.h"
     13 #include "core/dom/ExceptionCode.h"
     14 #include "core/dom/ExecutionContext.h"
     15 #include "core/streams/UnderlyingSource.h"
     16 
     17 namespace blink {
     18 
     19 ReadableStream::ReadableStream(ExecutionContext* executionContext, UnderlyingSource* source)
     20     : m_source(source)
     21     , m_isStarted(false)
     22     , m_isDraining(false)
     23     , m_isPulling(false)
     24     , m_isSchedulingPull(false)
     25     , m_state(Waiting)
     26     , m_wait(new WaitPromise(executionContext, this, WaitPromise::Ready))
     27     , m_closed(new ClosedPromise(executionContext, this, ClosedPromise::Closed))
     28 {
     29 }
     30 
     31 ReadableStream::~ReadableStream()
     32 {
     33 }
     34 
     35 String ReadableStream::stateString() const
     36 {
     37     switch (m_state) {
     38     case Readable:
     39         return "readable";
     40     case Waiting:
     41         return "waiting";
     42     case Closed:
     43         return "closed";
     44     case Errored:
     45         return "errored";
     46     }
     47     ASSERT(false);
     48     return String();
     49 }
     50 
     51 bool ReadableStream::enqueuePreliminaryCheck(size_t chunkSize)
     52 {
     53     if (m_state == Errored || m_state == Closed || m_isDraining)
     54         return false;
     55 
     56     // FIXME: Query strategy.
     57     return true;
     58 }
     59 
     60 bool ReadableStream::enqueuePostAction(size_t totalQueueSize)
     61 {
     62     m_isPulling = false;
     63 
     64     // FIXME: Set needsMore correctly.
     65     bool needsMore = true;
     66 
     67     if (m_state == Waiting) {
     68         m_state = Readable;
     69         m_wait->resolve(V8UndefinedType());
     70     }
     71 
     72     return needsMore;
     73 }
     74 
     75 void ReadableStream::close()
     76 {
     77     if (m_state == Waiting) {
     78         m_wait->resolve(V8UndefinedType());
     79         m_closed->resolve(V8UndefinedType());
     80         m_state = Closed;
     81     } else if (m_state == Readable) {
     82         m_isDraining = true;
     83     }
     84 }
     85 
     86 void ReadableStream::readPreliminaryCheck(ExceptionState& exceptionState)
     87 {
     88     if (m_state == Waiting) {
     89         exceptionState.throwTypeError("read is called while state is waiting");
     90         return;
     91     }
     92     if (m_state == Closed) {
     93         exceptionState.throwTypeError("read is called while state is closed");
     94         return;
     95     }
     96     if (m_state == Errored) {
     97         exceptionState.throwDOMException(m_exception->code(), m_exception->message());
     98         return;
     99     }
    100 }
    101 
    102 void ReadableStream::readPostAction()
    103 {
    104     ASSERT(m_state == Readable);
    105     if (isQueueEmpty()) {
    106         if (m_isDraining) {
    107             m_state = Closed;
    108             m_wait->reset();
    109             m_wait->resolve(V8UndefinedType());
    110             m_closed->resolve(V8UndefinedType());
    111         } else {
    112             m_state = Waiting;
    113             m_wait->reset();
    114             callOrSchedulePull();
    115         }
    116     }
    117 }
    118 
    119 ScriptPromise ReadableStream::wait(ScriptState* scriptState)
    120 {
    121     if (m_state == Waiting)
    122         callOrSchedulePull();
    123     return m_wait->promise(scriptState->world());
    124 }
    125 
    126 ScriptPromise ReadableStream::cancel(ScriptState* scriptState, ScriptValue reason)
    127 {
    128     if (m_state == Errored) {
    129         RefPtr<ScriptPromiseResolver> resolver = ScriptPromiseResolver::create(scriptState);
    130         ScriptPromise promise = resolver->promise();
    131         resolver->reject(m_exception);
    132         return promise;
    133     }
    134     if (m_state == Closed)
    135         return ScriptPromise::cast(scriptState, v8::Undefined(scriptState->isolate()));
    136 
    137     if (m_state == Waiting) {
    138         m_wait->resolve(V8UndefinedType());
    139     } else {
    140         ASSERT(m_state == Readable);
    141         m_wait->reset();
    142         m_wait->resolve(V8UndefinedType());
    143     }
    144 
    145     clearQueue();
    146     m_state = Closed;
    147     m_closed->resolve(V8UndefinedType());
    148     return m_source->cancelSource(scriptState, reason);
    149 }
    150 
    151 ScriptPromise ReadableStream::closed(ScriptState* scriptState)
    152 {
    153     return m_closed->promise(scriptState->world());
    154 }
    155 
    156 void ReadableStream::error(PassRefPtrWillBeRawPtr<DOMException> exception)
    157 {
    158     if (m_state == Readable) {
    159         clearQueue();
    160         m_wait->reset();
    161     }
    162 
    163     if (m_state == Waiting || m_state == Readable) {
    164         m_state = Errored;
    165         m_exception = exception;
    166         if (m_wait->state() == m_wait->Pending)
    167             m_wait->reject(m_exception);
    168         m_closed->reject(m_exception);
    169     }
    170 }
    171 
    172 void ReadableStream::didSourceStart()
    173 {
    174     m_isStarted = true;
    175     if (m_isSchedulingPull)
    176         m_source->pullSource();
    177 }
    178 
    179 void ReadableStream::callOrSchedulePull()
    180 {
    181     if (m_isPulling)
    182         return;
    183     m_isPulling = true;
    184     if (m_isStarted)
    185         m_source->pullSource();
    186     else
    187         m_isSchedulingPull = true;
    188 }
    189 
    190 void ReadableStream::trace(Visitor* visitor)
    191 {
    192     visitor->trace(m_source);
    193     visitor->trace(m_wait);
    194     visitor->trace(m_closed);
    195     visitor->trace(m_exception);
    196 }
    197 
    198 } // namespace blink
    199 
    200