Home | History | Annotate | Download | only in paging
      1 /*
      2  * Copyright 2018 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package androidx.paging;
     18 
     19 import androidx.annotation.NonNull;
     20 import androidx.annotation.Nullable;
     21 import androidx.arch.core.executor.ArchTaskExecutor;
     22 
     23 import java.util.concurrent.Executor;
     24 
     25 import io.reactivex.BackpressureStrategy;
     26 import io.reactivex.Flowable;
     27 import io.reactivex.Observable;
     28 import io.reactivex.ObservableEmitter;
     29 import io.reactivex.ObservableOnSubscribe;
     30 import io.reactivex.Scheduler;
     31 import io.reactivex.functions.Cancellable;
     32 import io.reactivex.schedulers.Schedulers;
     33 
     34 /**
     35  * Builder for {@code Observable<PagedList>} or {@code Flowable<PagedList>}, given a
     36  * {@link DataSource.Factory} and a {@link PagedList.Config}.
     37  * <p>
     38  * The required parameters are in the constructor, so you can simply construct and build, or
     39  * optionally enable extra features (such as initial load key, or BoundaryCallback).
     40  * <p>
     41  * The returned observable/flowable will already be subscribed on the
     42  * {@link #setFetchScheduler(Scheduler)}, and will perform all loading on that scheduler. It will
     43  * already be observed on {@link #setNotifyScheduler(Scheduler)}, and will dispatch new PagedLists,
     44  * as well as their updates to that scheduler.
     45  *
     46  * @param  Type of input valued used to load data from the DataSource. Must be integer if
     47  *             you're using PositionalDataSource.
     48  * @param  Item type being presented.
     49  */
     50 public final class RxPagedListBuilder<Key, Value> {
     51     private Key mInitialLoadKey;
     52     private PagedList.Config mConfig;
     53     private DataSource.Factory<Key, Value> mDataSourceFactory;
     54     private PagedList.BoundaryCallback mBoundaryCallback;
     55     private Executor mNotifyExecutor;
     56     private Executor mFetchExecutor;
     57     private Scheduler mFetchScheduler;
     58     private Scheduler mNotifyScheduler;
     59 
     60     /**
     61      * Creates a RxPagedListBuilder with required parameters.
     62      *
     63      * @param dataSourceFactory DataSource factory providing DataSource generations.
     64      * @param config Paging configuration.
     65      */
     66     public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory,
     67             @NonNull PagedList.Config config) {
     68         //noinspection ConstantConditions
     69         if (config == null) {
     70             throw new IllegalArgumentException("PagedList.Config must be provided");
     71         }
     72         //noinspection ConstantConditions
     73         if (dataSourceFactory == null) {
     74             throw new IllegalArgumentException("DataSource.Factory must be provided");
     75         }
     76         mDataSourceFactory = dataSourceFactory;
     77         mConfig = config;
     78     }
     79 
     80     /**
     81      * Creates a RxPagedListBuilder with required parameters.
     82      * <p>
     83      * This method is a convenience for:
     84      * <pre>
     85      * RxPagedListBuilder(dataSourceFactory,
     86      *         new PagedList.Config.Builder().setPageSize(pageSize).build())
     87      * </pre>
     88      *
     89      * @param dataSourceFactory DataSource.Factory providing DataSource generations.
     90      * @param pageSize Size of pages to load.
     91      */
     92     @SuppressWarnings("unused")
     93     public RxPagedListBuilder(@NonNull DataSource.Factory<Key, Value> dataSourceFactory,
     94             int pageSize) {
     95         this(dataSourceFactory, new PagedList.Config.Builder().setPageSize(pageSize).build());
     96     }
     97 
     98     /**
     99      * First loading key passed to the first PagedList/DataSource.
    100      * <p>
    101      * When a new PagedList/DataSource pair is created after the first, it acquires a load key from
    102      * the previous generation so that data is loaded around the position already being observed.
    103      *
    104      * @param key Initial load key passed to the first PagedList/DataSource.
    105      * @return this
    106      */
    107     @SuppressWarnings("unused")
    108     @NonNull
    109     public RxPagedListBuilder<Key, Value> setInitialLoadKey(@Nullable Key key) {
    110         mInitialLoadKey = key;
    111         return this;
    112     }
    113 
    114     /**
    115      * Sets a {@link PagedList.BoundaryCallback} on each PagedList created, typically used to load
    116      * additional data from network when paging from local storage.
    117      * <p>
    118      * Pass a BoundaryCallback to listen to when the PagedList runs out of data to load. If this
    119      * method is not called, or {@code null} is passed, you will not be notified when each
    120      * DataSource runs out of data to provide to its PagedList.
    121      * <p>
    122      * If you are paging from a DataSource.Factory backed by local storage, you can set a
    123      * BoundaryCallback to know when there is no more information to page from local storage.
    124      * This is useful to page from the network when local storage is a cache of network data.
    125      * <p>
    126      * Note that when using a BoundaryCallback with a {@code Observable<PagedList>}, method calls
    127      * on the callback may be dispatched multiple times - one for each PagedList/DataSource
    128      * pair. If loading network data from a BoundaryCallback, you should prevent multiple
    129      * dispatches of the same method from triggering multiple simultaneous network loads.
    130      *
    131      * @param boundaryCallback The boundary callback for listening to PagedList load state.
    132      * @return this
    133      */
    134     @SuppressWarnings("unused")
    135     @NonNull
    136     public RxPagedListBuilder<Key, Value> setBoundaryCallback(
    137             @Nullable PagedList.BoundaryCallback<Value> boundaryCallback) {
    138         mBoundaryCallback = boundaryCallback;
    139         return this;
    140     }
    141 
    142     /**
    143      * Sets scheduler which will be used for observing new PagedLists, as well as loading updates
    144      * within the PagedLists.
    145      * <p>
    146      * The built observable will be {@link Observable#observeOn(Scheduler) observed on} this
    147      * scheduler, so that the thread receiving PagedLists will also receive the internal updates to
    148      * the PagedList.
    149      *
    150      * @param scheduler Scheduler for background DataSource loading.
    151      * @return this
    152      */
    153     public RxPagedListBuilder<Key, Value> setNotifyScheduler(
    154             final @NonNull Scheduler scheduler) {
    155         mNotifyScheduler = scheduler;
    156         final Scheduler.Worker worker = scheduler.createWorker();
    157         mNotifyExecutor = new Executor() {
    158             @Override
    159             public void execute(@NonNull Runnable command) {
    160                 // We use a worker here since the page load notifications
    161                 // should not be dispatched in parallel
    162                 worker.schedule(command);
    163             }
    164         };
    165         return this;
    166     }
    167 
    168     /**
    169      * Sets scheduler which will be used for background fetching of PagedLists, as well as on-demand
    170      * fetching of pages inside.
    171      *
    172      * @param scheduler Scheduler for background DataSource loading.
    173      * @return this
    174      */
    175     @SuppressWarnings({"unused", "WeakerAccess"})
    176     @NonNull
    177     public RxPagedListBuilder<Key, Value> setFetchScheduler(
    178             final @NonNull Scheduler scheduler) {
    179         mFetchExecutor = new Executor() {
    180             @Override
    181             public void execute(@NonNull Runnable command) {
    182                 // We use scheduleDirect since the page loads that use
    183                 // executor are intentionally parallel.
    184                 scheduler.scheduleDirect(command);
    185             }
    186         };
    187         mFetchScheduler = scheduler;
    188         return this;
    189     }
    190 
    191     /**
    192      * Constructs a {@code Observable<PagedList>}.
    193      * <p>
    194      * The returned Observable will already be observed on the
    195      * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the
    196      * {@link #setFetchScheduler(Scheduler) fetch scheduler}.
    197      *
    198      * @return The Observable of PagedLists
    199      */
    200     @NonNull
    201     public Observable<PagedList<Value>> buildObservable() {
    202         if (mNotifyExecutor == null) {
    203             mNotifyExecutor = ArchTaskExecutor.getMainThreadExecutor();
    204             mNotifyScheduler = Schedulers.from(mNotifyExecutor);
    205         }
    206         if (mFetchExecutor == null) {
    207             mFetchExecutor = ArchTaskExecutor.getIOThreadExecutor();
    208             mFetchScheduler = Schedulers.from(mFetchExecutor);
    209         }
    210         return Observable.create(new PagingObservableOnSubscribe<>(
    211                 mInitialLoadKey,
    212                 mConfig,
    213                 mBoundaryCallback,
    214                 mDataSourceFactory,
    215                 mNotifyExecutor,
    216                 mFetchExecutor))
    217                         .observeOn(mNotifyScheduler)
    218                         .subscribeOn(mFetchScheduler);
    219     }
    220 
    221     /**
    222      * Constructs a {@code Flowable<PagedList>}.
    223      *
    224      * The returned Observable will already be observed on the
    225      * {@link #setNotifyScheduler(Scheduler) notify scheduler}, and subscribed on the
    226      * {@link #setFetchScheduler(Scheduler) fetch scheduler}.
    227      *
    228      * @param backpressureStrategy BackpressureStrategy for the Flowable to use.
    229      * @return The Flowable of PagedLists
    230      */
    231     @NonNull
    232     public Flowable<PagedList<Value>> buildFlowable(BackpressureStrategy backpressureStrategy) {
    233         return buildObservable()
    234                 .toFlowable(backpressureStrategy);
    235     }
    236 
    237     static class PagingObservableOnSubscribe<Key, Value>
    238             implements ObservableOnSubscribe<PagedList<Value>>, DataSource.InvalidatedCallback,
    239             Cancellable,
    240             Runnable {
    241 
    242         @Nullable
    243         private final Key mInitialLoadKey;
    244         @NonNull
    245         private final PagedList.Config mConfig;
    246         @Nullable
    247         private final PagedList.BoundaryCallback mBoundaryCallback;
    248         @NonNull
    249         private final DataSource.Factory<Key, Value> mDataSourceFactory;
    250         @NonNull
    251         private final Executor mNotifyExecutor;
    252         @NonNull
    253         private final Executor mFetchExecutor;
    254 
    255         @Nullable
    256         private PagedList<Value> mList;
    257         @Nullable
    258         private DataSource<Key, Value> mDataSource;
    259 
    260         private ObservableEmitter<PagedList<Value>> mEmitter;
    261 
    262         private PagingObservableOnSubscribe(@Nullable Key initialLoadKey,
    263                 @NonNull PagedList.Config config,
    264                 @Nullable PagedList.BoundaryCallback boundaryCallback,
    265                 @NonNull DataSource.Factory<Key, Value> dataSourceFactory,
    266                 @NonNull Executor notifyExecutor,
    267                 @NonNull Executor fetchExecutor) {
    268             mInitialLoadKey = initialLoadKey;
    269             mConfig = config;
    270             mBoundaryCallback = boundaryCallback;
    271             mDataSourceFactory = dataSourceFactory;
    272             mNotifyExecutor = notifyExecutor;
    273             mFetchExecutor = fetchExecutor;
    274         }
    275 
    276         @Override
    277         public void subscribe(ObservableEmitter<PagedList<Value>> emitter)
    278                 throws Exception {
    279             mEmitter = emitter;
    280             mEmitter.setCancellable(this);
    281 
    282             // known that subscribe is already on fetchScheduler
    283             mEmitter.onNext(createPagedList());
    284         }
    285 
    286         @Override
    287         public void cancel() throws Exception {
    288             if (mDataSource != null) {
    289                 mDataSource.removeInvalidatedCallback(this);
    290             }
    291         }
    292 
    293         @Override
    294         public void run() {
    295             // fetch data, run on fetchExecutor
    296             mEmitter.onNext(createPagedList());
    297         }
    298 
    299         @Override
    300         public void onInvalidated() {
    301             if (!mEmitter.isDisposed()) {
    302                 mFetchExecutor.execute(this);
    303             }
    304         }
    305 
    306         private PagedList<Value> createPagedList() {
    307             @Nullable Key initializeKey = mInitialLoadKey;
    308             if (mList != null) {
    309                 //noinspection unchecked
    310                 initializeKey = (Key) mList.getLastKey();
    311             }
    312 
    313             do {
    314                 if (mDataSource != null) {
    315                     mDataSource.removeInvalidatedCallback(this);
    316                 }
    317                 mDataSource = mDataSourceFactory.create();
    318                 mDataSource.addInvalidatedCallback(this);
    319 
    320                 mList = new PagedList.Builder<>(mDataSource, mConfig)
    321                         .setNotifyExecutor(mNotifyExecutor)
    322                         .setFetchExecutor(mFetchExecutor)
    323                         .setBoundaryCallback(mBoundaryCallback)
    324                         .setInitialKey(initializeKey)
    325                         .build();
    326             } while (mList.isDetached());
    327             return mList;
    328         }
    329     }
    330 }
    331