// blob: 7e3148b396dfb9436a1c1cf93c62d60fb89fbfc3 [file] [log] [blame]
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import 'package:http2/transport.dart';
import 'status.dart';
/// Common supertype of the messages handled in this file: [GrpcMetadata] and
/// [GrpcData].
abstract class GrpcMessage {}
/// A [GrpcMessage] that carries string key/value metadata entries.
class GrpcMetadata extends GrpcMessage {
  /// The metadata entries, keyed by name.
  final Map<String, String> metadata;

  /// Wraps the given [metadata] map; the map is stored as-is, not copied.
  GrpcMetadata(this.metadata);

  /// Describes this message, including the full metadata map.
  @override
  String toString() {
    return 'gRPC Metadata ($metadata)';
  }
}
/// A [GrpcMessage] that carries the payload bytes of a single message.
class GrpcData extends GrpcMessage {
  /// The payload bytes.
  final List<int> data;

  /// Whether [data] is flagged as compressed.
  final bool isCompressed;

  /// Wraps [data]; the list is stored as-is, not copied.
  ///
  /// [isCompressed] defaults to `false` so that callers that omit it get a
  /// usable boolean instead of `null`; a `null` flag makes boolean tests such
  /// as `if (value.isCompressed)` fail at runtime.
  GrpcData(this.data, {this.isCompressed = false});

  /// Describes this message by its payload size only.
  @override
  String toString() => 'gRPC Data (${data.length} bytes)';
}
/// Returns a transformer that clears the compressed flag on [GrpcData].
///
/// Every message is forwarded unchanged, except that a [GrpcData] whose
/// `isCompressed` flag is set is re-emitted as a new [GrpcData] holding the
/// same bytes with `isCompressed: false`. The bytes themselves are not
/// decompressed yet (see the TODO below).
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor() {
  void forward(GrpcMessage message, EventSink<GrpcMessage> output) {
    if (message is GrpcData && message.isCompressed) {
      // TODO(dart-lang/grpc-dart#6): Actually handle decompression.
      output.add(new GrpcData(message.data, isCompressed: false));
    } else {
      output.add(message);
    }
  }

  return new StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
      handleData: forward);
}
/// Converts [GrpcMessage]s into HTTP/2 [StreamMessage]s.
class GrpcHttpEncoder extends Converter<GrpcMessage, StreamMessage> {
  /// Encodes a single [input] message.
  ///
  /// A [GrpcMetadata] becomes a [HeadersStreamMessage] with one header per
  /// entry (names ASCII-encoded, values UTF-8-encoded). A [GrpcData] becomes
  /// a [DataStreamMessage] holding the payload wrapped by [frame].
  ///
  /// Throws a [GrpcError] for any other message type.
  @override
  StreamMessage convert(GrpcMessage input) {
    if (input is GrpcMetadata) {
      final encoded = <Header>[];
      for (final entry in input.metadata.entries) {
        encoded.add(
            new Header(ascii.encode(entry.key), utf8.encode(entry.value)));
      }
      return new HeadersStreamMessage(encoded);
    }
    if (input is GrpcData) {
      return new DataStreamMessage(frame(input.data));
    }
    throw new GrpcError.internal('Unexpected message type');
  }

  /// Returns [payload] prefixed with a 5-byte header.
  ///
  /// Byte 0 of the header is always written as 0; bytes 1-4 hold the payload
  /// length as an unsigned 32-bit integer in [ByteData]'s default (big-endian)
  /// byte order.
  static List<int> frame(List<int> payload) {
    final length = payload.length;
    final framed = new Uint8List(5 + length);
    new ByteData.view(framed.buffer, 0, 5)
      ..setUint8(0, 0) // TODO(dart-lang/grpc-dart#6): Handle compression
      ..setUint32(1, length);
    framed.setRange(5, framed.length, payload);
    return framed;
  }
}
/// Converts HTTP/2 [StreamMessage]s into [GrpcMessage]s.
class GrpcHttpDecoder extends Converter<StreamMessage, GrpcMessage> {
  /// Decodes a single [input] that must yield exactly one [GrpcMessage].
  ///
  /// Runs [input] through a one-shot chunked conversion; the collecting sink
  /// throws if the conversion produces zero or more than one message.
  @override
  GrpcMessage convert(StreamMessage input) {
    final collector = new _GrpcMessageSink();
    final conversion = startChunkedConversion(collector);
    conversion.add(input);
    conversion.close();
    return collector.message;
  }

  /// Returns a sink that decodes the stream messages added to it and forwards
  /// the resulting [GrpcMessage]s to [sink].
  @override
  Sink<StreamMessage> startChunkedConversion(Sink<GrpcMessage> sink) =>
      new _GrpcMessageConversionSink(sink);
}
/// Incrementally decodes HTTP/2 [StreamMessage]s into [GrpcMessage]s.
///
/// Header frames are forwarded as [GrpcMetadata]. Data frames are treated as
/// a byte stream of length-prefixed messages: a 5-byte header (one flag byte
/// followed by a 32-bit payload length) and then the payload. A message may
/// span several data frames and one data frame may hold several messages;
/// each completed message is emitted as a [GrpcData].
class _GrpcMessageConversionSink extends ChunkedConversionSink<StreamMessage> {
  /// Destination for decoded messages.
  final Sink<GrpcMessage> _out;

