Create Streaming AEAD Key Manager.

PiperOrigin-RevId: 270006223
diff --git a/python/cc/clif/cc_key_manager.clif b/python/cc/clif/cc_key_manager.clif
index 01fae03..8036715 100644
--- a/python/cc/clif/cc_key_manager.clif
+++ b/python/cc/clif/cc_key_manager.clif
@@ -17,6 +17,7 @@
 from "tink/cc/python/aead.h" import *  # Aead
 from "tink/cc/python/deterministic_aead.h" import *  # DeterministicAead
 from "tink/cc/python/hybrid_decrypt.h" import *  # HybridDecrypt
+from "tink/cc/python/streaming_aead.h" import *  # StreamingAead
 from "tink/cc/python/hybrid_encrypt.h" import *  # HybridEncrypt
 from "tink/cc/python/mac.h" import *  # Mac
 from "tink/cc/python/public_key_sign.h" import *  # PublicKeySign
@@ -24,6 +25,7 @@
 
 from tink.cc.python.aead import Aead
 from tink.cc.python.deterministic_aead import DeterministicAead
+from tink.cc.python.streaming_aead import StreamingAead
 from tink.cc.python.hybrid_decrypt import HybridDecrypt
 from tink.cc.python.hybrid_encrypt import HybridEncrypt
 from tink.cc.python.mac import Mac
@@ -54,6 +56,18 @@
       def `NewKeyData` as new_key_data(self, key_template:KeyTemplate)
         -> StatusOr<KeyData>
 
+    # Key Manager for Streaming AEAD.
+    class `CcKeyManager<StreamingAead>` as StreamingAeadKeyManager:
+      @classmethod
+      def `GetFromCcRegistry`as from_cc_registry(cls, type_url: str)
+      -> StatusOr<StreamingAeadKeyManager>
+
+      def `GetPrimitive` as primitive(self, key_data:KeyData)
+        -> StatusOr<StreamingAead>
+      def `KeyType` as key_type(self) -> str
+      def `NewKeyData` as new_key_data(self, key_template:KeyTemplate)
+        -> StatusOr<KeyData>
+
     # Key Manager for HybridDecrypt.
     class `CcKeyManager<HybridDecrypt>` as HybridDecryptKeyManager:
       @classmethod
