| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "net/url" |
| "time" |
| |
| "github.com/golang/protobuf/ptypes" |
| durationpb "github.com/golang/protobuf/ptypes/duration" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/rls/internal/keys" |
| "google.golang.org/grpc/internal" |
| "google.golang.org/grpc/internal/pretty" |
| rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/serviceconfig" |
| "google.golang.org/protobuf/encoding/protojson" |
| ) |
| |
const (
	// maxMaxAge is the max_age used when the service config does not specify
	// one, and the upper bound applied when it specifies a greater value.
	maxMaxAge = 5 * time.Minute
	// maxCacheSize is the upper limit, in bytes, for cache_size_bytes since we
	// don't fully trust the service config. The value is 5MiB; an earlier
	// revision multiplied by 8 and thereby expressed the limit in bits.
	maxCacheSize = 5 * 1024 * 1024
	// defaultLookupServiceTimeout is the lookup_service_timeout used when the
	// service config does not specify one.
	defaultLookupServiceTimeout = 10 * time.Second
	// dummyChildPolicyTarget is the placeholder value written into the target
	// name field of the child policy config during service config validation.
	dummyChildPolicyTarget = "target_name_to_be_filled_in_later"
)
| |
| // lbConfig is the internal representation of the RLS LB policy's config. |
| type lbConfig struct { |
| serviceconfig.LoadBalancingConfig |
| |
| cacheSizeBytes int64 // Keep this field 64-bit aligned. |
| kbMap keys.BuilderMap |
| lookupService string |
| lookupServiceTimeout time.Duration |
| maxAge time.Duration |
| staleAge time.Duration |
| defaultTarget string |
| |
| childPolicyName string |
| childPolicyConfig map[string]json.RawMessage |
| childPolicyTargetField string |
| controlChannelServiceConfig string |
| } |
| |
| func (lbCfg *lbConfig) Equal(other *lbConfig) bool { |
| return lbCfg.kbMap.Equal(other.kbMap) && |
| lbCfg.lookupService == other.lookupService && |
| lbCfg.lookupServiceTimeout == other.lookupServiceTimeout && |
| lbCfg.maxAge == other.maxAge && |
| lbCfg.staleAge == other.staleAge && |
| lbCfg.cacheSizeBytes == other.cacheSizeBytes && |
| lbCfg.defaultTarget == other.defaultTarget && |
| lbCfg.childPolicyName == other.childPolicyName && |
| lbCfg.childPolicyTargetField == other.childPolicyTargetField && |
| lbCfg.controlChannelServiceConfig == other.controlChannelServiceConfig && |
| childPolicyConfigEqual(lbCfg.childPolicyConfig, other.childPolicyConfig) |
| } |
| |
// childPolicyConfigEqual reports whether a and b contain the same keys with
// byte-for-byte identical raw JSON values. A nil map and a non-nil empty map
// are considered different.
func childPolicyConfigEqual(a, b map[string]json.RawMessage) bool {
	switch {
	case a == nil && b == nil:
		return true
	case a == nil || b == nil:
		return false
	case len(a) != len(b):
		return false
	}
	// Sizes match, so it is enough to check that every entry of a has an
	// identical counterpart in b.
	for key, left := range a {
		right, found := b[key]
		if !found || !bytes.Equal(left, right) {
			return false
		}
	}
	return true
}
| |
// lbConfigJSON mirrors the top-level JSON representation of the RLS load
// balancing config and makes it easier to unmarshal. Nested values are kept
// as raw JSON so that each one can be parsed and validated separately.
//
// The explicit tags spell out the lowerCamelCase wire names instead of relying
// on encoding/json's case-insensitive matching of Go field names; decoding
// accepts exactly the same keys as before.
type lbConfigJSON struct {
	// RouteLookupConfig is the raw JSON of the route lookup config proto.
	RouteLookupConfig json.RawMessage `json:"routeLookupConfig"`
	// RouteLookupChannelServiceConfig is the raw JSON of the service config
	// for the control channel.
	RouteLookupChannelServiceConfig json.RawMessage `json:"routeLookupChannelServiceConfig"`
	// ChildPolicy is the list of candidate child policies, each a single
	// policy-name to raw-config pair.
	ChildPolicy []map[string]json.RawMessage `json:"childPolicy"`
	// ChildPolicyConfigTargetFieldName is the name of the field in the child
	// policy config that carries the target.
	ChildPolicyConfigTargetFieldName string `json:"childPolicyConfigTargetFieldName"`
}
| |
| // ParseConfig parses the JSON load balancer config provided into an |
| // internal form or returns an error if the config is invalid. |
| // |
| // When parsing a config update, the following validations are performed: |
| // - routeLookupConfig: |
| // - grpc_keybuilders field: |
| // - must have at least one entry |
| // - must not have two entries with the same `Name` |
| // - within each entry: |
| // - must have at least one `Name` |
| // - must not have a `Name` with the `service` field unset or empty |
| // - within each `headers` entry: |
| // - must not have `required_match` set |
| // - must not have `key` unset or empty |
| // - across all `headers`, `constant_keys` and `extra_keys` fields: |
| // - must not have the same `key` specified twice |
| // - no `key` must be the empty string |
| // - `lookup_service` field must be set and must parse as a target URI |
| // - if `max_age` > 5m, it should be set to 5 minutes |
| // - if `stale_age` > `max_age`, ignore it |
| // - if `stale_age` is set, then `max_age` must also be set |
| // - ignore `valid_targets` field |
| // - `cache_size_bytes` field must have a value greater than 0, and if its |
| // value is greater than 5M, we cap it at 5M |
| // |
| // - routeLookupChannelServiceConfig: |
| // - if specified, must parse as valid service config |
| // |
| // - childPolicy: |
| // - must find a valid child policy with a valid config |
| // |
| // - childPolicyConfigTargetFieldName: |
| // - must be set and non-empty |
| func (rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
| logger.Infof("Received JSON service config: %v", pretty.ToJSON(c)) |
| cfgJSON := &lbConfigJSON{} |
| if err := json.Unmarshal(c, cfgJSON); err != nil { |
| return nil, fmt.Errorf("rls: json unmarshal failed for service config %+v: %v", string(c), err) |
| } |
| |
| m := protojson.UnmarshalOptions{DiscardUnknown: true} |
| rlsProto := &rlspb.RouteLookupConfig{} |
| if err := m.Unmarshal(cfgJSON.RouteLookupConfig, rlsProto); err != nil { |
| return nil, fmt.Errorf("rls: bad RouteLookupConfig proto %+v: %v", string(cfgJSON.RouteLookupConfig), err) |
| } |
| lbCfg, err := parseRLSProto(rlsProto) |
| if err != nil { |
| return nil, err |
| } |
| |
| if sc := string(cfgJSON.RouteLookupChannelServiceConfig); sc != "" { |
| parsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(sc) |
| if parsed.Err != nil { |
| return nil, fmt.Errorf("rls: bad control channel service config %q: %v", sc, parsed.Err) |
| } |
| lbCfg.controlChannelServiceConfig = sc |
| } |
| |
| if cfgJSON.ChildPolicyConfigTargetFieldName == "" { |
| return nil, fmt.Errorf("rls: childPolicyConfigTargetFieldName field is not set in service config %+v", string(c)) |
| } |
| name, config, err := parseChildPolicyConfigs(cfgJSON.ChildPolicy, cfgJSON.ChildPolicyConfigTargetFieldName) |
| if err != nil { |
| return nil, err |
| } |
| lbCfg.childPolicyName = name |
| lbCfg.childPolicyConfig = config |
| lbCfg.childPolicyTargetField = cfgJSON.ChildPolicyConfigTargetFieldName |
| return lbCfg, nil |
| } |
| |
| func parseRLSProto(rlsProto *rlspb.RouteLookupConfig) (*lbConfig, error) { |
| // Validations specified on the `grpc_keybuilders` field are performed here. |
| kbMap, err := keys.MakeBuilderMap(rlsProto) |
| if err != nil { |
| return nil, err |
| } |
| |
| // `lookup_service` field must be set and must parse as a target URI. |
| lookupService := rlsProto.GetLookupService() |
| if lookupService == "" { |
| return nil, fmt.Errorf("rls: empty lookup_service in route lookup config %+v", rlsProto) |
| } |
| parsedTarget, err := url.Parse(lookupService) |
| if err != nil { |
| // url.Parse() fails if scheme is missing. Retry with default scheme. |
| parsedTarget, err = url.Parse(resolver.GetDefaultScheme() + ":///" + lookupService) |
| if err != nil { |
| return nil, fmt.Errorf("rls: invalid target URI in lookup_service %s", lookupService) |
| } |
| } |
| if parsedTarget.Scheme == "" { |
| parsedTarget.Scheme = resolver.GetDefaultScheme() |
| } |
| if resolver.Get(parsedTarget.Scheme) == nil { |
| return nil, fmt.Errorf("rls: unregistered scheme in lookup_service %s", lookupService) |
| } |
| |
| lookupServiceTimeout, err := convertDuration(rlsProto.GetLookupServiceTimeout()) |
| if err != nil { |
| return nil, fmt.Errorf("rls: failed to parse lookup_service_timeout in route lookup config %+v: %v", rlsProto, err) |
| } |
| if lookupServiceTimeout == 0 { |
| lookupServiceTimeout = defaultLookupServiceTimeout |
| } |
| |
| // Validations performed here: |
| // - if `max_age` > 5m, it should be set to 5 minutes |
| // - if `stale_age` > `max_age`, ignore it |
| // - if `stale_age` is set, then `max_age` must also be set |
| maxAge, err := convertDuration(rlsProto.GetMaxAge()) |
| if err != nil { |
| return nil, fmt.Errorf("rls: failed to parse max_age in route lookup config %+v: %v", rlsProto, err) |
| } |
| staleAge, err := convertDuration(rlsProto.GetStaleAge()) |
| if err != nil { |
| return nil, fmt.Errorf("rls: failed to parse staleAge in route lookup config %+v: %v", rlsProto, err) |
| } |
| if staleAge != 0 && maxAge == 0 { |
| return nil, fmt.Errorf("rls: stale_age is set, but max_age is not in route lookup config %+v", rlsProto) |
| } |
| if staleAge >= maxAge { |
| logger.Infof("rls: stale_age %v is not less than max_age %v, ignoring it", staleAge, maxAge) |
| staleAge = 0 |
| } |
| if maxAge == 0 || maxAge > maxMaxAge { |
| logger.Infof("rls: max_age in route lookup config is %v, using %v", maxAge, maxMaxAge) |
| maxAge = maxMaxAge |
| } |
| |
| // `cache_size_bytes` field must have a value greater than 0, and if its |
| // value is greater than 5M, we cap it at 5M |
| cacheSizeBytes := rlsProto.GetCacheSizeBytes() |
| if cacheSizeBytes <= 0 { |
| return nil, fmt.Errorf("rls: cache_size_bytes must be set to a non-zero value: %+v", rlsProto) |
| } |
| if cacheSizeBytes > maxCacheSize { |
| logger.Info("rls: cache_size_bytes %v is too large, setting it to: %v", cacheSizeBytes, maxCacheSize) |
| cacheSizeBytes = maxCacheSize |
| } |
| return &lbConfig{ |
| kbMap: kbMap, |
| lookupService: lookupService, |
| lookupServiceTimeout: lookupServiceTimeout, |
| maxAge: maxAge, |
| staleAge: staleAge, |
| cacheSizeBytes: cacheSizeBytes, |
| defaultTarget: rlsProto.GetDefaultTarget(), |
| }, nil |
| } |
| |
| // parseChildPolicyConfigs iterates through the list of child policies and picks |
| // the first registered policy and validates its config. |
| func parseChildPolicyConfigs(childPolicies []map[string]json.RawMessage, targetFieldName string) (string, map[string]json.RawMessage, error) { |
| for i, config := range childPolicies { |
| if len(config) != 1 { |
| return "", nil, fmt.Errorf("rls: invalid childPolicy: entry %v does not contain exactly 1 policy/config pair: %q", i, config) |
| } |
| |
| var name string |
| var rawCfg json.RawMessage |
| for name, rawCfg = range config { |
| } |
| builder := balancer.Get(name) |
| if builder == nil { |
| continue |
| } |
| parser, ok := builder.(balancer.ConfigParser) |
| if !ok { |
| return "", nil, fmt.Errorf("rls: childPolicy %q with config %q does not support config parsing", name, string(rawCfg)) |
| } |
| |
| // To validate child policy configs we do the following: |
| // - unmarshal the raw JSON bytes of the child policy config into a map |
| // - add an entry with key set to `target_field_name` and a dummy value |
| // - marshal the map back to JSON and parse the config using the parser |
| // retrieved previously |
| var childConfig map[string]json.RawMessage |
| if err := json.Unmarshal(rawCfg, &childConfig); err != nil { |
| return "", nil, fmt.Errorf("rls: json unmarshal failed for child policy config %q: %v", string(rawCfg), err) |
| } |
| childConfig[targetFieldName], _ = json.Marshal(dummyChildPolicyTarget) |
| jsonCfg, err := json.Marshal(childConfig) |
| if err != nil { |
| return "", nil, fmt.Errorf("rls: json marshal failed for child policy config {%+v}: %v", childConfig, err) |
| } |
| if _, err := parser.ParseConfig(jsonCfg); err != nil { |
| return "", nil, fmt.Errorf("rls: childPolicy config validation failed: %v", err) |
| } |
| return name, childConfig, nil |
| } |
| return "", nil, fmt.Errorf("rls: invalid childPolicy config: no supported policies found in %+v", childPolicies) |
| } |
| |
| func convertDuration(d *durationpb.Duration) (time.Duration, error) { |
| if d == nil { |
| return 0, nil |
| } |
| return ptypes.Duration(d) |
| } |