| # Copyright 2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Cross-language tests for reading and writing encrypted keysets.""" |
| |
| from typing import Iterable, Tuple, Optional |
| |
| from absl.testing import absltest |
| from absl.testing import parameterized |
| import tink |
| from tink import aead |
| from tink.proto import tink_pb2 |
| from util import key_util |
| from util import testing_servers |
| from google.protobuf import json_format |
| from google.protobuf import text_format |
| |
| # Contains keys with different status and output_prefix_type |
| SYMMETRIC_KEYSET = r""" |
| primary_key_id: 2178398476 |
| key { |
| key_data { |
| type_url: "type.googleapis.com/google.crypto.tink.HmacKey" |
| value: "\032 \216\300\2643\375\353)\347?\034q\006\325~\322\377\365\364\202\205\320m\005\327Y\3213\213\217i>\034\022\004\020\020\010\003" |
| key_material_type: SYMMETRIC |
| } |
| status: ENABLED |
| key_id: 2178398476 |
| output_prefix_type: TINK |
| } |
| key { |
| key_data { |
| type_url: "type.googleapis.com/google.crypto.tink.HmacKey" |
| value: "\032@\212}\023kK\247.\300\030\377 \351\321\234}rFuJ\367\201\260b)0\271k\001v,\0346D\363mM\255\272\317\007\340M\225d\270[\210\262\362\352\3544&\037\005(\370\320\031\335}\311\374\n\022\004\020 \010\004" |
| key_material_type: SYMMETRIC |
| } |
| status: DISABLED |
| key_id: 1021124131 |
| output_prefix_type: LEGACY |
| } |
| key { |
| key_data { |
| type_url: "type.googleapis.com/google.crypto.tink.HmacKey" |
| value: "\032 \312\272\026\243]t\023\024\310\"\2331\361c\r\202\372\363o\260\335\274\2726#\365\034yU\365)\264\022\004\020\020\010\003" |
| key_material_type: SYMMETRIC |
| } |
| status: ENABLED |
| key_id: 1531888792 |
| output_prefix_type: CRUNCHY |
| } |
| key { |
| key_data { |
| type_url: "type.googleapis.com/google.crypto.tink.HmacKey" |
| value: "\032 \363q\0337,\254\303\215$\370yR\304`\206uf{V\243\271\367\254\351\034\020\247M\'\240+\320\022\004\020\020\010\003" |
| key_material_type: SYMMETRIC |
| } |
| status: DESTROYED |
| key_id: 3173753038 |
| output_prefix_type: RAW |
| } |
| """ |
| |
| PRIVATE_KEYSET = r""" |
| primary_key_id: 3858784341 |
| key { |
| key_data { |
| type_url: "type.googleapis.com/google.crypto.tink.EcdsaPrivateKey" |
| value: "\032 \021U\231BC\265\337\020$\351n\336di\245\245\371\004-\215k\214\262\344*\306\224\367\360I\317\330\022L\" e\356\202K\367I{\247T\314o\032\222\000\267\266\024\263u\234H\236<\374\340sDK<;6\242\032 c\264\n\200\340\317\001\351\352\372\305\345\371i\3625\200\305 \367\257\335\256\221\313\313\263\036!\270\305\020\022\006\030\002\020\002\010\003" |
| key_material_type: ASYMMETRIC_PRIVATE |
| } |
| status: ENABLED |
| key_id: 3858784341 |
| output_prefix_type: TINK |
| } |
| key { |
| key_data { |
| type_url: "type.googleapis.com/google.crypto.tink.EcdsaPrivateKey" |
| value: "\032 :]\010\201.YK\214\372\302P}\250\354Q\246\322(\216\213\345T\316Lp\013\037#\347\316Sr\022L\" M\014\237i\213\010\252\246\216\342\222\374\026\303\334\010u4\357\323\332\227\250\177\336\216|\217\264\3424\207\032 \013\367.n\035\323\274\337\350\252\233\214)\007\347!\327\313B\223\336jp\251\035\371\247h\014\272\357\317\022\006\030\002\020\002\010\003" |
| key_material_type: ASYMMETRIC_PRIVATE |
| } |
| status: ENABLED |
| key_id: 465053161 |
| output_prefix_type: RAW |
| } |
| """ |
| |
| PUBLIC_KEYSET = r""" |
| primary_key_id: 768876193 |
| key { |
| key_data { |
| type_url: "type.googleapis.com/google.crypto.tink.EcdsaPublicKey" |
| value: "\" \336\302\324\330=\026.|\\\224\314A\301Ka\241\324{\035\210Tp\222\306\263\317\236\307\032q\010\252\032 }\261\033x\347Gx\224&V\314hx\000\217Q\272G\361b\302\346Fb?r\334\223w\304y\325\022\006\030\002\020\002\010\003" |
| key_material_type: ASYMMETRIC_PUBLIC |
| } |
| status: ENABLED |
| key_id: 768876193 |
| output_prefix_type: TINK |
| } |
| """ |
| |
| TEST_KEYSETS = [ |
| ('symmetric', SYMMETRIC_KEYSET), |
| ('private', PRIVATE_KEYSET), |
| ('public', PUBLIC_KEYSET), |
| ] |
| |
| |
| def setUpModule(): |
| testing_servers.start('keyset_read_write') |
| |
| |
| def tearDownModule(): |
| testing_servers.stop() |
| |
| |
| def read_write_encrypted_test_cases( |
| ) -> Iterable[Tuple[str, bytes, str, str, str, str, Optional[bytes]]]: |
| """Yields (test_name, test_parameters...) tuples to test.""" |
| for keyset_name, keyset_text_proto in TEST_KEYSETS: |
| keyset_proto = text_format.Parse(keyset_text_proto, tink_pb2.Keyset()) |
| keyset = keyset_proto.SerializeToString() |
| for write_lang in testing_servers.LANGUAGES: |
| for read_lang in testing_servers.LANGUAGES: |
| for associated_data in [None, b'', b'associated_data']: |
| yield ('_bin_%s, r in %s, w in %s, ad=%s' % |
| (keyset_name, read_lang, write_lang, associated_data), keyset, |
| read_lang, 'KEYSET_READER_BINARY', write_lang, |
| 'KEYSET_WRITER_BINARY', associated_data) |
| yield ('_json_%s, r in %s, w in %s, ad=%s' % |
| (keyset_name, write_lang, read_lang, associated_data), keyset, |
| read_lang, 'KEYSET_READER_JSON', write_lang, |
| 'KEYSET_WRITER_JSON', associated_data) |
| |
| |
| class KeysetReadWriteTest(parameterized.TestCase): |
| |
| @parameterized.named_parameters(TEST_KEYSETS) |
| def test_to_from_json(self, keyset_text_proto): |
| keyset_proto = text_format.Parse(keyset_text_proto, tink_pb2.Keyset()) |
| keyset = keyset_proto.SerializeToString() |
| for to_lang in testing_servers.LANGUAGES: |
| json_keyset = testing_servers.keyset_to_json(to_lang, keyset) |
| for from_lang in testing_servers.LANGUAGES: |
| keyset_from_json = testing_servers.keyset_from_json( |
| from_lang, json_keyset) |
| key_util.assert_tink_proto_equal( |
| self, |
| tink_pb2.Keyset.FromString(keyset), |
| tink_pb2.Keyset.FromString(keyset_from_json), |
| msg=('keysets are not equal when converting to JSON in ' |
| '%s and back in %s' % (to_lang, from_lang))) |
| |
| @parameterized.named_parameters(read_write_encrypted_test_cases()) |
| def test_read_write_encrypted_keyset(self, keyset, read_lang, reader_type, |
| write_lang, writer_type, |
| associated_data): |
| # Use an arbitrary AEAD template that's supported in all languages, |
| # and use an arbitrary language to generate the keyset_encryption_keyset. |
| keyset_encryption_keyset = testing_servers.new_keyset( |
| 'cc', aead.aead_key_templates.AES128_GCM) |
| |
| encrypted_keyset = testing_servers.keyset_write_encrypted( |
| write_lang, keyset, keyset_encryption_keyset, associated_data, |
| writer_type) |
| decrypted_keyset = testing_servers.keyset_read_encrypted( |
| read_lang, encrypted_keyset, keyset_encryption_keyset, |
| associated_data, reader_type) |
| # Both keyset and decrypted_keyset are serialized tink_pb2.Keyset. |
| key_util.assert_tink_proto_equal( |
| self, tink_pb2.Keyset.FromString(keyset), |
| tink_pb2.Keyset.FromString(decrypted_keyset)) |
| |
| with self.assertRaises(tink.TinkError): |
| testing_servers.keyset_read_encrypted(read_lang, encrypted_keyset, |
| keyset_encryption_keyset, |
| b'invalid_associated_data', |
| reader_type) |
| |
| @parameterized.parameters(testing_servers.LANGUAGES) |
| def test_read_encrypted_ignores_keyset_info_binary(self, lang): |
| # Use an arbitrary AEAD template that's supported in all languages, |
| # and use an arbitrary language to generate the keyset_encryption_keyset. |
| keyset_encryption_keyset = testing_servers.new_keyset( |
| 'cc', aead.aead_key_templates.AES128_GCM) |
| # Also, generate an arbitrary keyset. |
| keyset = testing_servers.new_keyset('cc', |
| aead.aead_key_templates.AES128_GCM) |
| associated_data = b'associated_data' |
| |
| encrypted_keyset = testing_servers.keyset_write_encrypted( |
| lang, keyset, keyset_encryption_keyset, associated_data, |
| 'KEYSET_WRITER_BINARY') |
| |
| # encrypted_keyset is a serialized tink_pb2.EncryptedKeyset |
| parsed_encrypted_keyset = tink_pb2.EncryptedKeyset.FromString( |
| encrypted_keyset) |
| |
| # Note that some implementations (currently C++) do not set keyset_info. |
| # But we require that values are correct when they are set. |
| if parsed_encrypted_keyset.HasField('keyset_info'): |
| self.assertLen(parsed_encrypted_keyset.keyset_info.key_info, 1) |
| self.assertEqual(parsed_encrypted_keyset.keyset_info.primary_key_id, |
| parsed_encrypted_keyset.keyset_info.key_info[0].key_id) |
| |
| # keyset_info should be ignored when reading a keyset. |
| # to test this, we add something invalid and check that read still works. |
| parsed_encrypted_keyset.keyset_info.key_info.append( |
| tink_pb2.KeysetInfo.KeyInfo(type_url='invalid', key_id=123)) |
| modified_encrypted_keyset = parsed_encrypted_keyset.SerializeToString() |
| |
| decrypted_keyset = testing_servers.keyset_read_encrypted( |
| lang, modified_encrypted_keyset, keyset_encryption_keyset, |
| associated_data, 'KEYSET_READER_BINARY') |
| # Both keyset and decrypted_keyset are serialized tink_pb2.Keyset. |
| key_util.assert_tink_proto_equal( |
| self, tink_pb2.Keyset.FromString(keyset), |
| tink_pb2.Keyset.FromString(decrypted_keyset)) |
| |
| @parameterized.parameters(testing_servers.LANGUAGES) |
| def test_read_encrypted_ignores_keyset_info_json(self, lang): |
| # Use an arbitrary AEAD template that's supported in all languages, |
| # and use an arbitrary language to generate the keyset_encryption_keyset. |
| keyset_encryption_keyset = testing_servers.new_keyset( |
| 'cc', aead.aead_key_templates.AES128_GCM) |
| # Also, generate an arbitrary keyset. |
| keyset = testing_servers.new_keyset('cc', |
| aead.aead_key_templates.AES128_GCM) |
| associated_data = b'associated_data' |
| |
| encrypted_keyset = testing_servers.keyset_write_encrypted( |
| lang, keyset, keyset_encryption_keyset, associated_data, |
| 'KEYSET_WRITER_JSON') |
| |
| # encrypted_keyset is a JSON serialized tink_pb2.EncryptedKeyset |
| parsed_encrypted_keyset = json_format.Parse(encrypted_keyset, |
| tink_pb2.EncryptedKeyset()) |
| |
| # Note that some implementations (currently C++) do not set keyset_info. |
| # But we require that values are correct when they are set. |
| if parsed_encrypted_keyset.HasField('keyset_info'): |
| self.assertLen(parsed_encrypted_keyset.keyset_info.key_info, 1) |
| self.assertEqual(parsed_encrypted_keyset.keyset_info.primary_key_id, |
| parsed_encrypted_keyset.keyset_info.key_info[0].key_id) |
| |
| # keyset_info should be ignored when reading a keyset. |
| # To test this, we add something invalid and check that read still works. |
| # Some languages (C++ and Java) however do check that the fields of |
| # keyset_info are present. So we have to set all required fields here. |
| parsed_encrypted_keyset.keyset_info.key_info.append( |
| tink_pb2.KeysetInfo.KeyInfo( |
| type_url='invalid', |
| status=tink_pb2.ENABLED, |
| key_id=123, |
| output_prefix_type=tink_pb2.LEGACY)) |
| parsed_encrypted_keyset.keyset_info.primary_key_id = 123 |
| modified_encrypted_keyset = json_format.MessageToJson( |
| parsed_encrypted_keyset).encode('utf8') |
| |
| decrypted_keyset = testing_servers.keyset_read_encrypted( |
| lang, modified_encrypted_keyset, keyset_encryption_keyset, |
| associated_data, 'KEYSET_READER_JSON') |
| # Both keyset and decrypted_keyset are serialized tink_pb2.Keyset. |
| key_util.assert_tink_proto_equal( |
| self, tink_pb2.Keyset.FromString(keyset), |
| tink_pb2.Keyset.FromString(decrypted_keyset)) |
| |
| if __name__ == '__main__': |
| absltest.main() |