xds: Add RLS Cluster Specifier Plugin (#5004)
* xds: Add RLS Cluster Specifier Plugin
diff --git a/balancer/rls/rls.go b/balancer/rls/rls.go
new file mode 100644
index 0000000..473c3c1
--- /dev/null
+++ b/balancer/rls/rls.go
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package rls imports to init the rls lb policy for testing purposes.
+package rls
+
+import (
+ // Blank import to init the rls lb policy for external use.
+ _ "google.golang.org/grpc/balancer/rls/internal"
+)
diff --git a/xds/internal/clusterspecifier/rls/rls.go b/xds/internal/clusterspecifier/rls/rls.go
new file mode 100644
index 0000000..98b0d56
--- /dev/null
+++ b/xds/internal/clusterspecifier/rls/rls.go
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package rls implements the RLS cluster specifier plugin.
+package rls
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/internal/envconfig"
+ "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
+ "google.golang.org/grpc/xds/internal/clusterspecifier"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/types/known/anypb"
+
+ // Blank import to init the RLS LB policy.
+ _ "google.golang.org/grpc/balancer/rls"
+)
+
+const rlsBalancerName = "rls_experimental"
+
+func init() {
+ if envconfig.XDSRLS {
+ clusterspecifier.Register(rls{})
+ }
+}
+
+type rls struct{}
+
+func (rls) TypeURLs() []string {
+ return []string{"type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier"}
+}
+
+// lbConfigJSON is the RLS LB Policies configuration in JSON format.
+// RouteLookupConfig will be a raw JSON string from the passed in proto
+// configuration, and the other fields will be hardcoded.
+type lbConfigJSON struct {
+ RouteLookupConfig json.RawMessage `json:"routeLookupConfig"`
+ ChildPolicy []map[string]json.RawMessage `json:"childPolicy"`
+ ChildPolicyConfigTargetFieldName string `json:"childPolicyConfigTargetFieldName"`
+}
+
+func (rls) ParseClusterSpecifierConfig(cfg proto.Message) (clusterspecifier.BalancerConfig, error) {
+ if cfg == nil {
+ return nil, fmt.Errorf("rls_csp: nil configuration message provided")
+ }
+ any, ok := cfg.(*anypb.Any)
+ if !ok {
+ return nil, fmt.Errorf("rls_csp: error parsing config %v: unknown type %T", cfg, cfg)
+ }
+ rlcs := new(grpc_lookup_v1.RouteLookupClusterSpecifier)
+
+ if err := ptypes.UnmarshalAny(any, rlcs); err != nil {
+ return nil, fmt.Errorf("rls_csp: error parsing config %v: %v", cfg, err)
+ }
+ rlcJSON, err := protojson.Marshal(rlcs.GetRouteLookupConfig())
+ if err != nil {
+ return nil, fmt.Errorf("rls_csp: error marshaling route lookup config: %v: %v", rlcs.GetRouteLookupConfig(), err)
+ }
+ lbCfgJSON := &lbConfigJSON{
+ RouteLookupConfig: rlcJSON, // "JSON form of RouteLookupClusterSpecifier.config" - RLS in xDS Design Doc
+ ChildPolicy: []map[string]json.RawMessage{
+ {
+ "cds_experimental": json.RawMessage("{}"),
+ },
+ },
+ ChildPolicyConfigTargetFieldName: "cluster",
+ }
+
+ rawJSON, err := json.Marshal(lbCfgJSON)
+ if err != nil {
+ return nil, fmt.Errorf("rls_csp: error marshaling load balancing config %v: %v", lbCfgJSON, err)
+ }
+
+ rlsBB := balancer.Get(rlsBalancerName)
+ if rlsBB == nil {
+ return nil, fmt.Errorf("RLS LB policy not registered")
+ }
+ _, err = rlsBB.(balancer.ConfigParser).ParseConfig(rawJSON)
+ if err != nil {
+ return nil, fmt.Errorf("rls_csp: validation error from rls lb policy parsing %v", err)
+ }
+
+ return clusterspecifier.BalancerConfig{{rlsBalancerName: lbCfgJSON}}, nil
+}
diff --git a/xds/internal/clusterspecifier/rls/rls_test.go b/xds/internal/clusterspecifier/rls/rls_test.go
new file mode 100644
index 0000000..69bf165
--- /dev/null
+++ b/xds/internal/clusterspecifier/rls/rls_test.go
@@ -0,0 +1,168 @@
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package rls
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
+ _ "google.golang.org/grpc/balancer/rls"
+ "google.golang.org/grpc/internal/grpctest"
+ "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
+ "google.golang.org/grpc/internal/testutils"
+ _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
+ "google.golang.org/grpc/xds/internal/clusterspecifier"
+ "google.golang.org/protobuf/types/known/durationpb"
+)
+
+func init() {
+ clusterspecifier.Register(rls{})
+}
+
+type s struct {
+ grpctest.Tester
+}
+
+func Test(t *testing.T) {
+ grpctest.RunSubTests(t, s{})
+}
+
+// TestParseClusterSpecifierConfig tests the parsing functionality of the RLS
+// Cluster Specifier Plugin.
+func (s) TestParseClusterSpecifierConfig(t *testing.T) {
+ tests := []struct {
+ name string
+ rlcs proto.Message
+ wantConfig clusterspecifier.BalancerConfig
+ wantErr bool
+ }{
+ {
+ name: "invalid-rls-cluster-specifier",
+ rlcs: rlsClusterSpecifierConfigError,
+ wantErr: true,
+ },
+ {
+ name: "valid-rls-cluster-specifier",
+ rlcs: rlsClusterSpecifierConfigWithoutTransformations,
+ wantConfig: configWithoutTransformationsWant,
+ },
+ }
+ for _, test := range tests {
+ cs := clusterspecifier.Get("type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier")
+ if cs == nil {
+ t.Fatal("Error getting cluster specifier")
+ }
+ lbCfg, err := cs.ParseClusterSpecifierConfig(test.rlcs)
+
+ if (err != nil) != test.wantErr {
+ t.Fatalf("ParseClusterSpecifierConfig(%+v) returned err: %v, wantErr: %v", test.rlcs, err, test.wantErr)
+ }
+ if test.wantErr { // Successfully received an error.
+ return
+ }
+ // Marshal and then unmarshal into interface{} to get rid of
+ // nondeterministic protojson Marshaling.
+ lbCfgJSON, err := json.Marshal(lbCfg)
+ if err != nil {
+ t.Fatalf("json.Marshal(%+v) returned err %v", lbCfg, err)
+ }
+ var got interface{}
+ err = json.Unmarshal(lbCfgJSON, got)
+ if err != nil {
+ t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
+ }
+ wantCfgJSON, err := json.Marshal(test.wantConfig)
+ if err != nil {
+ t.Fatalf("json.Marshal(%+v) returned err %v", test.wantConfig, err)
+ }
+ var want interface{}
+ err = json.Unmarshal(wantCfgJSON, want)
+ if err != nil {
+ t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
+ }
+ if diff := cmp.Diff(want, got, cmpopts.EquateEmpty()); diff != "" {
+ t.Fatalf("ParseClusterSpecifierConfig(%+v) returned expected, diff (-want +got) %v", test.rlcs, diff)
+ }
+ }
+}
+
+// This will error because the required match field is set in grpc key builder.
+var rlsClusterSpecifierConfigError = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
+ RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
+ GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
+ {
+ Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
+ {
+ Service: "service",
+ Method: "method",
+ },
+ },
+ Headers: []*grpc_lookup_v1.NameMatcher{
+ {
+ Key: "k1",
+ RequiredMatch: true,
+ Names: []string{"v1"},
+ },
+ },
+ },
+ },
+ },
+})
+
+// Corresponds to the rls unit test case in
+// balancer/rls/internal/config_test.go.
+var rlsClusterSpecifierConfigWithoutTransformations = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
+ RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
+ GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
+ {
+ Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
+ {
+ Service: "service",
+ Method: "method",
+ },
+ },
+ Headers: []*grpc_lookup_v1.NameMatcher{
+ {
+ Key: "k1",
+ Names: []string{"v1"},
+ },
+ },
+ },
+ },
+ LookupService: "target",
+ LookupServiceTimeout: &durationpb.Duration{Seconds: 100},
+ MaxAge: &durationpb.Duration{Seconds: 60},
+ StaleAge: &durationpb.Duration{Seconds: 50},
+ CacheSizeBytes: 1000,
+ DefaultTarget: "passthrough:///default",
+ },
+})
+
+var configWithoutTransformationsWant = clusterspecifier.BalancerConfig{{"rls_experimental": &lbConfigJSON{
+ RouteLookupConfig: []byte(`{"grpcKeybuilders":[{"names":[{"service":"service","method":"method"}],"headers":[{"key":"k1","names":["v1"]}]}],"lookupService":"target","lookupServiceTimeout":"100s","maxAge":"60s","staleAge":"50s","cacheSizeBytes":"1000","defaultTarget":"passthrough:///default"}`),
+ ChildPolicy: []map[string]json.RawMessage{
+ {
+ "cds_experimental": []byte(`{}`),
+ },
+ },
+ ChildPolicyConfigTargetFieldName: "cluster",
+}}}