| # Copyright 2021 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Tests for tink.python.tink.jwt._jwt_format.""" |
| |
| from absl.testing import absltest |
| from absl.testing import parameterized |
| from tink.jwt import _jwt_error |
| from tink.jwt import _jwt_format |
| |
| |
class JwtFormatTest(parameterized.TestCase):
  """Tests for the coding, parsing and validation helpers in `_jwt_format`.

  The cases exercise base64 coding, JSON loading, header and payload coding,
  header validation, signature coding, and compact-token assembly/splitting.
  """

  # Payload example from https://tools.ietf.org/html/rfc7519#section-3.1,
  # shared by several tests below.
  _RFC7519_ENCODED_PAYLOAD = (
      b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0'
      b'dHA6Ly9leGFtcGxlLmNvbS9pc19yb290Ijp0cnVlfQ')

  def _assert_rfc7519_payload_round_trip(self):
    """Decodes the RFC 7519 payload, checks its claims, and re-encodes it."""
    json_payload = _jwt_format.decode_payload(self._RFC7519_ENCODED_PAYLOAD)
    payload = _jwt_format.json_loads(json_payload)
    self.assertEqual(payload['iss'], 'joe')
    self.assertEqual(payload['exp'], 1300819380)
    self.assertEqual(payload['http://example.com/is_root'], True)
    # Encoding and decoding again must give back the exact same JSON text.
    self.assertEqual(
        _jwt_format.decode_payload(_jwt_format.encode_payload(json_payload)),
        json_payload)

  def test_base64_encode_decode_header_fixed_data(self):
    """Checks base64 encode/decode against the RFC 7519 example header."""
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    header = bytes([
        123, 34, 116, 121, 112, 34, 58, 34, 74, 87, 84, 34, 44, 13, 10, 32, 34,
        97, 108, 103, 34, 58, 34, 72, 83, 50, 53, 54, 34, 125
    ])
    encoded_header = b'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9'
    self.assertEqual(_jwt_format._base64_encode(header), encoded_header)
    self.assertEqual(_jwt_format._base64_decode(encoded_header), header)

  def test_base64_encode_decode_payload_fixed_data(self):
    """Checks base64 encode/decode against the RFC 7519 example payload."""
    # Example from https://tools.ietf.org/html/rfc7519#section-3.1
    payload = bytes([
        123, 34, 105, 115, 115, 34, 58, 34, 106, 111, 101, 34, 44, 13, 10, 32,
        34, 101, 120, 112, 34, 58, 49, 51, 48, 48, 56, 49, 57, 51, 56, 48, 44,
        13, 10, 32, 34, 104, 116, 116, 112, 58, 47, 47, 101, 120, 97, 109, 112,
        108, 101, 46, 99, 111, 109, 47, 105, 115, 95, 114, 111, 111, 116, 34,
        58, 116, 114, 117, 101, 125
    ])
    encoded_payload = self._RFC7519_ENCODED_PAYLOAD
    self.assertEqual(_jwt_format._base64_encode(payload), encoded_payload)
    self.assertEqual(_jwt_format._base64_decode(encoded_payload), payload)

  def test_base64_decode_bad_format_raises_jwt_invalid_error(self):
    """Checks that a five-character input is rejected by the decoder."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format._base64_decode(b'aeyJh')

  def test_base64_decode_fails_with_unknown_chars(self):
    """Checks the accepted alphabet and rejection of characters outside it."""
    # Every letter, digit, '_' and '-' must decode to something.
    self.assertNotEmpty(
        _jwt_format._base64_decode(
            b'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-')
    )
    self.assertEqual(_jwt_format._base64_decode(b''), b'')
    # These are the ASCII neighbours of the valid ranges: '@' precedes 'A',
    # '[' follows 'Z', '/' precedes '0', ':' follows '9', '{' follows 'z'.
    for bad_char in (b'[', b'@', b'/', b':', b'{'):
      with self.subTest(bad_char=bad_char):
        with self.assertRaises(_jwt_error.JwtInvalidError):
          _jwt_format._base64_decode(bad_char)

  def test_json_loads_recursion(self):
    """Checks that 1000 levels of object nesting raise JwtInvalidError."""
    num_recursions = 1000
    recursive_json = ('{"a":' * num_recursions) + '""' + ('}' * num_recursions)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.json_loads(recursive_json)

  def test_json_loads_with_invalid_utf16(self):
    """Checks that a lone surrogate escape is rejected wherever it appears."""
    # Each document is well-formed JSON; the unpaired high surrogate \uD834
    # is the only fault, placed in a nested value, a key, and a nested list.
    invalid_documents = (
        '{"a":{"b":{"c":"\\uD834"}}}',
        '{"\\uD834":"b"}',
        '{"a":["a",{"b":["c","\\uD834"]}]}',
    )
    for document in invalid_documents:
      with self.subTest(document=document):
        with self.assertRaises(_jwt_error.JwtInvalidError):
          _jwt_format.json_loads(document)

  def test_decode_encode_header_hs256(self):
    """Decodes the RFC 7515 HS256 header and round-trips it."""
    # Example from https://tools.ietf.org/html/rfc7515#appendix-A.1
    encoded_header = b'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9'
    json_header = _jwt_format.decode_header(encoded_header)
    header = _jwt_format.json_loads(json_header)
    self.assertEqual(header['alg'], 'HS256')
    self.assertEqual(header['typ'], 'JWT')
    self.assertEqual(
        _jwt_format.decode_header(_jwt_format.encode_header(json_header)),
        json_header)

  def test_decode_encode_header_rs256(self):
    """Decodes the RFC 7515 RS256 header and round-trips it."""
    # Example from https://tools.ietf.org/html/rfc7515#appendix-A.2
    encoded_header = b'eyJhbGciOiJSUzI1NiJ9'
    json_header = _jwt_format.decode_header(encoded_header)
    header = _jwt_format.json_loads(json_header)
    self.assertEqual(header['alg'], 'RS256')
    self.assertEqual(
        _jwt_format.decode_header(_jwt_format.encode_header(json_header)),
        json_header)

  def test_encode_decode_header(self):
    """Checks that header text, including its whitespace, survives a trip."""
    encoded_header = _jwt_format.encode_header('{ "alg": "RS256"} ')
    json_header = _jwt_format.decode_header(encoded_header)
    self.assertEqual(json_header, '{ "alg": "RS256"} ')

  def test_decode_header_with_invalid_utf8(self):
    """Checks that a header containing a stray 0xC2 byte fails to decode."""
    encoded_header = _jwt_format._base64_encode(
        b'{"alg":"RS256", "bad":"\xc2"}')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.decode_header(encoded_header)

  def test_encode_header_with_utf16_surrogate(self):
    """Checks the encoding of a header holding the character U+1D11E."""
    self.assertEqual(
        _jwt_format.encode_header('{"alg": "RS256", "a":"\U0001d11e"}'),
        b'eyJhbGciOiAiUlMyNTYiLCAiYSI6IvCdhJ4ifQ')

  def test_encode_header_with_invalid_utf16_character(self):
    """Checks that a header holding a lone surrogate cannot be encoded."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.encode_header('{"alg": "RS256", "a":"\uD834"}')

  @parameterized.parameters([
      'HS256', 'HS384', 'HS512', 'ES256', 'ES384', 'ES512', 'RS256', 'RS384',
      'RS512', 'PS256', 'PS384', 'PS512'
  ])
  def test_create_validate_header(self, algorithm):
    """Checks that a created header validates against its own algorithm."""
    encoded_header = _jwt_format.create_header(algorithm)
    header = _jwt_format.decode_header(encoded_header)
    _jwt_format.validate_header(header, algorithm)

  def test_create_unknown_header_fails(self):
    """Checks that creating a header for an unknown algorithm raises."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.create_header('unknown')

  def test_create_verify_different_algorithms_fails(self):
    """Checks that an HS256 header does not validate as ES256."""
    encoded_header = _jwt_format.create_header('HS256')
    header = _jwt_format.decode_header(encoded_header)
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(header, 'ES256')

  def test_verify_empty_header_fails(self):
    """Checks that an empty JSON object is rejected as a header."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header('{}', 'ES256')

  def test_validate_header_with_unknown_algorithm_fails(self):
    """Checks that 'HS123' is rejected even when header and argument agree."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header('{"alg":"HS123"}', 'HS123')

  def test_validate_header_without_quotes(self):
    """Checks that a header with an unquoted key is rejected."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header('{alg:"RS256"}', 'RS256')

  def test_validate_header_with_uppercase_typ_success(self):
    """Checks that a header with "typ":"JWT" validates."""
    _jwt_format.validate_header('{"alg":"HS256","typ":"JWT"}', 'HS256')

  def test_validate_header_with_lowercase_typ_success(self):
    """Checks that a header with "typ":"jwt" validates."""
    _jwt_format.validate_header('{"alg":"HS256","typ":"jwt"}', 'HS256')

  def test_validate_header_with_unknown_entry_success(self):
    """Checks that an unrecognised header entry does not fail validation."""
    _jwt_format.validate_header('{"alg":"HS256","unknown":"header"}', 'HS256')

  def test_validate_header_ignores_typ(self):
    """Checks that an unknown "typ" value does not fail validation."""
    _jwt_format.validate_header('{"alg":"HS256","typ":"unknown"}', 'HS256')

  def test_validate_header_rejects_crit(self):
    """Checks that a header carrying a "crit" entry is rejected."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.validate_header(
          '{"alg":"HS256","crit":["http://example.invalid/UNDEFINED"],'
          '"http://example.invalid/UNDEFINED":true}',
          'HS256')

  def test_json_decode_encode_payload_fixed_data(self):
    """Decodes the RFC 7519 example payload and round-trips it."""
    self._assert_rfc7519_payload_round_trip()

  def test_decode_encode_payload(self):
    """Decodes the RFC 7519 example payload and round-trips it.

    NOTE(review): this performs the same checks as
    test_json_decode_encode_payload_fixed_data; both names are kept so that
    existing test filters keep working.
    """
    self._assert_rfc7519_payload_round_trip()

  def test_encode_payload_with_utf16_surrogate(self):
    """Checks the encoding of a payload holding the character U+1D11E."""
    self.assertEqual(
        _jwt_format.encode_payload('{"iss":"\U0001d11e"}'),
        b'eyJpc3MiOiLwnYSeIn0')

  def test_encode_payload_with_invalid_utf16(self):
    """Checks that a payload holding a lone surrogate cannot be encoded."""
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.encode_payload('{"iss":"\uD834"}')

  def test_create_unsigned_compact_success(self):
    """Checks the exact bytes of an unsigned RS256 compact token."""
    self.assertEqual(
        _jwt_format.create_unsigned_compact('RS256', '{"iss":"joe"}'),
        b'eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJqb2UifQ')

  def test_encode_decode_signature_success(self):
    """Checks signature encoding against a fixed value and its inverse."""
    signature = bytes([
        116, 24, 223, 180, 151, 153, 224, 37, 79, 250, 96, 125, 216, 173, 187,
        186, 22, 212, 37, 77, 105, 214, 191, 240, 91, 88, 5, 88, 83, 132, 141,
        121
    ])
    encoded = _jwt_format.encode_signature(signature)
    self.assertEqual(encoded, b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    self.assertEqual(_jwt_format.decode_signature(encoded), signature)

  def test_signed_compact_create_split(self):
    """Builds a signed compact token and checks that splitting inverts it."""
    payload = '{"iss":"joe"}'
    signature = _jwt_format.decode_signature(
        b'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    unsigned_compact = _jwt_format.create_unsigned_compact('RS256', payload)
    signed_compact = _jwt_format.create_signed_compact(unsigned_compact,
                                                       signature)
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact(signed_compact)

    # The unsigned form is compared as bytes, the signed form as text.
    self.assertEqual(
        unsigned_compact,
        b'eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJqb2UifQ')
    self.assertEqual(
        signed_compact, 'eyJhbGciOiJSUzI1NiJ9.eyJpc3MiOiJqb2UifQ.'
        'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk')
    self.assertEqual(un_comp, unsigned_compact)
    self.assertEqual(sig, signature)
    self.assertEqual(hdr, '{"alg":"RS256"}')
    _jwt_format.validate_header(hdr, 'RS256')
    self.assertEqual(pay, payload)

  def test_split_empty_signed_compact(self):
    """Checks that '..' splits into three empty parts."""
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact('..')
    self.assertEqual(un_comp, b'.')
    self.assertEmpty(sig)
    self.assertEmpty(hdr)
    self.assertEmpty(pay)

  def test_split_signed_compact_success(self):
    """Checks the four values returned for a minimal well-formed token."""
    un_comp, hdr, pay, sig = _jwt_format.split_signed_compact('e30.e30.YWJj')
    self.assertEqual(un_comp, b'e30.e30')
    self.assertEqual(sig, b'abc')
    self.assertEqual(hdr, '{}')
    self.assertEqual(pay, '{}')

  def test_split_signed_compact_with_bad_format_fails(self):
    """Checks that tokens with the wrong number of parts are rejected."""
    malformed_tokens = (
        'e30.e30.YWJj.abc',
        'e30.e30.YWJj.',
        '.e30.e30.YWJj',
        '.e30.e30.',
        'e30',
        '',
    )
    for token in malformed_tokens:
      with self.subTest(token=token):
        with self.assertRaises(_jwt_error.JwtInvalidError):
          _jwt_format.split_signed_compact(token)

  def test_split_signed_compact_with_bad_characters_fails(self):
    """Checks that tokens containing non-base64 characters are rejected."""
    # Each token is 'e30.e30.YWJj' with one extra character inserted: a brace,
    # a space, a line break, a '$', or a lone surrogate.
    corrupted_tokens = (
        '{e30.e30.YWJj',
        ' e30.e30.YWJj',
        'e30. e30.YWJj',
        'e30.e30.YWJj ',
        'e30.e30.\nYWJj',
        'e30.\re30.YWJj',
        'e30$.e30.YWJj',
        'e30.$e30.YWJj',
        'e30.e30.YWJj$',
        'e30.e30.YWJj\ud83c',
    )
    for token in corrupted_tokens:
      with self.subTest(token=repr(token)):
        with self.assertRaises(_jwt_error.JwtInvalidError):
          _jwt_format.split_signed_compact(token)

  def test_split_signed_compact_with_invalid_utf8_in_header(self):
    """Checks that a token whose header is not valid UTF-8 is rejected."""
    encoded_header = _jwt_format._base64_encode(
        b'{"alg":"RS256", "bad":"\xc2"}')
    token = (encoded_header + b'.e30.YWJj').decode('utf8')
    with self.assertRaises(_jwt_error.JwtInvalidError):
      _jwt_format.split_signed_compact(token)
| |
| |
if __name__ == '__main__':
  # Hand control to absl's test runner when this file is executed directly.
  absltest.main()