diff --git a/python/streaming_aead/streaming_aead_key_manager.py b/python/streaming_aead/streaming_aead_key_manager.py
new file mode 100644
index 0000000..c40cdea
--- /dev/null
+++ b/python/streaming_aead/streaming_aead_key_manager.py
@@ -0,0 +1,54 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Python wrapper of the CLIF-wrapped C++ Streaming AEAD key manager."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import google_type_annotations
+from __future__ import print_function
+
+import typing
+from typing import Text, BinaryIO
+
+from tink.python.cc.clif import cc_key_manager
+from tink.python.core import key_manager
+from tink.python.core import tink_error
+from tink.python.streaming_aead import encrypting_stream
+from tink.python.streaming_aead import streaming_aead
+
+
+class _StreamingAeadCcToPyWrapper(streaming_aead.StreamingAead):
+  """Transforms cliffed C++ StreamingAead into a Python primitive."""
+
+  def __init__(self, cc_streaming_aead):
+    self._streaming_aead = cc_streaming_aead
+
+  @tink_error.use_tink_errors
+  def new_encrypting_stream(self, ciphertext_destination: BinaryIO,
+                            associated_data: bytes) -> BinaryIO:
+    stream = encrypting_stream.EncryptingStream(self._streaming_aead,
+                                                ciphertext_destination,
+                                                associated_data)
+    return typing.cast(BinaryIO, stream)
+
+  @tink_error.use_tink_errors
+  def new_decrypting_stream(self, ciphertext_source: BinaryIO,
+                            associated_data: bytes) -> BinaryIO:
+    # TODO(tink-dev) implement DecryptingStream
+    return typing.cast(BinaryIO, None)
+
+
+def from_cc_registry(
+    type_url: Text) -> key_manager.KeyManager[streaming_aead.StreamingAead]:
+  return key_manager.KeyManagerCcToPyWrapper(
+      cc_key_manager.StreamingAeadKeyManager.from_cc_registry(type_url),
+      streaming_aead.StreamingAead, _StreamingAeadCcToPyWrapper)
diff --git a/python/streaming_aead/streaming_aead_key_manager_test.py b/python/streaming_aead/streaming_aead_key_manager_test.py
new file mode 100644
index 0000000..a2a5121
--- /dev/null
+++ b/python/streaming_aead/streaming_aead_key_manager_test.py
@@ -0,0 +1,108 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests for tink.python.streaming_aead_key_manager."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from absl.testing import absltest
+from tink.proto import aes_ctr_hmac_streaming_pb2
+from tink.proto import aes_gcm_hkdf_streaming_pb2
+from tink.proto import common_pb2
+from tink.proto import tink_pb2
+from tink.python import tink_config
+from tink.python.core import tink_error
+from tink.python.streaming_aead import streaming_aead
+from tink.python.streaming_aead import streaming_aead_key_manager
+from tink.python.streaming_aead import streaming_aead_key_templates
+
+
+def setUpModule():
+  tink_config.register()
+
+
+class StreamingAeadKeyManagerTest(absltest.TestCase):
+
+  def setUp(self):
+    super(StreamingAeadKeyManagerTest, self).setUp()
+    self.key_manager_gcm = streaming_aead_key_manager.from_cc_registry(
+        'type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey')
+    self.key_manager_ctr = streaming_aead_key_manager.from_cc_registry(
+        'type.googleapis.com/google.crypto.tink.AesCtrHmacStreamingKey')
+
+  def test_primitive_class(self):
+    self.assertEqual(self.key_manager_gcm.primitive_class(),
+                     streaming_aead.StreamingAead)
+    self.assertEqual(self.key_manager_ctr.primitive_class(),
+                     streaming_aead.StreamingAead)
+
+  def test_key_type(self):
+    self.assertEqual(
+        self.key_manager_gcm.key_type(),
+        'type.googleapis.com/google.crypto.tink.AesGcmHkdfStreamingKey')
+    self.assertEqual(
+        self.key_manager_ctr.key_type(),
+        'type.googleapis.com/google.crypto.tink.AesCtrHmacStreamingKey')
+
+  def test_new_key_data(self):
+    # AES GCM HKDF
+    key_template = streaming_aead_key_templates.AES128_GCM_HKDF_4KB
+    key_data = self.key_manager_gcm.new_key_data(key_template)
+    self.assertEqual(key_data.type_url, self.key_manager_gcm.key_type())
+    self.assertEqual(key_data.key_material_type, tink_pb2.KeyData.SYMMETRIC)
+    key = aes_gcm_hkdf_streaming_pb2.AesGcmHkdfStreamingKey()
+    key.ParseFromString(key_data.value)
+    self.assertEqual(key.version, 0)
+    self.assertLen(key.key_value, 16)
+    self.assertEqual(key.params.hkdf_hash_type, common_pb2.HashType.SHA256)
+    self.assertEqual(key.params.derived_key_size, 16)
+    self.assertEqual(key.params.ciphertext_segment_size, 4096)
+
+    # AES CTR HMAC
+    key_template = streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_4KB
+    key_data = self.key_manager_ctr.new_key_data(key_template)
+    self.assertEqual(key_data.type_url, self.key_manager_ctr.key_type())
+    self.assertEqual(key_data.key_material_type, tink_pb2.KeyData.SYMMETRIC)
+    key = aes_ctr_hmac_streaming_pb2.AesCtrHmacStreamingKey()
+    key.ParseFromString(key_data.value)
+    self.assertEqual(key.version, 0)
+    self.assertLen(key.key_value, 16)
+    self.assertEqual(key.params.hkdf_hash_type, common_pb2.HashType.SHA256)
+    self.assertEqual(key.params.derived_key_size, 16)
+    self.assertEqual(key.params.hmac_params.hash, common_pb2.HashType.SHA256)
+    self.assertEqual(key.params.hmac_params.tag_size, 32)
+    self.assertEqual(key.params.ciphertext_segment_size, 4096)
+
+  def test_invalid_params_throw_exception(self):
+    # AES GCM HKDF
+    key_template = streaming_aead_key_templates.create_aes_gcm_hkdf_streaming_key_template(
+        63, common_pb2.HashType.SHA1, 65, 55)
+    with self.assertRaisesRegex(tink_error.TinkError,
+                                'key_size must not be smaller than'):
+      self.key_manager_gcm.new_key_data(key_template)
+
+    # AES CTR HKDF
+    key_template = streaming_aead_key_templates.create_aes_ctr_hmac_streaming_key_template(
+        63, common_pb2.HashType.SHA1, 65, common_pb2.HashType.SHA256, 55, 2)
+    with self.assertRaisesRegex(tink_error.TinkError,
+                                'key_size must not be smaller than'):
+      self.key_manager_ctr.new_key_data(key_template)
+
+  def test_encrypt_decrypt(self):
+    pass
+    # TODO(tanujdhir) Consider putting round-trip encryption test here once
+    # implemented
+
+
+if __name__ == '__main__':
+  absltest.main()