| /* |
| * |
| * Copyright 2020 gRPC authors. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package rls |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "time" |
| |
| "github.com/golang/protobuf/jsonpb" |
| "github.com/golang/protobuf/ptypes" |
| durationpb "github.com/golang/protobuf/ptypes/duration" |
| "google.golang.org/grpc/balancer" |
| "google.golang.org/grpc/balancer/rls/internal/keys" |
| rlspb "google.golang.org/grpc/balancer/rls/internal/proto/grpc_lookup_v1" |
| "google.golang.org/grpc/internal/grpcutil" |
| "google.golang.org/grpc/resolver" |
| "google.golang.org/grpc/serviceconfig" |
| ) |
| |
| const ( |
| // This is max duration that we are willing to cache RLS responses. If the |
| // service config doesn't specify a value for max_age or if it specified a |
| // value greater that this, we will use this value instead. |
| maxMaxAge = 5 * time.Minute |
| // If lookup_service_timeout is not specified in the service config, we use |
| // a default of 10 seconds. |
| defaultLookupServiceTimeout = 10 * time.Second |
| // This is set to the targetNameField in the child policy config during |
| // service config validation. |
| dummyChildPolicyTarget = "target_name_to_be_filled_in_later" |
| ) |
| |
| // lbConfig contains the parsed and validated contents of the |
| // loadBalancingConfig section of the service config. The RLS LB policy will |
| // use this to directly access config data instead of ploughing through proto |
| // fields. |
| type lbConfig struct { |
| serviceconfig.LoadBalancingConfig |
| |
| kbMap keys.BuilderMap |
| lookupService string |
| lookupServiceTimeout time.Duration |
| maxAge time.Duration |
| staleAge time.Duration |
| cacheSizeBytes int64 |
| defaultTarget string |
| cpName string |
| cpTargetField string |
| cpConfig map[string]json.RawMessage |
| } |
| |
| func (lbCfg *lbConfig) Equal(other *lbConfig) bool { |
| return lbCfg.kbMap.Equal(other.kbMap) && |
| lbCfg.lookupService == other.lookupService && |
| lbCfg.lookupServiceTimeout == other.lookupServiceTimeout && |
| lbCfg.maxAge == other.maxAge && |
| lbCfg.staleAge == other.staleAge && |
| lbCfg.cacheSizeBytes == other.cacheSizeBytes && |
| lbCfg.defaultTarget == other.defaultTarget && |
| lbCfg.cpName == other.cpName && |
| lbCfg.cpTargetField == other.cpTargetField && |
| cpConfigEqual(lbCfg.cpConfig, other.cpConfig) |
| } |
| |
| func cpConfigEqual(am, bm map[string]json.RawMessage) bool { |
| if (bm == nil) != (am == nil) { |
| return false |
| } |
| if len(bm) != len(am) { |
| return false |
| } |
| |
| for k, jsonA := range am { |
| jsonB, ok := bm[k] |
| if !ok { |
| return false |
| } |
| if !bytes.Equal(jsonA, jsonB) { |
| return false |
| } |
| } |
| return true |
| } |
| |
| // This struct resembles the JSON respresentation of the loadBalancing config |
| // and makes it easier to unmarshal. |
| type lbConfigJSON struct { |
| RouteLookupConfig json.RawMessage |
| ChildPolicy []*loadBalancingConfig |
| ChildPolicyConfigTargetFieldName string |
| } |
| |
| // loadBalancingConfig represents a single load balancing config, |
| // stored in JSON format. |
| // |
| // TODO(easwars): This code seems to be repeated in a few places |
| // (service_config.go and in the xds code as well). Refactor and re-use. |
| type loadBalancingConfig struct { |
| Name string |
| Config json.RawMessage |
| } |
| |
| // MarshalJSON returns a JSON encoding of l. |
| func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) { |
| return nil, fmt.Errorf("rls: loadBalancingConfig.MarshalJSON() is unimplemented") |
| } |
| |
| // UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. |
| func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error { |
| var cfg map[string]json.RawMessage |
| if err := json.Unmarshal(data, &cfg); err != nil { |
| return err |
| } |
| for name, config := range cfg { |
| l.Name = name |
| l.Config = config |
| } |
| return nil |
| } |
| |
| // ParseConfig parses and validates the JSON representation of the service |
| // config and returns the loadBalancingConfig to be used by the RLS LB policy. |
| // |
| // Helps implement the balancer.ConfigParser interface. |
| // |
| // The following validation checks are performed: |
| // * routeLookupConfig: |
| // ** grpc_keybuilders field: |
| // - must have at least one entry |
| // - must not have two entries with the same Name |
| // - must not have any entry with a Name with the service field unset or |
| // empty |
| // - must not have any entries without a Name |
| // - must not have a headers entry that has required_match set |
| // - must not have two headers entries with the same key within one entry |
| // ** lookup_service field: |
| // - must be set and non-empty and must parse as a target URI |
| // ** max_age field: |
| // - if not specified or is greater than maxMaxAge, it will be reset to |
| // maxMaxAge |
| // ** stale_age field: |
| // - if the value is greater than or equal to max_age, it is ignored |
| // - if set, then max_age must also be set |
| // ** valid_targets field: |
| // - will be ignored |
| // ** cache_size_bytes field: |
| // - must be greater than zero |
| // - TODO(easwars): Define a minimum value for this field, to be used when |
| // left unspecified |
| // * childPolicy field: |
| // - must find a valid child policy with a valid config (the child policy must |
| // be able to parse the provided config successfully when we pass it a dummy |
| // target name in the target_field provided by the |
| // childPolicyConfigTargetFieldName field) |
| // * childPolicyConfigTargetFieldName field: |
| // - must be set and non-empty |
| func (*rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { |
| cfgJSON := &lbConfigJSON{} |
| if err := json.Unmarshal(c, cfgJSON); err != nil { |
| return nil, fmt.Errorf("rls: json unmarshal failed for service config {%+v}: %v", string(c), err) |
| } |
| |
| m := jsonpb.Unmarshaler{AllowUnknownFields: true} |
| rlsProto := &rlspb.RouteLookupConfig{} |
| if err := m.Unmarshal(bytes.NewReader(cfgJSON.RouteLookupConfig), rlsProto); err != nil { |
| return nil, fmt.Errorf("rls: bad RouteLookupConfig proto {%+v}: %v", string(cfgJSON.RouteLookupConfig), err) |
| } |
| |
| var childPolicy *loadBalancingConfig |
| for _, lbcfg := range cfgJSON.ChildPolicy { |
| if balancer.Get(lbcfg.Name) != nil { |
| childPolicy = lbcfg |
| break |
| } |
| } |
| |
| kbMap, err := keys.MakeBuilderMap(rlsProto) |
| if err != nil { |
| return nil, err |
| } |
| |
| lookupService := rlsProto.GetLookupService() |
| if lookupService == "" { |
| return nil, fmt.Errorf("rls: empty lookup_service in service config {%+v}", string(c)) |
| } |
| parsedTarget := grpcutil.ParseTarget(lookupService, false) |
| if parsedTarget.Scheme == "" { |
| parsedTarget.Scheme = resolver.GetDefaultScheme() |
| } |
| if resolver.Get(parsedTarget.Scheme) == nil { |
| return nil, fmt.Errorf("rls: invalid target URI in lookup_service {%s}", lookupService) |
| } |
| |
| lookupServiceTimeout, err := convertDuration(rlsProto.GetLookupServiceTimeout()) |
| if err != nil { |
| return nil, fmt.Errorf("rls: failed to parse lookup_service_timeout in service config {%+v}: %v", string(c), err) |
| } |
| if lookupServiceTimeout == 0 { |
| lookupServiceTimeout = defaultLookupServiceTimeout |
| } |
| maxAge, err := convertDuration(rlsProto.GetMaxAge()) |
| if err != nil { |
| return nil, fmt.Errorf("rls: failed to parse max_age in service config {%+v}: %v", string(c), err) |
| } |
| staleAge, err := convertDuration(rlsProto.GetStaleAge()) |
| if err != nil { |
| return nil, fmt.Errorf("rls: failed to parse staleAge in service config {%+v}: %v", string(c), err) |
| } |
| if staleAge != 0 && maxAge == 0 { |
| return nil, fmt.Errorf("rls: stale_age is set, but max_age is not in service config {%+v}", string(c)) |
| } |
| if staleAge >= maxAge { |
| logger.Info("rls: stale_age {%v} is greater than max_age {%v}, ignoring it", staleAge, maxAge) |
| staleAge = 0 |
| } |
| if maxAge == 0 || maxAge > maxMaxAge { |
| logger.Infof("rls: max_age in service config is %v, using %v", maxAge, maxMaxAge) |
| maxAge = maxMaxAge |
| } |
| cacheSizeBytes := rlsProto.GetCacheSizeBytes() |
| if cacheSizeBytes <= 0 { |
| return nil, fmt.Errorf("rls: cache_size_bytes must be greater than 0 in service config {%+v}", string(c)) |
| } |
| if childPolicy == nil { |
| return nil, fmt.Errorf("rls: childPolicy is invalid in service config {%+v}", string(c)) |
| } |
| if cfgJSON.ChildPolicyConfigTargetFieldName == "" { |
| return nil, fmt.Errorf("rls: childPolicyConfigTargetFieldName field is not set in service config {%+v}", string(c)) |
| } |
| // TODO(easwars): When we start instantiating the child policy from the |
| // parent RLS LB policy, we could make this function a method on the |
| // lbConfig object and share the code. We would be parsing the child policy |
| // config again during that time. The only difference betweeen now and then |
| // would be that we would be using real targetField name instead of the |
| // dummy. So, we could make the targetName field a parameter to this |
| // function during the refactor. |
| cpCfg, err := validateChildPolicyConfig(childPolicy, cfgJSON.ChildPolicyConfigTargetFieldName) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &lbConfig{ |
| kbMap: kbMap, |
| lookupService: lookupService, |
| lookupServiceTimeout: lookupServiceTimeout, |
| maxAge: maxAge, |
| staleAge: staleAge, |
| cacheSizeBytes: cacheSizeBytes, |
| defaultTarget: rlsProto.GetDefaultTarget(), |
| // TODO(easwars): Once we refactor validateChildPolicyConfig and make |
| // it a method on the lbConfig object, we could directly store the |
| // balancer.Builder and/or balancer.ConfigParser here instead of the |
| // Name. That would mean that we would have to create the lbConfig |
| // object here first before validating the childPolicy config, but |
| // that's a minor detail. |
| cpName: childPolicy.Name, |
| cpTargetField: cfgJSON.ChildPolicyConfigTargetFieldName, |
| cpConfig: cpCfg, |
| }, nil |
| } |
| |
| // validateChildPolicyConfig validates the child policy config received in the |
| // service config. This makes it possible for us to reject service configs |
| // which contain invalid child policy configs which we know will fail for sure. |
| // |
| // It does the following: |
| // * Unmarshals the provided child policy config into a map of string to |
| // json.RawMessage. This allows us to add an entry to the map corresponding |
| // to the targetFieldName that we received in the service config. |
| // * Marshals the map back into JSON, finds the config parser associated with |
| // the child policy and asks it to validate the config. |
| // * If the validation succeeded, removes the dummy entry from the map and |
| // returns it. If any of the above steps failed, it returns an error. |
| func validateChildPolicyConfig(cp *loadBalancingConfig, cpTargetField string) (map[string]json.RawMessage, error) { |
| var childConfig map[string]json.RawMessage |
| if err := json.Unmarshal(cp.Config, &childConfig); err != nil { |
| return nil, fmt.Errorf("rls: json unmarshal failed for child policy config {%+v}: %v", cp.Config, err) |
| } |
| childConfig[cpTargetField], _ = json.Marshal(dummyChildPolicyTarget) |
| |
| jsonCfg, err := json.Marshal(childConfig) |
| if err != nil { |
| return nil, fmt.Errorf("rls: json marshal failed for child policy config {%+v}: %v", childConfig, err) |
| } |
| builder := balancer.Get(cp.Name) |
| if builder == nil { |
| // This should never happen since we already made sure that the child |
| // policy name mentioned in the service config is a valid one. |
| return nil, fmt.Errorf("rls: balancer builder not found for child_policy %v", cp.Name) |
| } |
| parser, ok := builder.(balancer.ConfigParser) |
| if !ok { |
| return nil, fmt.Errorf("rls: balancer builder for child_policy does not implement balancer.ConfigParser: %v", cp.Name) |
| } |
| _, err = parser.ParseConfig(jsonCfg) |
| if err != nil { |
| return nil, fmt.Errorf("rls: childPolicy config validation failed: %v", err) |
| } |
| delete(childConfig, cpTargetField) |
| return childConfig, nil |
| } |
| |
| func convertDuration(d *durationpb.Duration) (time.Duration, error) { |
| if d == nil { |
| return 0, nil |
| } |
| return ptypes.Duration(d) |
| } |