blob: 64bee2a7bae74c193f2d1cb5bd50fb23e01ddd0c [file] [log] [blame]
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
"""The JwtValidator."""
import datetime
from typing import Optional
from tink.jwt import _jwt_error
from tink.jwt import _raw_jwt
_MAX_CLOCK_SKEW = datetime.timedelta(minutes=10)
class JwtValidator:
"""A JwtValidator defines how JSON Web Tokens (JWTs) should be validated.
By default, the JwtValidator requires that a token has a valid expiration
claim, no issuer and no audience claim. This can be changed using the
expect_... and ignore_... arguments.
If present, the JwtValidator also validates the not-before claim. The
validation time can be changed using the fixed_now parameter. clock_skew can
be set to allow a small leeway (not more than 10 minutes) to account for
clock skew.
"""
def __init__(self,
*,
expected_type_header: Optional[str],
expected_issuer: Optional[str],
expected_audience: Optional[str],
ignore_type_header: bool,
ignore_issuer: bool,
ignore_audiences: bool,
allow_missing_expiration: bool,
expect_issued_in_the_past: bool,
clock_skew: Optional[datetime.timedelta],
fixed_now: Optional[datetime.datetime]) -> None:
if expected_type_header and ignore_type_header:
raise ValueError(
'expected_type_header and ignore_type_header cannot be used together')
if expected_issuer and ignore_issuer:
raise ValueError(
'expected_issuer and ignore_issuer cannot be used together')
if expected_audience and ignore_audiences:
raise ValueError(
'expected_audience and ignore_audiences cannot be used together')
self._expected_type_header = expected_type_header
self._expected_issuer = expected_issuer
self._expected_audience = expected_audience
self._ignore_type_header = ignore_type_header
self._ignore_issuer = ignore_issuer
self._ignore_audiences = ignore_audiences
self._allow_missing_expiration = allow_missing_expiration
self._expect_issued_in_the_past = expect_issued_in_the_past
if clock_skew:
if clock_skew > _MAX_CLOCK_SKEW:
raise ValueError('clock skew too large, max is 10 minutes')
self._clock_skew = clock_skew
else:
self._clock_skew = datetime.timedelta()
if fixed_now and not fixed_now.tzinfo:
raise ValueError('fixed_now without tzinfo')
self._fixed_now = fixed_now
def has_expected_type_header(self) -> bool:
return self._expected_type_header is not None
def expected_type_header(self) -> str:
return self._expected_type_header
def has_expected_issuer(self) -> bool:
return self._expected_issuer is not None
def expected_issuer(self) -> str:
return self._expected_issuer
def has_expected_audience(self) -> bool:
return self._expected_audience is not None
def expected_audience(self) -> str:
return self._expected_audience
def ignore_type_header(self) -> bool:
return self._ignore_type_header
def ignore_issuer(self) -> bool:
return self._ignore_issuer
def ignore_audiences(self) -> bool:
return self._ignore_audiences
def allow_missing_expiration(self) -> bool:
return self._allow_missing_expiration
def expect_issued_in_the_past(self) -> bool:
return self._expect_issued_in_the_past
def clock_skew(self) -> datetime.timedelta:
return self._clock_skew
def has_fixed_now(self) -> bool:
return self._fixed_now is not None
def fixed_now(self) -> datetime.datetime:
return self._fixed_now
def new_validator(
*,
expected_type_header: Optional[str] = None,
expected_issuer: Optional[str] = None,
expected_audience: Optional[str] = None,
ignore_type_header: bool = False,
ignore_issuer: bool = False,
ignore_audiences: bool = False,
allow_missing_expiration: bool = False,
expect_issued_in_the_past: bool = False,
clock_skew: Optional[datetime.timedelta] = None,
fixed_now: Optional[datetime.datetime] = None) -> JwtValidator:
"""Creates a new JwtValidator."""
return JwtValidator(
expected_type_header=expected_type_header,
expected_issuer=expected_issuer,
expected_audience=expected_audience,
ignore_type_header=ignore_type_header,
ignore_issuer=ignore_issuer,
ignore_audiences=ignore_audiences,
allow_missing_expiration=allow_missing_expiration,
expect_issued_in_the_past=expect_issued_in_the_past,
clock_skew=clock_skew,
fixed_now=fixed_now)
def validate(validator: JwtValidator, raw_jwt: _raw_jwt.RawJwt) -> None:
"""Validates a jwt.RawJwt and raises JwtInvalidError if it is invalid.
This function is called by the JWT primitives and does not need to be called
by the user.
Args:
validator: a jwt.JwtValidator that defines how to validate tokens.
raw_jwt: a jwt.RawJwt token to validate.
Raises:
jwt.JwtInvalidError
"""
if validator.has_fixed_now():
now = validator.fixed_now()
else:
now = datetime.datetime.now(tz=datetime.timezone.utc)
if not raw_jwt.has_expiration() and not validator.allow_missing_expiration():
raise _jwt_error.JwtInvalidError('token is missing an expiration')
if (raw_jwt.has_expiration() and
raw_jwt.expiration() <= now - validator.clock_skew()):
raise _jwt_error.JwtInvalidError('token has expired since %s' %
raw_jwt.expiration())
if (raw_jwt.has_not_before() and
raw_jwt.not_before() > now + validator.clock_skew()):
raise _jwt_error.JwtInvalidError('token cannot be used before %s' %
raw_jwt.not_before())
if validator.expect_issued_in_the_past():
if not raw_jwt.has_issued_at():
raise _jwt_error.JwtInvalidError('token is missing iat claim')
if raw_jwt.issued_at() > now + validator.clock_skew():
raise _jwt_error.JwtInvalidError(
'token has a invalid iat claim in the future: %s' %
raw_jwt.issued_at())
if validator.has_expected_type_header():
if not raw_jwt.has_type_header():
raise _jwt_error.JwtInvalidError(
'invalid JWT; missing expected type header %s.' %
validator.expected_type_header())
if validator.expected_type_header() != raw_jwt.type_header():
raise _jwt_error.JwtInvalidError(
'invalid JWT; expected type header %s, but got %s' %
(validator.expected_type_header(), raw_jwt.type_header()))
else:
if raw_jwt.has_type_header() and not validator.ignore_type_header():
raise _jwt_error.JwtInvalidError(
'invalid JWT; token has type_header set, but validator not.')
if validator.has_expected_issuer():
if not raw_jwt.has_issuer():
raise _jwt_error.JwtInvalidError(
'invalid JWT; missing expected issuer %s.' %
validator.expected_issuer())
if validator.expected_issuer() != raw_jwt.issuer():
raise _jwt_error.JwtInvalidError(
'invalid JWT; expected issuer %s, but got %s' %
(validator.expected_issuer(), raw_jwt.issuer()))
else:
if raw_jwt.has_issuer() and not validator.ignore_issuer():
raise _jwt_error.JwtInvalidError(
'invalid JWT; token has issuer set, but validator not.')
if validator.has_expected_audience():
if (not raw_jwt.has_audiences() or
validator.expected_audience() not in raw_jwt.audiences()):
raise _jwt_error.JwtInvalidError(
'invalid JWT; missing expected audience %s.' %
validator.expected_audience())
else:
if raw_jwt.has_audiences() and not validator.ignore_audiences():
raise _jwt_error.JwtInvalidError(
'invalid JWT; token has audience set, but validator not.')