| // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:collection'; |
| import 'dart:convert'; |
| import 'dart:io'; |
| import 'dart:math' show min; |
| |
| import 'package:bazel_worker/driver.dart'; |
| import 'package:build/build.dart'; |
| import 'package:path/path.dart' as p; |
| import 'package:pedantic/pedantic.dart'; |
| |
| import 'scratch_space.dart'; |
| |
| final sdkDir = p.dirname(p.dirname(Platform.resolvedExecutable)); |
| |
| // If no terminal is attached, prevent a new one from launching. |
| final _processMode = stdin.hasTerminal |
| ? ProcessStartMode.normal |
| : ProcessStartMode.detachedWithStdio; |
| |
| /// Completes once the analyzer workers have been shut down. |
| Future<Null> get analyzerWorkersAreDone => |
| _analyzerWorkersAreDoneCompleter?.future ?? Future.value(null); |
| Completer<Null> _analyzerWorkersAreDoneCompleter; |
| |
| /// Completes once the dartdevc workers have been shut down. |
| Future<Null> get dartdevcWorkersAreDone => |
| _dartdevcWorkersAreDoneCompleter?.future ?? Future.value(null); |
| Completer<Null> _dartdevcWorkersAreDoneCompleter; |
| |
| /// Completes once the dartdevk workers have been shut down. |
| Future<Null> get dartdevkWorkersAreDone => |
| _dartdevkWorkersAreDoneCompleter?.future ?? Future.value(null); |
| Completer<Null> _dartdevkWorkersAreDoneCompleter; |
| |
| /// Completes once the dart2js workers have been shut down. |
| Future<Null> get dart2jsWorkersAreDone => |
| _dart2jsWorkersAreDoneCompleter?.future ?? Future.value(null); |
| Completer<Null> _dart2jsWorkersAreDoneCompleter; |
| |
| /// Completes once the common frontend workers have been shut down. |
| Future<Null> get frontendWorkersAreDone => |
| _frontendWorkersAreDoneCompleter?.future ?? Future.value(null); |
| Completer<Null> _frontendWorkersAreDoneCompleter; |
| |
| final int _defaultMaxWorkers = min((Platform.numberOfProcessors / 2).ceil(), 4); |
| |
| const _maxWorkersEnvVar = 'BUILD_MAX_WORKERS_PER_TASK'; |
| |
| final int _maxWorkersPerTask = () { |
| var toParse = |
| Platform.environment[_maxWorkersEnvVar] ?? '$_defaultMaxWorkers'; |
| var parsed = int.tryParse(toParse); |
| if (parsed == null) { |
| log.warning('Invalid value for $_maxWorkersEnvVar environment variable, ' |
| 'expected an int but got `$toParse`. Falling back to default value ' |
| 'of $_defaultMaxWorkers.'); |
| return _defaultMaxWorkers; |
| } |
| return parsed; |
| }(); |
| |
| /// Manages a shared set of persistent analyzer workers. |
| BazelWorkerDriver get _analyzerDriver { |
| _analyzerWorkersAreDoneCompleter ??= Completer<Null>(); |
| return __analyzerDriver ??= BazelWorkerDriver( |
| () => Process.start( |
| p.join(sdkDir, 'bin', 'dart'), |
| [ |
| p.join(sdkDir, 'bin', 'snapshots', 'dartanalyzer.dart.snapshot'), |
| '--dart-sdk=$sdkDir', |
| '--build-mode', |
| '--persistent_worker' |
| ], |
| mode: _processMode, |
| workingDirectory: scratchSpace.tempDir.path), |
| maxWorkers: _maxWorkersPerTask); |
| } |
| |
| BazelWorkerDriver __analyzerDriver; |
| |
| /// Resource for fetching the current [BazelWorkerDriver] for dartanalyzer. |
| final analyzerDriverResource = |
| Resource<BazelWorkerDriver>(() => _analyzerDriver, beforeExit: () async { |
| await _analyzerDriver?.terminateWorkers(); |
| _analyzerWorkersAreDoneCompleter.complete(); |
| _analyzerWorkersAreDoneCompleter = null; |
| __analyzerDriver = null; |
| }); |
| |
| /// Manages a shared set of persistent dartdevc workers. |
| BazelWorkerDriver get _dartdevcDriver { |
| _dartdevcWorkersAreDoneCompleter ??= Completer<Null>(); |
| return __dartdevcDriver ??= BazelWorkerDriver( |
| () => Process.start( |
| p.join(sdkDir, 'bin', 'dart'), |
| [ |
| p.join(sdkDir, 'bin', 'snapshots', 'dartdevc.dart.snapshot'), |
| '--dart-sdk=$sdkDir', |
| '--persistent_worker' |
| ], |
| mode: _processMode, |
| workingDirectory: scratchSpace.tempDir.path), |
| maxWorkers: _maxWorkersPerTask); |
| } |
| |
| BazelWorkerDriver __dartdevcDriver; |
| |
| /// Resource for fetching the current [BazelWorkerDriver] for dartdevc. |
| final dartdevcDriverResource = |
| Resource<BazelWorkerDriver>(() => _dartdevcDriver, beforeExit: () async { |
| await _dartdevcDriver?.terminateWorkers(); |
| _dartdevcWorkersAreDoneCompleter.complete(); |
| _dartdevcWorkersAreDoneCompleter = null; |
| __dartdevcDriver = null; |
| }); |
| |
| /// Manages a shared set of persistent dartdevk workers. |
| BazelWorkerDriver get _dartdevkDriver { |
| _dartdevkWorkersAreDoneCompleter ??= Completer<Null>(); |
| return __dartdevkDriver ??= BazelWorkerDriver( |
| () => Process.start( |
| p.join(sdkDir, 'bin', 'dart'), |
| [ |
| p.join(sdkDir, 'bin', 'snapshots', 'dartdevc.dart.snapshot'), |
| '--kernel', |
| '--persistent_worker' |
| ], |
| mode: _processMode, |
| workingDirectory: scratchSpace.tempDir.path), |
| maxWorkers: _maxWorkersPerTask); |
| } |
| |
| BazelWorkerDriver __dartdevkDriver; |
| |
| /// Resource for fetching the current [BazelWorkerDriver] for dartdevk. |
| final dartdevkDriverResource = |
| Resource<BazelWorkerDriver>(() => _dartdevkDriver, beforeExit: () async { |
| await _dartdevkDriver?.terminateWorkers(); |
| _dartdevkWorkersAreDoneCompleter.complete(); |
| _dartdevkWorkersAreDoneCompleter = null; |
| __dartdevkDriver = null; |
| }); |
| |
| /// Manages a shared set of persistent common frontend workers. |
| BazelWorkerDriver get _frontendDriver { |
| _frontendWorkersAreDoneCompleter ??= Completer<Null>(); |
| return __frontendDriver ??= BazelWorkerDriver( |
| () => Process.start( |
| p.join(sdkDir, 'bin', 'dart'), |
| [ |
| p.join(sdkDir, 'bin', 'snapshots', 'kernel_worker.dart.snapshot'), |
| '--persistent_worker' |
| ], |
| mode: _processMode, |
| workingDirectory: scratchSpace.tempDir.path), |
| maxWorkers: _maxWorkersPerTask); |
| } |
| |
| BazelWorkerDriver __frontendDriver; |
| |
| /// Resource for fetching the current [BazelWorkerDriver] for common frontend. |
| final frontendDriverResource = |
| Resource<BazelWorkerDriver>(() => _frontendDriver, beforeExit: () async { |
| await _frontendDriver?.terminateWorkers(); |
| _frontendWorkersAreDoneCompleter.complete(); |
| _frontendWorkersAreDoneCompleter = null; |
| __frontendDriver = null; |
| }); |
| |
| /// Manages a shared set of persistent dart2js workers. |
| Dart2JsBatchWorkerPool get _dart2jsWorkerPool { |
| _dart2jsWorkersAreDoneCompleter ??= Completer<Null>(); |
| var librariesSpec = p.joinAll([sdkDir, 'lib', 'libraries.json']); |
| return __dart2jsWorkerPool ??= Dart2JsBatchWorkerPool(() => Process.start( |
| p.join(sdkDir, 'bin', 'dart'), |
| [ |
| p.join(sdkDir, 'bin', 'snapshots', 'dart2js.dart.snapshot'), |
| '--libraries-spec=$librariesSpec', |
| '--batch', |
| ], |
| mode: _processMode, |
| workingDirectory: scratchSpace.tempDir.path)); |
| } |
| |
| Dart2JsBatchWorkerPool __dart2jsWorkerPool; |
| |
| /// Resource for fetching the current [Dart2JsBatchWorkerPool] for dart2js. |
| final dart2JsWorkerResource = Resource<Dart2JsBatchWorkerPool>( |
| () => _dart2jsWorkerPool, beforeExit: () async { |
| await _dart2jsWorkerPool.terminateWorkers(); |
| _dart2jsWorkersAreDoneCompleter.complete(); |
| _dart2jsWorkersAreDoneCompleter = null; |
| }); |
| |
| /// Manages a pool of persistent [_Dart2JsWorker]s running in batch mode and |
| /// schedules jobs among them. |
| class Dart2JsBatchWorkerPool { |
| final Future<Process> Function() _spawnWorker; |
| |
| final _workQueue = Queue<_Dart2JsJob>(); |
| |
| bool _queueIsActive = false; |
| |
| final _availableWorkers = Queue<_Dart2JsWorker>(); |
| |
| final _allWorkers = <_Dart2JsWorker>[]; |
| |
| Dart2JsBatchWorkerPool(this._spawnWorker); |
| |
| Future<Dart2JsResult> compile(List<String> args) async { |
| var job = _Dart2JsJob(args); |
| _workQueue.add(job); |
| if (!_queueIsActive) _startWorkQueue(); |
| return job.result; |
| } |
| |
| void _startWorkQueue() { |
| assert(!_queueIsActive); |
| _queueIsActive = true; |
| () async { |
| while (_workQueue.isNotEmpty) { |
| _Dart2JsWorker worker; |
| if (_availableWorkers.isEmpty && |
| _allWorkers.length < _maxWorkersPerTask) { |
| worker = _Dart2JsWorker(_spawnWorker); |
| _allWorkers.add(worker); |
| } |
| |
| _Dart2JsWorker nextWorker() => _availableWorkers.isNotEmpty |
| ? _availableWorkers.removeFirst() |
| : null; |
| |
| worker ??= nextWorker(); |
| while (worker == null) { |
| // TODO: something smarter here? in practice this seems to work |
| // reasonably well though and simplifies things a lot ¯\_(ツ)_/¯. |
| await Future.delayed(Duration(seconds: 1)); |
| worker = nextWorker(); |
| } |
| unawaited(worker |
| .doJob(_workQueue.removeFirst()) |
| .whenComplete(() => _availableWorkers.add(worker))); |
| } |
| _queueIsActive = false; |
| }(); |
| } |
| |
| Future<Null> terminateWorkers() async { |
| var allWorkers = _allWorkers.toList(); |
| _allWorkers.clear(); |
| _availableWorkers.clear(); |
| await Future.wait(allWorkers.map((w) => w.terminate())); |
| } |
| } |
| |
| /// A single dart2js worker process running in batch mode. |
| /// |
| /// This may actually spawn multiple processes over time, if a running worker |
| /// dies or it decides that it should be restarted for some reason. |
| class _Dart2JsWorker { |
| final Future<Process> Function() _spawnWorker; |
| |
| int _jobsSinceLastRestartCount = 0; |
| static const int _jobsBeforeRestartMax = 5; |
| static const int _retryCountMax = 2; |
| |
| Stream<String> __workerStderrLines; |
| Stream<String> get _workerStderrLines { |
| assert(__worker != null); |
| return __workerStderrLines ??= __worker.stderr |
| .transform(utf8.decoder) |
| .transform(const LineSplitter()) |
| .asBroadcastStream(); |
| } |
| |
| Stream<String> __workerStdoutLines; |
| Stream<String> get _workerStdoutLines { |
| assert(__worker != null); |
| return __workerStdoutLines ??= __worker.stdout |
| .transform(utf8.decoder) |
| .transform(const LineSplitter()) |
| .asBroadcastStream(); |
| } |
| |
| Process __worker; |
| Future<Process> _spawningWorker; |
| Future<Process> get _worker { |
| if (__worker != null) return Future.value(__worker); |
| return _spawningWorker ??= () async { |
| if (__worker == null) { |
| _jobsSinceLastRestartCount = 0; |
| __worker ??= await _spawnWorker(); |
| _spawningWorker = null; |
| // exitCode can be null: https://github.com/dart-lang/sdk/issues/35874 |
| unawaited(__worker.exitCode?.then((_) { |
| __worker = null; |
| __workerStdoutLines = null; |
| __workerStderrLines = null; |
| _currentJobResult |
| ?.completeError('Dart2js exited with an unknown error'); |
| })); |
| } |
| return __worker; |
| }(); |
| } |
| |
| Completer<Dart2JsResult> _currentJobResult; |
| |
| _Dart2JsWorker(this._spawnWorker); |
| |
| /// Performs [job], gracefully handling worker failures by retrying |
| /// [_retryCountMax] times and restarting the worker between jobs based on |
| /// [_jobsBeforeRestartMax] to limit memory consumption. |
| /// |
| /// Only one job may be performed at a time. |
| Future<void> doJob(_Dart2JsJob job) async { |
| assert(_currentJobResult == null); |
| var tryCount = 0; |
| var succeeded = false; |
| while (tryCount < _retryCountMax && !succeeded) { |
| tryCount++; |
| _jobsSinceLastRestartCount++; |
| var worker = await _worker; |
| var output = StringBuffer(); |
| _currentJobResult = Completer<Dart2JsResult>(); |
| var sawError = false; |
| var stderrListener = _workerStderrLines.listen((line) { |
| if (line == '>>> EOF STDERR') { |
| _currentJobResult?.complete( |
| Dart2JsResult(!sawError, 'Dart2Js finished with:\n\n$output')); |
| } |
| if (!line.startsWith('>>> ')) { |
| output.writeln(line); |
| } |
| }); |
| var stdoutListener = _workerStdoutLines.listen((line) { |
| if (line.contains('>>> TEST FAIL')) { |
| sawError = true; |
| } |
| if (!line.startsWith('>>> ')) { |
| output.writeln(line); |
| } |
| }); |
| |
| log.info('Running dart2js with ${job.args.join(' ')}\n'); |
| worker.stdin.writeln(job.args.join(' ')); |
| |
| Dart2JsResult result; |
| try { |
| result = await _currentJobResult.future; |
| job.resultCompleter.complete(result); |
| succeeded = true; |
| } catch (e) { |
| log.warning('Dart2Js failure: $e'); |
| succeeded = false; |
| if (tryCount >= _retryCountMax) { |
| job.resultCompleter.complete(_currentJobResult.future); |
| } |
| } finally { |
| _currentJobResult = null; |
| // TODO: Remove this hack once dart-lang/sdk#33708 is resolved. |
| if (_jobsSinceLastRestartCount >= _jobsBeforeRestartMax) { |
| await terminate(); |
| } |
| await stderrListener.cancel(); |
| await stdoutListener.cancel(); |
| } |
| } |
| } |
| |
| Future<void> terminate() async { |
| var worker = __worker ?? await _spawningWorker; |
| __worker = null; |
| __workerStdoutLines = null; |
| __workerStderrLines = null; |
| if (worker != null) { |
| worker.kill(); |
| await worker.stdin.close(); |
| } |
| await worker?.exitCode; |
| } |
| } |
| |
| /// A single dart2js job, consisting of [args] and a [result]. |
| class _Dart2JsJob { |
| final List<String> args; |
| |
| final resultCompleter = Completer<Dart2JsResult>(); |
| Future<Dart2JsResult> get result => resultCompleter.future; |
| |
| _Dart2JsJob(this.args); |
| } |
| |
| /// The result of a [_Dart2JsJob] |
| class Dart2JsResult { |
| final bool succeeded; |
| final String output; |
| |
| Dart2JsResult(this.succeeded, this.output); |
| } |