# blob: 47d42e1cb1b6284140f1d9ff942961be61dcb893 [file] [log] [blame]
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A file-like object that encrypts data written to it.
It writes the ciphertext to a given other file-like object, which can later be
decrypted and read using a DecryptingStream wrapper.
"""
import io
from typing import BinaryIO, Optional
from tink import core
from tink.cc.pybind import tink_bindings
from tink.streaming_aead import _file_object_adapter
@core.use_tink_errors
def _new_cc_encrypting_stream(cc_primitive, aad, destination):
  """Obtain a C++ encrypting stream from the pybind bindings.

  This lives in its own decorated function (rather than being inlined at the
  call site) so that the error transform applied by core.use_tink_errors covers
  the binding call.

  Args:
    cc_primitive: The C++ primitive forwarded to the binding as-is.
    aad: The associated data forwarded to the binding as-is.
    destination: The ciphertext destination forwarded to the binding as-is.

  Returns:
    Whatever tink_bindings.new_cc_encrypting_stream returns for the given
    arguments.
  """
  cc_stream = tink_bindings.new_cc_encrypting_stream(
      cc_primitive, aad, destination)
  return cc_stream
class RawEncryptingStream(io.RawIOBase):
  """A file-like object which wraps writes to an underlying file-like object.

  It encrypts any data written to it, and writes the ciphertext to the wrapped
  object.

  The close() method indicates that the message is complete, and will write a
  final ciphertext block to signify end of message.
  """

  def __init__(self, stream_aead: tink_bindings.StreamingAead,
               ciphertext_destination: BinaryIO, associated_data: bytes):
    """Create a new RawEncryptingStream.

    Args:
      stream_aead: C++ StreamingAead primitive from which a C++ EncryptingStream
        will be obtained.
      ciphertext_destination: A writable file-like object to which ciphertext
        bytes will be written.
      associated_data: The associated data to use for encryption. This must
        match the associated_data used for decryption.

    Raises:
      ValueError: if ciphertext_destination reports that it is not writable.
    """
    super().__init__()
    if not ciphertext_destination.writable():
      raise ValueError('ciphertext_destination must be writable')
    # The C++ stream is given an adapter around the Python file object rather
    # than the file object itself.
    cc_ciphertext_destination = _file_object_adapter.FileObjectAdapter(
        ciphertext_destination)
    self._cc_encrypting_stream = _new_cc_encrypting_stream(
        stream_aead, associated_data, cc_ciphertext_destination)

  @core.use_tink_errors
  def _write_to_cc_encrypting_stream(self, b: bytes) -> int:
    """Forward b, converted to bytes, to the C++ stream's write()."""
    return self._cc_encrypting_stream.write(bytes(b))

  @core.use_tink_errors
  def _close_cc_encrypting_stream(self) -> None:
    """Close the underlying C++ encrypting stream."""
    self._cc_encrypting_stream.close()

  def readinto(self, b: bytearray) -> Optional[int]:
    """Reading is not supported; always raises io.UnsupportedOperation."""
    raise io.UnsupportedOperation()

  def write(self, b: bytes) -> int:
    """Write the given buffer to the IO stream.

    Args:
      b: The buffer to write. Must be a bytes, bytearray or memoryview object.

    Returns:
      The number of bytes written, which may be less than the length of b in
      bytes.

    Raises:
      ValueError: if the stream is already closed.
      TypeError: if b is not a bytes, bytearray or memoryview object.
      TinkError: if there was a permanent error, or if the underlying stream
        reports a byte count that is negative or larger than the input.
    """
    if self.closed:  # pylint:disable=using-constant-test
      raise ValueError('write on closed file')
    if not isinstance(b, (bytes, memoryview, bytearray)):
      raise TypeError('a bytes-like object is required, not {}'.format(
          type(b).__name__))
    # Materialize the buffer once so that the sanity check below is made against
    # the number of bytes actually handed to the C++ stream. len() of a
    # memoryview counts items, not bytes, so for a view whose itemsize is
    # larger than 1 a fully successful write would otherwise be rejected.
    data = bytes(b)
    written = self._write_to_cc_encrypting_stream(data)
    if written < 0 or written > len(data):
      raise core.TinkError('Incorrect number of bytes written')
    return written

  def close(self) -> None:
    """Flush and close the stream. Has no effect on a closed stream."""
    if self.closed:  # pylint:disable=using-constant-test
      return
    self.flush()
    # Close the C++ stream before marking this object closed; if that raises,
    # super().close() is not reached and the stream is not marked as closed.
    self._close_cc_encrypting_stream()
    super().close()

  def writable(self) -> bool:
    """Return True if the stream supports writing."""
    return True