blob: f77a626d7f5336a742dc5b981706619e2e3f34dd [file] [log] [blame]
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifdef GPR_APPLE
#include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/gprpp/strerror.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
// Size, in bytes, of the scratch slice allocated for each CFReadStreamRead
// call. Declared constexpr so the value cannot be modified at runtime.
constexpr int kDefaultReadBufferSize = 8192;
// Converts a CoreFoundation error into an absl::Status.
//
// A null `cf_error` maps to absl::OkStatus(). Any other error maps to a
// kUnknown status whose message carries the error's domain, numeric code and
// description. The domain and description are each limited to 255 bytes of
// UTF-8; a string that cannot be converted or does not fit is reported as
// empty.
absl::Status CFErrorToStatus(CFTypeUniqueRef<CFErrorRef> cf_error) {
  if (cf_error == nullptr) {
    return absl::OkStatus();
  }
  // CFStringGetCString returns false when the conversion fails or the buffer
  // is too small; in that case the buffer is not guaranteed to contain a
  // terminated string, so reset it instead of formatting stack garbage.
  const auto copy_utf8 = [](CFStringRef str, char* buf, CFIndex buf_size) {
    if (str == nullptr ||
        !CFStringGetCString(str, buf, buf_size, kCFStringEncodingUTF8)) {
      buf[0] = '\0';
    }
  };
  CFErrorDomain cf_domain = CFErrorGetDomain(cf_error);
  CFIndex code = CFErrorGetCode(cf_error);
  CFTypeUniqueRef<CFStringRef> cf_desc = CFErrorCopyDescription(cf_error);
  char domain_buf[256];
  char desc_buf[256];
  copy_utf8(cf_domain, domain_buf, sizeof(domain_buf));
  copy_utf8(cf_desc, desc_buf, sizeof(desc_buf));
  return absl::Status(absl::StatusCode::kUnknown,
                      absl::StrFormat("(domain:%s, code:%ld, description:%s)",
                                      domain_buf, code, desc_buf));
}
// Returns the local address of the socket that backs `stream`.
//
// The native socket handle is read from the stream's
// kCFStreamPropertySocketNativeHandle property and passed to getsockname().
// Returns an InternalError if the stream exposes no native handle or if
// getsockname() fails.
absl::StatusOr<EventEngine::ResolvedAddress> CFReadStreamLocallAddress(
    CFReadStreamRef stream) {
  CFTypeUniqueRef<CFDataRef> cf_native_handle = static_cast<CFDataRef>(
      CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle));
  // The property is absent when the stream has no underlying socket; passing
  // a null CFDataRef to CFDataGetBytes would crash.
  if (cf_native_handle == nullptr) {
    return absl::InternalError(
        "CFReadStreamCopyProperty: native socket handle unavailable");
  }
  CFSocketNativeHandle socket = -1;
  CFDataGetBytes(cf_native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)),
                 reinterpret_cast<UInt8*>(&socket));
  EventEngine::ResolvedAddress addr;
  socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
  // getsockname() fills the storage owned by `addr` and updates `len` to the
  // actual address size.
  if (getsockname(socket, const_cast<sockaddr*>(addr.address()), &len) < 0) {
    return absl::InternalError(
        absl::StrCat("getsockname:", grpc_core::StrError(errno)));
  }
  return EventEngine::ResolvedAddress(addr.address(), len);
}
} // namespace
// Shuts down open_event_ (the event Connect() waits on) with `status` and
// returns the result reported by SetShutdown().
bool CFStreamEndpointImpl::CancelConnect(absl::Status status) {
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
      "CFStreamEndpointImpl::CancelConnect: status: %s, this: %p",
      status.ToString().c_str(), this);
  const bool shutdown_result = open_event_.SetShutdown(std::move(status));
  return shutdown_result;
}
// Starts an asynchronous connection to `addr`.
//
// Creates the CFStream read/write pair for the peer, registers ReadCallback /
// WriteCallback for stream events, schedules both streams on the default
// global dispatch queue and opens them. `on_connect` is invoked exactly once:
//  - immediately with an error if the address cannot be converted or either
//    stream fails to open;
//  - later, from the open_event_ closure, with OkStatus once the connection
//    is open and the local address has been resolved, or with the error that
//    shut the open event down.
void CFStreamEndpointImpl::Connect(
    absl::AnyInvocable<void(absl::Status)> on_connect,
    EventEngine::ResolvedAddress addr) {
  auto addr_uri = ResolvedAddressToURI(addr);
  if (!addr_uri.ok()) {
    on_connect(std::move(addr_uri).status());
    return;
  }
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Connect: %s",
                                   addr_uri.value().c_str());

  peer_address_ = std::move(addr);
  auto host_port = ResolvedAddressToNormalizedString(peer_address_);
  if (!host_port.ok()) {
    on_connect(std::move(host_port).status());
    return;
  }
  peer_address_string_ = host_port.value();
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
      "CFStreamEndpointImpl::Connect, host_port: %s", host_port->c_str());

  std::string host_string;
  std::string port_string;
  grpc_core::SplitHostPort(host_port.value(), &host_string, &port_string);
  // CFStringCreateWithCString follows the Core Foundation "Create Rule": the
  // caller owns the returned string. Hold it in the file's owning wrapper so
  // it is not leaked on every connection attempt.
  CFTypeUniqueRef<CFStringRef> host = CFStringCreateWithCString(
      nullptr, host_string.c_str(), kCFStringEncodingUTF8);
  int port = ResolvedAddressGetPort(peer_address_);
  CFStreamCreatePairWithSocketToHost(nullptr, host, port, &cf_read_stream_,
                                     &cf_write_stream_);

  // Both streams share one client context whose info pointer is this
  // endpoint; Retain/Release are passed as the context's retain and release
  // callbacks.
  CFStreamClientContext cf_context = {0, this, Retain, Release, nullptr};
  CFReadStreamSetClient(
      cf_read_stream_,
      kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
          kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
      ReadCallback, &cf_context);
  CFWriteStreamSetClient(
      cf_write_stream_,
      kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
          kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
      WriteCallback, &cf_context);
  CFReadStreamSetDispatchQueue(cf_read_stream_,
                               dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
  CFWriteStreamSetDispatchQueue(
      cf_write_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));

  if (!CFReadStreamOpen(cf_read_stream_)) {
    auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
    on_connect(std::move(status));
    return;
  }
  if (!CFWriteStreamOpen(cf_write_stream_)) {
    auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
    on_connect(std::move(status));
    return;
  }

  // open_event_ is set ready by WriteCallback on kCFStreamEventOpenCompleted
  // and shut down on stream errors or CancelConnect().
  open_event_.NotifyOn(new PosixEngineClosure(
      [that = Ref(),
       on_connect = std::move(on_connect)](absl::Status status) mutable {
        if (!status.ok()) {
          on_connect(std::move(status));
          return;
        }
        auto local_addr = CFReadStreamLocallAddress(that->cf_read_stream_);
        if (!local_addr.ok()) {
          on_connect(std::move(local_addr).status());
          return;
        }
        that->local_address_ = local_addr.value();
        // Check the conversion result instead of dereferencing it blindly;
        // dereferencing a non-OK StatusOr is a fatal error.
        auto local_uri = ResolvedAddressToURI(that->local_address_);
        if (!local_uri.ok()) {
          on_connect(std::move(local_uri).status());
          return;
        }
        that->local_address_string_ = *local_uri;
        on_connect(absl::OkStatus());
      },
      false /* is_permanent */));
}
// CFReadStream client callback. `client_callback_info` is the endpoint that
// was registered as the stream client's info pointer.
/* static */ void CFStreamEndpointImpl::ReadCallback(
    CFReadStreamRef stream, CFStreamEventType type,
    void* client_callback_info) {
  auto* endpoint = static_cast<CFStreamEndpointImpl*>(client_callback_info);
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
      "CFStreamEndpointImpl::ReadCallback, type: %lu, this: %p", type,
      endpoint);

  switch (type) {
    case kCFStreamEventOpenCompleted:
      // Connection readiness is signalled by the write stream's open
      // completion, so there is nothing to do here.
      break;
    case kCFStreamEventHasBytesAvailable:
    case kCFStreamEventEndEncountered:
      // Data and end-of-stream both wake up a pending read.
      endpoint->read_event_.SetReady();
      break;
    case kCFStreamEventErrorOccurred: {
      const absl::Status error = CFErrorToStatus(CFReadStreamCopyError(stream));
      GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Read error: %s",
                                       error.ToString().c_str());
      // A stream error shuts down every event on the endpoint.
      endpoint->open_event_.SetShutdown(error);
      endpoint->read_event_.SetShutdown(error);
      endpoint->write_event_.SetShutdown(error);
      break;
    }
    default:
      GPR_UNREACHABLE_CODE(return);
  }
}
// CFWriteStream client callback. `client_callback_info` is the endpoint that
// was registered as the stream client's info pointer.
/* static */
void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream,
                                         CFStreamEventType type,
                                         void* client_callback_info) {
  auto* endpoint = static_cast<CFStreamEndpointImpl*>(client_callback_info);
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
      "CFStreamEndpointImpl::WriteCallback, type: %lu, this: %p", type,
      endpoint);

  switch (type) {
    case kCFStreamEventOpenCompleted:
      // The write stream's open completion marks the connection as ready.
      endpoint->open_event_.SetReady();
      break;
    case kCFStreamEventCanAcceptBytes:
    case kCFStreamEventEndEncountered:
      // Writable space and end-of-stream both wake up a pending write.
      endpoint->write_event_.SetReady();
      break;
    case kCFStreamEventErrorOccurred: {
      const absl::Status error =
          CFErrorToStatus(CFWriteStreamCopyError(stream));
      GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Write error: %s",
                                       error.ToString().c_str());
      // A stream error shuts down every event on the endpoint.
      endpoint->open_event_.SetShutdown(error);
      endpoint->read_event_.SetShutdown(error);
      endpoint->write_event_.SetShutdown(error);
      break;
    }
    default:
      GPR_UNREACHABLE_CODE(return);
  }
}
// Constructs an endpoint that takes ownership of `engine` and
// `memory_allocator`, then initializes the open, read and write events.
CFStreamEndpointImpl::CFStreamEndpointImpl(
std::shared_ptr<CFEventEngine> engine, MemoryAllocator memory_allocator)
: engine_(std::move(engine)),
memory_allocator_(std::move(memory_allocator)),
// The events are given the raw pointer held by the engine_ member (not the
// moved-from `engine` parameter). NOTE(review): this relies on engine_ being
// declared before the event members in the class — confirm in the header.
open_event_(engine_.get()),
read_event_(engine_.get()),
write_event_(engine_.get()) {
open_event_.InitEvent();
read_event_.InitEvent();
write_event_.InitEvent();
}
// Destroys the three events that the constructor initialized with
// InitEvent().
CFStreamEndpointImpl::~CFStreamEndpointImpl() {
open_event_.DestroyEvent();
read_event_.DestroyEvent();
write_event_.DestroyEvent();
}
void CFStreamEndpointImpl::Shutdown() {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Shutdown: this: %p",
this);
auto shutdownStatus =
absl::Status(absl::StatusCode::kUnknown,
absl::StrFormat("Shutting down CFStreamEndpointImpl"));
open_event_.SetShutdown(shutdownStatus);
read_event_.SetShutdown(shutdownStatus);
write_event_.SetShutdown(shutdownStatus);
CFReadStreamSetClient(cf_read_stream_, kCFStreamEventNone, nullptr, nullptr);
CFWriteStreamSetClient(cf_write_stream_, kCFStreamEventNone, nullptr,
nullptr);
CFReadStreamClose(cf_read_stream_);
CFWriteStreamClose(cf_write_stream_);
}
// Schedules an asynchronous read into `buffer`.
//
// Registers a one-shot closure on read_event_. When the event fires with an
// OK status the closure performs the read through DoRead(); otherwise
// `on_read` receives the failing status. Always returns false, so the result
// is only ever delivered through `on_read`.
bool CFStreamEndpointImpl::Read(
    absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
    const EventEngine::Endpoint::ReadArgs* /* args */) {
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Read, this: %p",
                                   this);
  auto* on_readable = new PosixEngineClosure(
      [endpoint = Ref(), on_read = std::move(on_read),
       buffer](absl::Status status) mutable {
        if (!status.ok()) {
          on_read(status);
          return;
        }
        endpoint->DoRead(std::move(on_read), buffer);
      },
      false /* is_permanent*/);
  read_event_.NotifyOn(on_readable);
  return false;
}
// Performs one CFReadStreamRead into a freshly allocated slice appended to
// `buffer`, then reports the outcome through `on_read`.
//
// On success the appended slice is trimmed to the number of bytes actually
// read, and any data that was already in `buffer` is left untouched. On
// failure the scratch slice is removed again and `on_read` receives the
// stream's error.
void CFStreamEndpointImpl::DoRead(
    absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer) {
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoRead, this: %p",
                                   this);

  auto buffer_index = buffer->AppendIndexed(
      Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize)));

  CFIndex read_size = CFReadStreamRead(
      cf_read_stream_,
      internal::SliceCast<MutableSlice>(buffer->MutableSliceAt(buffer_index))
          .begin(),
      kDefaultReadBufferSize);

  if (read_size < 0) {
    auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
    GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream read error: %s, read_size: %ld",
                                     status.ToString().c_str(), read_size);
    // Do not hand the uninitialized scratch slice back to the caller.
    buffer->RemoveLastNBytes(kDefaultReadBufferSize);
    on_read(status);
    return;
  }

  // Trim only the unused tail of the slice appended above. Computing the trim
  // from buffer->Length() would also discard bytes that were already in the
  // buffer before this read.
  buffer->RemoveLastNBytes(
      static_cast<size_t>(kDefaultReadBufferSize - read_size));
  // NOTE(review): read_size == 0 (end of stream) is reported as OK with no
  // new data — confirm this is what callers expect.
  on_read(absl::OkStatus());
}
// Schedules an asynchronous write of `data`.
//
// Registers a one-shot closure on write_event_. When the event fires with an
// OK status the closure performs the write through DoWrite(); otherwise
// `on_writable` receives the failing status. Always returns false, so the
// result is only ever delivered through `on_writable`.
bool CFStreamEndpointImpl::Write(
    absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
    const EventEngine::Endpoint::WriteArgs* /* args */) {
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Write, this: %p",
                                   this);
  auto* on_write_ready = new PosixEngineClosure(
      [endpoint = Ref(), on_writable = std::move(on_writable),
       data](absl::Status status) mutable {
        if (!status.ok()) {
          on_writable(status);
          return;
        }
        endpoint->DoWrite(std::move(on_writable), data);
      },
      false /* is_permanent*/);
  write_event_.NotifyOn(on_write_ready);
  return false;
}
// Writes the slices of `data` to the write stream in order.
//
//  - If every slice is written completely, `on_writable` is called with
//    OkStatus.
//  - If the stream accepts only part of a slice, the bytes written so far are
//    removed from `data` and the remainder is retried when write_event_ fires
//    again.
//  - If CFWriteStreamWrite reports an error, `on_writable` is called with the
//    stream's error.
void CFStreamEndpointImpl::DoWrite(
    absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) {
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoWrite, this: %p",
                                   this);

  size_t total_written_size = 0;
  for (size_t i = 0; i < data->Count(); i++) {
    auto slice = data->RefSlice(i);
    // Keep the result signed: CFWriteStreamWrite returns -1 on error, which
    // would wrap to a huge value if stored in a size_t and go unnoticed.
    const CFIndex written_size =
        CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size());
    if (written_size < 0) {
      auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
      GRPC_EVENT_ENGINE_ENDPOINT_TRACE(
          "CFStream write error: %s, written_size: %ld",
          status.ToString().c_str(), written_size);
      on_writable(status);
      return;
    }

    const size_t written_bytes = static_cast<size_t>(written_size);
    total_written_size += written_bytes;
    if (written_bytes < slice.size()) {
      // Partial write: drop what has been sent so the retry starts at the
      // first unsent byte, then wait for the stream to accept more data.
      SliceBuffer written;
      data->MoveFirstNBytesIntoSliceBuffer(total_written_size, written);

      write_event_.NotifyOn(new PosixEngineClosure(
          [that = Ref(), on_writable = std::move(on_writable),
           data](absl::Status status) mutable {
            if (status.ok()) {
              that->DoWrite(std::move(on_writable), data);
            } else {
              on_writable(status);
            }
          },
          false /* is_permanent*/));
      return;
    }
  }
  on_writable(absl::OkStatus());
}
} // namespace experimental
} // namespace grpc_event_engine
#endif // GPR_APPLE