blob: e564f51ef58cacba428d00a04d293a34cf2c9b37 [file] [log] [blame]
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// A single page of paged results from a query.
///
/// Use `next` to move to the next page. If this is the last page `next`
/// completes with `null`
abstract class Page<T> {
/// The items in this page.
List<T> get items;
/// Whether this is the last page of results.
bool get isLast;
/// Move to the next page.
///
/// The future returned completes with the next page or results.
///
/// Throws if [next] is called on the last page.
Future<Page<T>> next({int pageSize});
}
typedef FirstPageProvider<T> = Future<Page<T>> Function(int pageSize);
/// Helper class to turn a series of pages into a stream.
class StreamFromPages<T> {
static const int _pageSize = 50;
final FirstPageProvider<T> _firstPageProvider;
bool _pendingRequest = false;
bool _paused = false;
bool _cancelled = false;
late Page<T> _currentPage;
late final StreamController<T> _controller;
StreamFromPages(this._firstPageProvider) {
_controller = StreamController<T>(
sync: true,
onListen: _onListen,
onPause: _onPause,
onResume: _onResume,
onCancel: _onCancel);
}
Stream<T> get stream => _controller.stream;
void _handleError(Object e, StackTrace s) {
_controller.addError(e, s);
_controller.close();
}
void _handlePage(Page<T> page) {
if (_cancelled) return;
_pendingRequest = false;
_currentPage = page;
page.items.forEach(_controller.add);
if (page.isLast) {
_controller.close();
} else if (!_paused && !_cancelled) {
page.next().then(_handlePage, onError: _handleError);
}
}
void _onListen() {
var pageSize = _pageSize;
_pendingRequest = true;
_firstPageProvider(pageSize).then(_handlePage, onError: _handleError);
}
void _onPause() {
_paused = true;
}
void _onResume() {
_paused = false;
if (_pendingRequest) return;
_pendingRequest = true;
_currentPage.next().then(_handlePage, onError: _handleError);
}
void _onCancel() {
_cancelled = true;
}
}