# coding=utf-8
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
#
# This is a port of the work by: Krzysztof Choromanski, Mark Rowland,
# Vikas Sindhwani, Richard E. Turner, Adrian Weller: "Structured Evolution
# with Compact Architectures for Scalable Policy Optimization",
# https://arxiv.org/abs/1804.02395
#
###############################################################################
r"""Library of gradient ascent algorithms.
Library of stateful gradient ascent algorithms taking as input the gradient and
current parameters, and output the new parameters.
"""
import abc
from typing import List

import numpy as np
import numpy.typing as npt


class GradientAscentOptimizer(metaclass=abc.ABCMeta):
  """Abstract class for general gradient ascent optimizers.

  Subclasses encode different gradient ascent optimization techniques.
  """

  @abc.abstractmethod
  def run_step(self, current_input: npt.NDArray[np.float32],
               gradient: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
    """Conducts a single step of gradient ascent optimization.

    Conducts a single step of the gradient ascent procedure, given the current
    parameters and the raw gradient.

    Args:
      current_input: the current parameters.
      gradient: the raw gradient.

    Returns:
      The new parameters obtained by a single step of gradient ascent.
    """
    raise NotImplementedError("Abstract method")

  @abc.abstractmethod
  def get_state(self) -> List[float]:
    """Returns the state of the optimizer.

    Returns:
      The state of the optimizer as a flat list.
    """
    raise NotImplementedError("Abstract method")

  @abc.abstractmethod
  def set_state(self, state: npt.NDArray[np.float32]) -> None:
    """Sets up the internal state of the optimizer.

    Args:
      state: the state to be restored.
    """
    raise NotImplementedError("Abstract method")


class MomentumOptimizer(GradientAscentOptimizer):
  """Class implementing the momentum gradient ascent optimizer.

  Setting the momentum coefficient to zero is equivalent to vanilla gradient
  ascent. The state is the moving average of the gradient, stored as a flat
  list.
  """

def __init__(self, step_size: float, momentum: float):
self.step_size = step_size
self.momentum = momentum
self.moving_average = np.asarray([], dtype=np.float32)
super().__init__()

  def run_step(self, current_input: npt.NDArray[np.float32],
               gradient: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
if self.moving_average.size == 0:
# Initialize the moving average
self.moving_average = np.zeros(len(current_input), dtype=np.float32)
elif len(self.moving_average) != len(current_input):
raise ValueError(
"Dimensions of the parameters and moving average do not match")
self.moving_average = self.momentum * self.moving_average + (
1 - self.momentum) * gradient
step = self.step_size * self.moving_average
return current_input + step

  def get_state(self) -> List[float]:
return self.moving_average.tolist()

  def set_state(self, state: npt.NDArray[np.float32]) -> None:
self.moving_average = np.asarray(state, dtype=np.float32)
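
# Illustrative sketch (not part of the original library): one momentum step on
# a toy problem. The names `optimizer`, `params`, and `grad` are hypothetical.
#
#   optimizer = MomentumOptimizer(step_size=0.1, momentum=0.9)
#   params = np.zeros(2, dtype=np.float32)
#   grad = np.ones(2, dtype=np.float32)
#   # On the first step the moving average is (1 - 0.9) * grad = 0.1 * grad,
#   # so the returned parameters are 0.1 * 0.1 * grad = [0.01, 0.01].
#   params = optimizer.run_step(params, grad)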


class AdamOptimizer(GradientAscentOptimizer):
  """Class implementing the Adam gradient ascent optimizer.

  The state is the first-moment moving average, the second-moment moving
  average, and t (the current step number), concatenated in that order into
  one flat list.
  """

  def __init__(self,
               step_size: float,
               beta1: float = 0.9,
               beta2: float = 0.999,
               epsilon: float = 1e-07):
self.step_size = step_size
self.beta1 = beta1
self.beta2 = beta2
self.epsilon = epsilon
self.first_moment_moving_average = np.asarray([], dtype=np.float32)
self.second_moment_moving_average = np.asarray([], dtype=np.float32)
self.t = 0
super().__init__()

  def run_step(self, current_input: npt.NDArray[np.float32],
               gradient: npt.NDArray[np.float32]) -> npt.NDArray[np.float32]:
if self.first_moment_moving_average.size == 0:
# Initialize the moving averages
self.first_moment_moving_average = np.zeros(
len(current_input), dtype=np.float32)
self.second_moment_moving_average = np.zeros(
len(current_input), dtype=np.float32)
# Initialize the step counter
self.t = 0
elif len(self.first_moment_moving_average) != len(current_input):
raise ValueError(
"Dimensions of the parameters and moving averages do not match")
self.first_moment_moving_average = (
self.beta1 * self.first_moment_moving_average +
(1 - self.beta1) * gradient)
self.second_moment_moving_average = (
self.beta2 * self.second_moment_moving_average + (1 - self.beta2) *
(gradient * gradient))
self.t += 1
scale = np.sqrt(1 - self.beta2**self.t) / (1 - self.beta1**self.t)
step = self.step_size * scale * self.first_moment_moving_average / (
np.sqrt(self.second_moment_moving_average) + self.epsilon)
return current_input + step

  def get_state(self) -> List[float]:
return (self.first_moment_moving_average.tolist() +
self.second_moment_moving_average.tolist() + [self.t])

  def set_state(self, state: npt.NDArray[np.float32]) -> None:
total_len = len(state)
if total_len % 2 != 1:
raise ValueError("The dimension of the state should be odd")
dim = total_len // 2
self.first_moment_moving_average = np.asarray(state[:dim], dtype=np.float32)
self.second_moment_moving_average = np.asarray(
state[dim:2 * dim], dtype=np.float32)
self.t = int(state[-1])
if self.t < 0:
raise ValueError("The step counter should be non-negative")