blob: 3b30c4ca3309dde05cb50d90173d4c2b9c8125f0 [file]
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/status/status.h"
#include "src/core/ext/transport/chaotic_good/client_transport.h"
#include "src/core/lib/transport/promise_endpoint.h"
#include "src/core/lib/transport/transport.h"
// IWYU pragma: no_include <sys/socket.h>
#include <stddef.h>
#include <algorithm> // IWYU pragma: keep
#include <memory>
#include <string> // IWYU pragma: keep
#include <tuple>
#include <utility>
#include <vector> // IWYU pragma: keep
#include "absl/functional/any_invocable.h"
#include "absl/status/statusor.h" // IWYU pragma: keep
#include "absl/strings/str_format.h" // IWYU pragma: keep
#include "absl/types/optional.h" // IWYU pragma: keep
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice.h> // IWYU pragma: keep
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h>
#include <grpc/status.h> // IWYU pragma: keep
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
using testing::MockFunction;
using testing::Return;
using testing::Sequence;
using testing::StrictMock;
using testing::WithArgs;
namespace grpc_core {
namespace chaotic_good {
namespace testing {
class MockEndpoint
: public grpc_event_engine::experimental::EventEngine::Endpoint {
public:
MOCK_METHOD(
bool, Read,
(absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args),
(override));
MOCK_METHOD(
bool, Write,
(absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args),
(override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetPeerAddress, (), (const, override));
MOCK_METHOD(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
GetLocalAddress, (), (const, override));
};
class ClientTransportTest : public ::testing::Test {
public:
ClientTransportTest()
: control_endpoint_ptr_(new StrictMock<MockEndpoint>()),
data_endpoint_ptr_(new StrictMock<MockEndpoint>()),
memory_allocator_(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"test")),
control_endpoint_(*control_endpoint_ptr_),
data_endpoint_(*data_endpoint_ptr_),
event_engine_(std::make_shared<
grpc_event_engine::experimental::FuzzingEventEngine>(
[]() {
grpc_timer_manager_set_threading(false);
grpc_event_engine::experimental::FuzzingEventEngine::Options
options;
return options;
}(),
fuzzing_event_engine::Actions())),
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
pipe_server_to_client_messages_(arena_.get()),
pipe_server_intial_metadata_(arena_.get()),
pipe_client_to_server_messages_second_(arena_.get()),
pipe_server_to_client_messages_second_(arena_.get()),
pipe_server_intial_metadata_second_(arena_.get()) {}
// Initial ClientTransport with read expecations
void InitialClientTransport() {
client_transport_ = std::make_unique<ClientTransport>(
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
SliceBuffer()),
std::make_unique<PromiseEndpoint>(
std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
event_engine_);
}
// Send messages from client to server.
auto SendClientToServerMessages(
Pipe<MessageHandle>& pipe_client_to_server_messages,
int num_of_messages) {
return Loop([&pipe_client_to_server_messages, num_of_messages,
this]() mutable {
bool has_message = (num_of_messages > 0);
return If(
has_message,
Seq(pipe_client_to_server_messages.sender.Push(
arena_->MakePooled<Message>()),
[&num_of_messages]() -> LoopCtl<absl::Status> {
num_of_messages--;
return Continue();
}),
[&pipe_client_to_server_messages]() mutable -> LoopCtl<absl::Status> {
pipe_client_to_server_messages.sender.Close();
return absl::OkStatus();
});
});
}
// Add stream into client transport, and expect return trailers of
// "grpc-status:code".
auto AddStream(CallArgs args) {
return client_transport_->AddStream(std::move(args));
}
private:
MockEndpoint* control_endpoint_ptr_;
MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_;
protected:
MockEndpoint& control_endpoint_;
MockEndpoint& data_endpoint_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
std::unique_ptr<ClientTransport> client_transport_;
ScopedArenaPtr arena_;
Pipe<MessageHandle> pipe_client_to_server_messages_;
Pipe<MessageHandle> pipe_server_to_client_messages_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
// Added for mutliple streams tests.
Pipe<MessageHandle> pipe_client_to_server_messages_second_;
Pipe<MessageHandle> pipe_server_to_client_messages_second_;
Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
absl::AnyInvocable<void(absl::Status)> read_callback_;
Sequence control_endpoint_sequence_;
Sequence data_endpoint_sequence_;
// Added to verify received message payload.
const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
};
TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
// Mock write failed and read is pending.
EXPECT_CALL(control_endpoint_, Write)
.WillOnce(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("control endpoint write failed."));
return false;
}));
EXPECT_CALL(data_endpoint_, Write)
.WillOnce(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("data endpoint write failed."));
return false;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(Return(false));
InitialClientTransport();
ClientMetadataHandle md;
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages in client transport.
Join(
// Add first stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(args)),
// Send messages to call_args.client_to_server_messages pipe,
// which will be eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
// Mock read failed.
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(WithArgs<0>(
[](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
on_read(absl::InternalError("control endpoint read failed."));
// Return false to mock EventEngine read not finish.
return false;
}));
InitialClientTransport();
ClientMetadataHandle md;
auto args = CallArgs{std::move(md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages in client transport.
Join(
// Add first stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(args)),
// Send messages to call_args.client_to_server_messages pipe.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
// Mock write failed at first stream and second stream's write will fail too.
EXPECT_CALL(control_endpoint_, Write)
.Times(1)
.WillRepeatedly(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("control endpoint write failed."));
return false;
}));
EXPECT_CALL(data_endpoint_, Write)
.Times(1)
.WillRepeatedly(
WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
on_write(absl::InternalError("data endpoint write failed."));
return false;
}));
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(Return(false));
InitialClientTransport();
ClientMetadataHandle first_stream_md;
ClientMetadataHandle second_stream_md;
auto first_stream_args =
CallArgs{std::move(first_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages from client transport.
Join(
// Add first stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(first_stream_args)),
// Send messages to first stream's
// call_args.client_to_server_messages pipe.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
},
Join(
// Add second stream with call_args into client transport.
// Expect return trailers "grpc-status:unavailable".
AddStream(std::move(second_stream_args)),
// Send messages to second stream's
// call_args.client_to_server_messages pipe.
SendClientToServerMessages(pipe_client_to_server_messages_second_,
1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
// Mock read failed at first stream, and second stream's write will fail too.
EXPECT_CALL(control_endpoint_, Read)
.InSequence(control_endpoint_sequence_)
.WillOnce(WithArgs<0>(
[](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
on_read(absl::InternalError("control endpoint read failed."));
// Return false to mock EventEngine read not finish.
return false;
}));
InitialClientTransport();
ClientMetadataHandle first_stream_md;
ClientMetadataHandle second_stream_md;
auto first_stream_args =
CallArgs{std::move(first_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender};
auto second_stream_args =
CallArgs{std::move(second_stream_md),
ClientInitialMetadataOutstandingToken::Empty(),
nullptr,
&pipe_server_intial_metadata_second_.sender,
&pipe_client_to_server_messages_second_.receiver,
&pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
auto activity = MakeActivity(
Seq(
// Concurrently: write and read messages from client transport.
Join(
// Add first stream with call_args into client transport.
AddStream(std::move(first_stream_args)),
// Send messages to first stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
},
Join(
// Add second stream with call_args into client transport.
AddStream(std::move(second_stream_args)),
// Send messages to second stream's
// call_args.client_to_server_messages pipe, which will be
// eventually sent to control/data endpoints.
SendClientToServerMessages(pipe_client_to_server_messages_second_,
1)),
// Once complete, verify successful sending and the received value.
[](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
GRPC_STATUS_UNAVAILABLE);
EXPECT_TRUE(std::get<1>(ret).ok());
return absl::OkStatus();
}),
EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
event_engine_->UnsetGlobalHooks();
}
} // namespace testing
} // namespace chaotic_good
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
// Must call to create default EventEngine.
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
return ret;
}