Create Streaming AEAD Key Manager.
PiperOrigin-RevId: 270006223
diff --git a/python/cc/clif/cc_key_manager.clif b/python/cc/clif/cc_key_manager.clif
index 01fae03..8036715 100644
--- a/python/cc/clif/cc_key_manager.clif
+++ b/python/cc/clif/cc_key_manager.clif
@@ -17,6 +17,7 @@
from "tink/cc/python/aead.h" import * # Aead
from "tink/cc/python/deterministic_aead.h" import * # DeterministicAead
from "tink/cc/python/hybrid_decrypt.h" import * # HybridDecrypt
+from "tink/cc/python/streaming_aead.h" import * # StreamingAead
from "tink/cc/python/hybrid_encrypt.h" import * # HybridEncrypt
from "tink/cc/python/mac.h" import * # Mac
from "tink/cc/python/public_key_sign.h" import * # PublicKeySign
@@ -24,6 +25,7 @@
from tink.cc.python.aead import Aead
from tink.cc.python.deterministic_aead import DeterministicAead
+from tink.cc.python.streaming_aead import StreamingAead
from tink.cc.python.hybrid_decrypt import HybridDecrypt
from tink.cc.python.hybrid_encrypt import HybridEncrypt
from tink.cc.python.mac import Mac
@@ -54,6 +56,18 @@
def `NewKeyData` as new_key_data(self, key_template:KeyTemplate)
-> StatusOr<KeyData>
+ # Key Manager for Streaming AEAD.
+ class `CcKeyManager<StreamingAead>` as StreamingAeadKeyManager:
+ @classmethod
+ def `GetFromCcRegistry`as from_cc_registry(cls, type_url: str)
+ -> StatusOr<StreamingAeadKeyManager>
+
+ def `GetPrimitive` as primitive(self, key_data:KeyData)
+ -> StatusOr<StreamingAead>
+ def `KeyType` as key_type(self) -> str
+ def `NewKeyData` as new_key_data(self, key_template:KeyTemplate)
+ -> StatusOr<KeyData>
+
# Key Manager for HybridDecrypt.
class `CcKeyManager<HybridDecrypt>` as HybridDecryptKeyManager:
@classmethod
diff --git a/python/streaming_aead/streaming_aead_key_manager.py b/python/streaming_aead/streaming_aead_key_manager.py
new file mode 100644
index 0000000..c40cdea
--- /dev/null
+++ b/python/streaming_aead/streaming_aead_key_manager.py
@@ -0,0 +1,54 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Python wrapper of the CLIF-wrapped C++ Streaming AEAD key manager."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import google_type_annotations
+from __future__ import print_function
+
+import typing
+from typing import Text, BinaryIO
+
+from tink.python.cc.clif import cc_key_manager
+from tink.python.core import key_manager
+from tink.python.core import tink_error
+from tink.python.streaming_aead import encrypting_stream
+from tink.python.streaming_aead import streaming_aead
+
+
+class _StreamingAeadCcToPyWrapper(streaming_aead.StreamingAead):
+ """Transforms cliffed C++ StreamingAead into a Python primitive."""
+
+ def __init__(self, cc_streaming_aead):
+ self._streaming_aead = cc_streaming_aead
+
+ @tink_error.use_tink_errors
+ def new_encrypting_stream(self, ciphertext_destination: BinaryIO,
+ associated_data: bytes) -> BinaryIO:
+ stream = encrypting_stream.EncryptingStream(self._streaming_aead,
+ ciphertext_destination,
+ associated_data)
+ return typing.cast(BinaryIO, stream)
+
+ @tink_error.use_tink_errors
+ def new_decrypting_stream(self, ciphertext_source: BinaryIO,
+ associated_data: bytes) -> BinaryIO:
+ # TODO(tink-dev) implement DecryptingStream
+ return typing.cast(BinaryIO, None)
+
+
+def from_cc_registry(
+ type_url: Text) -> key_manager.KeyManager[streaming_aead.StreamingAead]:
+ return key_manager.KeyManagerCcToPyWrapper(
+ cc_key_manager.StreamingAeadKeyManager.from_cc_registry(type_url),
+ streaming_aead.StreamingAead, _StreamingAeadCcToPyWrapper)
diff --git a/python/streaming_aead/streaming_aead_key_manager_test.py b/python/streaming_aead/streaming_aead_key_manager_test.py
new file mode 100644
index 0000000..a2a5121
--- /dev/null
+++ b/python/streaming_aead/streaming_aead_key_manager_test.py
@@ -0,0 +1,108 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests for tink.python.streaming_aead_key_manager."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from absl.testing import absltest
+from tink.proto import aes_ctr_hmac_streaming_pb2
+from tink.proto import aes_gcm_hkdf_streaming_pb2
+from tink.proto import common_pb2
+from tink.proto import tink_pb2
+from tink.python import tink_config
+from tink.python.core import tink_error
+from tink.python.streaming_aead import streaming_aead
+from tink.python.streaming_aead import streaming_aead_key_manager
+from tink.python.streaming_aead import streaming_aead_key_templates
+
+
+def setUpModule():
+ tink_config.register()
+
+
+class StreamingAeadKeyManagerTest(absltest.TestCase):
+
+ def setUp(self):
+ super(StreamingAeadKeyManagerTest, self).setUp()
+ self.key_manager_gcm = streaming_aead_key_manager.from_cc_registry(
+ 'type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey')
+ self.key_manager_ctr = streaming_aead_key_manager.from_cc_registry(
+ 'type.googleapis.com/google.crypto.tink.AesCtrHmacStreamingKey')
+
+ def test_primitive_class(self):
+ self.assertEqual(self.key_manager_gcm.primitive_class(),
+ streaming_aead.StreamingAead)
+ self.assertEqual(self.key_manager_ctr.primitive_class(),
+ streaming_aead.StreamingAead)
+
+ def test_key_type(self):
+ self.assertEqual(
+ self.key_manager_gcm.key_type(),
+ 'type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey')
+ self.assertEqual(
+ self.key_manager_ctr.key_type(),
+ 'type.googleapis.com/google.crypto.tink.AesCtrHmacStreamingKey')
+
+ def test_new_key_data(self):
+ # AES GCM HKDF
+ key_template = streaming_aead_key_templates.AES128_GCM_HKDF_4KB
+ key_data = self.key_manager_gcm.new_key_data(key_template)
+ self.assertEqual(key_data.type_url, self.key_manager_gcm.key_type())
+ self.assertEqual(key_data.key_material_type, tink_pb2.KeyData.SYMMETRIC)
+ key = aes_gcm_hkdf_streaming_pb2.AesGcmHkdfStreamingKey()
+ key.ParseFromString(key_data.value)
+ self.assertEqual(key.version, 0)
+ self.assertLen(key.key_value, 16)
+ self.assertEqual(key.params.hkdf_hash_type, common_pb2.HashType.SHA256)
+ self.assertEqual(key.params.derived_key_size, 16)
+ self.assertEqual(key.params.ciphertext_segment_size, 4096)
+
+ # AES CTR HMAC
+ key_template = streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_4KB
+ key_data = self.key_manager_ctr.new_key_data(key_template)
+ self.assertEqual(key_data.type_url, self.key_manager_ctr.key_type())
+ self.assertEqual(key_data.key_material_type, tink_pb2.KeyData.SYMMETRIC)
+ key = aes_ctr_hmac_streaming_pb2.AesCtrHmacStreamingKey()
+ key.ParseFromString(key_data.value)
+ self.assertEqual(key.version, 0)
+ self.assertLen(key.key_value, 16)
+ self.assertEqual(key.params.hkdf_hash_type, common_pb2.HashType.SHA256)
+ self.assertEqual(key.params.derived_key_size, 16)
+ self.assertEqual(key.params.hmac_params.hash, common_pb2.HashType.SHA256)
+ self.assertEqual(key.params.hmac_params.tag_size, 32)
+ self.assertEqual(key.params.ciphertext_segment_size, 4096)
+
+ def test_invalid_params_throw_exception(self):
+ # AES GCM HKDF
+ key_template = streaming_aead_key_templates.create_aes_gcm_hkdf_streaming_key_template(
+ 63, common_pb2.HashType.SHA1, 65, 55)
+ with self.assertRaisesRegex(tink_error.TinkError,
+ 'key_size must not be smaller than'):
+ self.key_manager_gcm.new_key_data(key_template)
+
+ # AES CTR HKDF
+ key_template = streaming_aead_key_templates.create_aes_ctr_hmac_streaming_key_template(
+ 63, common_pb2.HashType.SHA1, 65, common_pb2.HashType.SHA256, 55, 2)
+ with self.assertRaisesRegex(tink_error.TinkError,
+ 'key_size must not be smaller than'):
+ self.key_manager_ctr.new_key_data(key_template)
+
+ def test_encrypt_decrypt(self):
+ pass
+ # TODO(tanujdhir) Consider putting round-trip encryption test here once
+ # implemented
+
+
+if __name__ == '__main__':
+ absltest.main()