xds: Add RLS Cluster Specifier Plugin (#5004)

* xds: Add RLS Cluster Specifier Plugin
diff --git a/balancer/rls/rls.go b/balancer/rls/rls.go
new file mode 100644
index 0000000..473c3c1
--- /dev/null
+++ b/balancer/rls/rls.go
@@ -0,0 +1,25 @@
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package rls imports to init the rls lb policy for testing purposes.
+package rls
+
+import (
+	// Blank import to init the rls lb policy for external use.
+	_ "google.golang.org/grpc/balancer/rls/internal"
+)
diff --git a/xds/internal/clusterspecifier/rls/rls.go b/xds/internal/clusterspecifier/rls/rls.go
new file mode 100644
index 0000000..98b0d56
--- /dev/null
+++ b/xds/internal/clusterspecifier/rls/rls.go
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package rls implements the RLS cluster specifier plugin.
+package rls
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/internal/envconfig"
+	"google.golang.org/grpc/internal/proto/grpc_lookup_v1"
+	"google.golang.org/grpc/xds/internal/clusterspecifier"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/types/known/anypb"
+
+	// Blank import to init the RLS LB policy.
+	_ "google.golang.org/grpc/balancer/rls"
+)
+
+const rlsBalancerName = "rls_experimental"
+
+func init() {
+	if envconfig.XDSRLS {
+		clusterspecifier.Register(rls{})
+	}
+}
+
+type rls struct{}
+
+func (rls) TypeURLs() []string {
+	return []string{"type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier"}
+}
+
+// lbConfigJSON is the RLS LB Policies configuration in JSON format.
+// RouteLookupConfig will be a raw JSON string from the passed in proto
+// configuration, and the other fields will be hardcoded.
+type lbConfigJSON struct {
+	RouteLookupConfig                json.RawMessage              `json:"routeLookupConfig"`
+	ChildPolicy                      []map[string]json.RawMessage `json:"childPolicy"`
+	ChildPolicyConfigTargetFieldName string                       `json:"childPolicyConfigTargetFieldName"`
+}
+
+func (rls) ParseClusterSpecifierConfig(cfg proto.Message) (clusterspecifier.BalancerConfig, error) {
+	if cfg == nil {
+		return nil, fmt.Errorf("rls_csp: nil configuration message provided")
+	}
+	any, ok := cfg.(*anypb.Any)
+	if !ok {
+		return nil, fmt.Errorf("rls_csp: error parsing config %v: unknown type %T", cfg, cfg)
+	}
+	rlcs := new(grpc_lookup_v1.RouteLookupClusterSpecifier)
+
+	if err := ptypes.UnmarshalAny(any, rlcs); err != nil {
+		return nil, fmt.Errorf("rls_csp: error parsing config %v: %v", cfg, err)
+	}
+	rlcJSON, err := protojson.Marshal(rlcs.GetRouteLookupConfig())
+	if err != nil {
+		return nil, fmt.Errorf("rls_csp: error marshaling route lookup config: %v: %v", rlcs.GetRouteLookupConfig(), err)
+	}
+	lbCfgJSON := &lbConfigJSON{
+		RouteLookupConfig: rlcJSON, // "JSON form of RouteLookupClusterSpecifier.config" - RLS in xDS Design Doc
+		ChildPolicy: []map[string]json.RawMessage{
+			{
+				"cds_experimental": json.RawMessage("{}"),
+			},
+		},
+		ChildPolicyConfigTargetFieldName: "cluster",
+	}
+
+	rawJSON, err := json.Marshal(lbCfgJSON)
+	if err != nil {
+		return nil, fmt.Errorf("rls_csp: error marshaling load balancing config %v: %v", lbCfgJSON, err)
+	}
+
+	rlsBB := balancer.Get(rlsBalancerName)
+	if rlsBB == nil {
+		return nil, fmt.Errorf("RLS LB policy not registered")
+	}
+	_, err = rlsBB.(balancer.ConfigParser).ParseConfig(rawJSON)
+	if err != nil {
+		return nil, fmt.Errorf("rls_csp: validation error from rls lb policy parsing %v", err)
+	}
+
+	return clusterspecifier.BalancerConfig{{rlsBalancerName: lbCfgJSON}}, nil
+}
diff --git a/xds/internal/clusterspecifier/rls/rls_test.go b/xds/internal/clusterspecifier/rls/rls_test.go
new file mode 100644
index 0000000..69bf165
--- /dev/null
+++ b/xds/internal/clusterspecifier/rls/rls_test.go
@@ -0,0 +1,168 @@
+/*
+ *
+ * Copyright 2021 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package rls
+
+import (
+	"encoding/json"
+	"testing"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	_ "google.golang.org/grpc/balancer/rls"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/proto/grpc_lookup_v1"
+	"google.golang.org/grpc/internal/testutils"
+	_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
+	"google.golang.org/grpc/xds/internal/clusterspecifier"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+func init() {
+	clusterspecifier.Register(rls{})
+}
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+// TestParseClusterSpecifierConfig tests the parsing functionality of the RLS
+// Cluster Specifier Plugin.
+func (s) TestParseClusterSpecifierConfig(t *testing.T) {
+	tests := []struct {
+		name       string
+		rlcs       proto.Message
+		wantConfig clusterspecifier.BalancerConfig
+		wantErr    bool
+	}{
+		{
+			name:    "invalid-rls-cluster-specifier",
+			rlcs:    rlsClusterSpecifierConfigError,
+			wantErr: true,
+		},
+		{
+			name:       "valid-rls-cluster-specifier",
+			rlcs:       rlsClusterSpecifierConfigWithoutTransformations,
+			wantConfig: configWithoutTransformationsWant,
+		},
+	}
+	for _, test := range tests {
+		cs := clusterspecifier.Get("type.googleapis.com/grpc.lookup.v1.RouteLookupClusterSpecifier")
+		if cs == nil {
+			t.Fatal("Error getting cluster specifier")
+		}
+		lbCfg, err := cs.ParseClusterSpecifierConfig(test.rlcs)
+
+		if (err != nil) != test.wantErr {
+			t.Fatalf("ParseClusterSpecifierConfig(%+v) returned err: %v, wantErr: %v", test.rlcs, err, test.wantErr)
+		}
+		if test.wantErr { // Successfully received an error.
+			return
+		}
+		// Marshal and then unmarshal into interface{} to get rid of
+		// nondeterministic protojson Marshaling.
+		lbCfgJSON, err := json.Marshal(lbCfg)
+		if err != nil {
+			t.Fatalf("json.Marshal(%+v) returned err %v", lbCfg, err)
+		}
+		var got interface{}
+		err = json.Unmarshal(lbCfgJSON, got)
+		if err != nil {
+			t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
+		}
+		wantCfgJSON, err := json.Marshal(test.wantConfig)
+		if err != nil {
+			t.Fatalf("json.Marshal(%+v) returned err %v", test.wantConfig, err)
+		}
+		var want interface{}
+		err = json.Unmarshal(wantCfgJSON, want)
+		if err != nil {
+			t.Fatalf("json.Unmarshal(%+v) returned err %v", lbCfgJSON, err)
+		}
+		if diff := cmp.Diff(want, got, cmpopts.EquateEmpty()); diff != "" {
+			t.Fatalf("ParseClusterSpecifierConfig(%+v) returned expected, diff (-want +got) %v", test.rlcs, diff)
+		}
+	}
+}
+
+// This will error because the required match field is set in grpc key builder.
+var rlsClusterSpecifierConfigError = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
+	RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
+		GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
+			{
+				Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
+					{
+						Service: "service",
+						Method:  "method",
+					},
+				},
+				Headers: []*grpc_lookup_v1.NameMatcher{
+					{
+						Key:           "k1",
+						RequiredMatch: true,
+						Names:         []string{"v1"},
+					},
+				},
+			},
+		},
+	},
+})
+
+// Corresponds to the rls unit test case in
+// balancer/rls/internal/config_test.go.
+var rlsClusterSpecifierConfigWithoutTransformations = testutils.MarshalAny(&grpc_lookup_v1.RouteLookupClusterSpecifier{
+	RouteLookupConfig: &grpc_lookup_v1.RouteLookupConfig{
+		GrpcKeybuilders: []*grpc_lookup_v1.GrpcKeyBuilder{
+			{
+				Names: []*grpc_lookup_v1.GrpcKeyBuilder_Name{
+					{
+						Service: "service",
+						Method:  "method",
+					},
+				},
+				Headers: []*grpc_lookup_v1.NameMatcher{
+					{
+						Key:   "k1",
+						Names: []string{"v1"},
+					},
+				},
+			},
+		},
+		LookupService:        "target",
+		LookupServiceTimeout: &durationpb.Duration{Seconds: 100},
+		MaxAge:               &durationpb.Duration{Seconds: 60},
+		StaleAge:             &durationpb.Duration{Seconds: 50},
+		CacheSizeBytes:       1000,
+		DefaultTarget:        "passthrough:///default",
+	},
+})
+
+var configWithoutTransformationsWant = clusterspecifier.BalancerConfig{{"rls_experimental": &lbConfigJSON{
+	RouteLookupConfig: []byte(`{"grpcKeybuilders":[{"names":[{"service":"service","method":"method"}],"headers":[{"key":"k1","names":["v1"]}]}],"lookupService":"target","lookupServiceTimeout":"100s","maxAge":"60s","staleAge":"50s","cacheSizeBytes":"1000","defaultTarget":"passthrough:///default"}`),
+	ChildPolicy: []map[string]json.RawMessage{
+		{
+			"cds_experimental": []byte(`{}`),
+		},
+	},
+	ChildPolicyConfigTargetFieldName: "cluster",
+}}}