  /// Buffer for the 5-byte header of the message currently being read.
  final _dataHeader = new Uint8List(5);

  /// Payload buffer of the message being read, or null while the header is
  /// still being read.
  Uint8List _data;

  /// Number of bytes filled so far in [_dataHeader] (while [_data] is null)
  /// or in [_data] (otherwise).
  int _dataOffset = 0;

  _GrpcMessageConversionSink(this._out);

  /// Consumes all bytes of [chunk], emitting a [GrpcData] for every message
  /// that becomes complete.
  void _addData(DataStreamMessage chunk) {
    final chunkData = chunk.bytes;
    final chunkLength = chunkData.length;
    var chunkReadOffset = 0;
    while (chunkReadOffset < chunkLength) {
      if (_data == null) {
        // Reading header.
        final headerRemaining = _dataHeader.lengthInBytes - _dataOffset;
        final chunkRemaining = chunkLength - chunkReadOffset;
        final toCopy = min(headerRemaining, chunkRemaining);
        _dataHeader.setRange(
            _dataOffset, _dataOffset + toCopy, chunkData, chunkReadOffset);
        _dataOffset += toCopy;
        chunkReadOffset += toCopy;
        if (_dataOffset == _dataHeader.lengthInBytes) {
          // Header complete: bytes 1-4 hold the payload length.
          final dataLength = _dataHeader.buffer.asByteData().getUint32(1);
          // TODO(jakobr): Sanity check dataLength. Max size?
          _data = new Uint8List(dataLength);
          _dataOffset = 0;
        }
      }
      // Deliberately not an `else`: a header that completes at the very end
      // of the chunk with a zero-length payload must still be emitted in this
      // iteration, because the loop will not run again for this chunk.
      if (_data != null) {
        // Reading data.
        final dataRemaining = _data.lengthInBytes - _dataOffset;
        if (dataRemaining > 0) {
          final chunkRemaining = chunkLength - chunkReadOffset;
          final toCopy = min(dataRemaining, chunkRemaining);
          _data.setRange(
              _dataOffset, _dataOffset + toCopy, chunkData, chunkReadOffset);
          _dataOffset += toCopy;
          chunkReadOffset += toCopy;
        }
        if (_dataOffset == _data.lengthInBytes) {
          // Payload complete: byte 0 of the header is the compressed flag.
          _out.add(new GrpcData(_data,
              isCompressed: _dataHeader.buffer.asByteData().getUint8(0) != 0));
          _data = null;
          _dataOffset = 0;
        }
      }
    }
  }

  /// Emits the headers in [chunk] as a single [GrpcMetadata].
  ///
  /// Throws a [GrpcError] if a data message is only partially read.
  void _addHeaders(HeadersStreamMessage chunk) {
    if (_data != null || _dataOffset != 0) {
      // We were in the middle of receiving data, so receiving a header frame
      // is a violation of the gRPC protocol.
      throw new GrpcError.unimplemented('Received header while reading data');
    }
    final headers = <String, String>{};
    for (var header in chunk.headers) {
      // TODO(jakobr): Handle duplicate header names correctly.
      // Values are decoded as UTF-8 to mirror GrpcHttpEncoder, which writes
      // them with utf8.encode; pure-ASCII values decode exactly as before.
      headers[ascii.decode(header.name)] = utf8.decode(header.value);
    }
    // TODO(jakobr): Check :status, go to error mode if not 2xx.
    _out.add(new GrpcMetadata(headers));
  }

  /// Dispatches [chunk] to the data or header handler.
  ///
  /// Throws a [GrpcError] for any other kind of [StreamMessage].
  @override
  void add(StreamMessage chunk) {
    if (chunk is DataStreamMessage) {
      _addData(chunk);
    } else if (chunk is HeadersStreamMessage) {
      _addHeaders(chunk);
    } else {
      // No clue what this is.
      throw new GrpcError.unimplemented('Received unknown HTTP/2 frame type');
    }
  }

  /// Closes the output sink.
  ///
  /// Throws a [GrpcError] (without closing the output) if a message header or
  /// payload has only been partially received.
  @override
  void close() {
    if (_data != null || _dataOffset != 0) {
      throw new GrpcError.unavailable('Closed in non-idle state');
    }
    _out.close();
  }
}
/// A sink that expects exactly one [GrpcMessage] and keeps it in [message].
class _GrpcMessageSink extends Sink<GrpcMessage> {
  /// The message received so far, or null if none has been added.
  GrpcMessage message;

  /// Stores [data] as the single result; throws if one is already stored.
  @override
  void add(GrpcMessage data) {
    if (message == null) {
      message = data;
      return;
    }
    throw 'Too many messages received!';
  }

  /// Verifies that a message was received; throws otherwise.
  @override
  void close() {
    if (message != null) return;
    throw 'No messages received!';
  }
}