| /* |
| * Copyright (C) 2020 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #define TRACE_TAG INCREMENTAL |
| |
| #include "incremental_server.h" |
| |
| #include "adb.h" |
| #include "adb_io.h" |
| #include "adb_trace.h" |
| #include "adb_unique_fd.h" |
| #include "adb_utils.h" |
| #include "sysdeps.h" |
| |
| #include <android-base/endian.h> |
| #include <android-base/strings.h> |
| #include <inttypes.h> |
| #include <lz4.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <unistd.h> |
| |
| #include <array> |
| #include <deque> |
| #include <fstream> |
| #include <thread> |
| #include <type_traits> |
| #include <unordered_set> |
| |
| namespace incremental { |
| |
| static constexpr int kBlockSize = 4096; |
| static constexpr int kCompressedSizeMax = kBlockSize * 0.95; |
| static constexpr short kCompressionNone = 0; |
| static constexpr short kCompressionLZ4 = 1; |
| static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize)); |
| static constexpr auto kReadBufferSize = 128 * 1024; |
| |
| using BlockSize = int16_t; |
| using FileId = int16_t; |
| using BlockIdx = int32_t; |
| using NumBlocks = int32_t; |
| using CompressionType = int16_t; |
| using RequestType = int16_t; |
| using ChunkHeader = int32_t; |
| using MagicType = uint32_t; |
| |
| static constexpr MagicType INCR = 0x494e4352; // LE INCR |
| |
| static constexpr RequestType EXIT = 0; |
| static constexpr RequestType BLOCK_MISSING = 1; |
| static constexpr RequestType PREFETCH = 2; |
| |
| static constexpr inline int64_t roundDownToBlockOffset(int64_t val) { |
| return val & ~(kBlockSize - 1); |
| } |
| |
| static constexpr inline int64_t roundUpToBlockOffset(int64_t val) { |
| return roundDownToBlockOffset(val + kBlockSize - 1); |
| } |
| |
| static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) { |
| return roundUpToBlockOffset(bytes) / kBlockSize; |
| } |
| |
| static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) { |
| return static_cast<off64_t>(blockIdx) * kBlockSize; |
| } |
| |
| template <typename T> |
| static inline constexpr T toBigEndian(T t) { |
| using unsigned_type = std::make_unsigned_t<T>; |
| if constexpr (std::is_same_v<T, int16_t>) { |
| return htobe16(static_cast<unsigned_type>(t)); |
| } else if constexpr (std::is_same_v<T, int32_t>) { |
| return htobe32(static_cast<unsigned_type>(t)); |
| } else if constexpr (std::is_same_v<T, int64_t>) { |
| return htobe64(static_cast<unsigned_type>(t)); |
| } else { |
| return t; |
| } |
| } |
| |
| template <typename T> |
| static inline constexpr T readBigEndian(void* data) { |
| using unsigned_type = std::make_unsigned_t<T>; |
| if constexpr (std::is_same_v<T, int16_t>) { |
| return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data))); |
| } else if constexpr (std::is_same_v<T, int32_t>) { |
| return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data))); |
| } else if constexpr (std::is_same_v<T, int64_t>) { |
| return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data))); |
| } else { |
| return T(); |
| } |
| } |
| |
| // Received from device |
| // !Does not include magic! |
| struct RequestCommand { |
| RequestType request_type; // 2 bytes |
| FileId file_id; // 2 bytes |
| union { |
| BlockIdx block_idx; |
| NumBlocks num_blocks; |
| }; // 4 bytes |
| } __attribute__((packed)); |
| |
| // Placed before actual data bytes of each block |
| struct ResponseHeader { |
| FileId file_id; // 2 bytes |
| CompressionType compression_type; // 2 bytes |
| BlockIdx block_idx; // 4 bytes |
| BlockSize block_size; // 2 bytes |
| } __attribute__((packed)); |
| |
| // Holds streaming state for a file |
| class File { |
| public: |
| // Plain file |
| File(const char* filepath, FileId id, int64_t size, unique_fd fd) : File(filepath, id, size) { |
| this->fd_ = std::move(fd); |
| } |
| int64_t ReadBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed, |
| std::string* error) const { |
| char* buf_ptr = static_cast<char*>(buf); |
| int64_t bytes_read = -1; |
| const off64_t offsetStart = blockIndexToOffset(block_idx); |
| bytes_read = adb_pread(fd_, &buf_ptr[sizeof(ResponseHeader)], kBlockSize, offsetStart); |
| return bytes_read; |
| } |
| |
| const unique_fd& RawFd() const { return fd_; } |
| |
| std::vector<bool> sentBlocks; |
| NumBlocks sentBlocksCount = 0; |
| |
| const char* const filepath; |
| const FileId id; |
| const int64_t size; |
| |
| private: |
| File(const char* filepath, FileId id, int64_t size) : filepath(filepath), id(id), size(size) { |
| sentBlocks.resize(numBytesToNumBlocks(size)); |
| } |
| unique_fd fd_; |
| }; |
| |
| class IncrementalServer { |
| public: |
| IncrementalServer(unique_fd fd, std::vector<File> files) |
| : adb_fd_(std::move(fd)), files_(std::move(files)) { |
| buffer_.reserve(kReadBufferSize); |
| } |
| |
| bool Serve(); |
| |
| private: |
| struct PrefetchState { |
| const File* file; |
| BlockIdx overallIndex = 0; |
| BlockIdx overallEnd = 0; |
| |
| PrefetchState(const File& f) : file(&f), overallEnd((BlockIdx)f.sentBlocks.size()) {} |
| PrefetchState(const File& f, BlockIdx start, int count) |
| : file(&f), |
| overallIndex(start), |
| overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {} |
| |
| bool done() const { return overallIndex >= overallEnd; } |
| }; |
| |
| bool SkipToRequest(void* buffer, size_t* size, bool blocking); |
| std::optional<RequestCommand> ReadRequest(bool blocking); |
| |
| void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); } |
| |
| enum class SendResult { Sent, Skipped, Error }; |
| SendResult SendBlock(FileId fileId, BlockIdx blockIdx, bool flush = false); |
| bool SendDone(); |
| void RunPrefetching(); |
| |
| void Send(const void* data, size_t size, bool flush); |
| void Flush(); |
| using TimePoint = decltype(std::chrono::high_resolution_clock::now()); |
| bool Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent); |
| |
| unique_fd const adb_fd_; |
| std::vector<File> files_; |
| |
| // Incoming data buffer. |
| std::vector<char> buffer_; |
| |
| std::deque<PrefetchState> prefetches_; |
| int compressed_ = 0, uncompressed_ = 0; |
| long long sentSize_ = 0; |
| |
| std::vector<char> pendingBlocks_; |
| }; |
| |
| bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) { |
| while (true) { |
| // Looking for INCR magic. |
| bool magic_found = false; |
| int bcur = 0; |
| for (int bsize = buffer_.size(); bcur + 4 < bsize; ++bcur) { |
| uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur)); |
| if (magic == INCR) { |
| magic_found = true; |
| break; |
| } |
| } |
| |
| if (bcur > 0) { |
| // Stream the rest to stderr. |
| fprintf(stderr, "%.*s", bcur, buffer_.data()); |
| erase_buffer_head(bcur); |
| } |
| |
| if (magic_found && buffer_.size() >= *size + sizeof(INCR)) { |
| // fine, return |
| memcpy(buffer, buffer_.data() + sizeof(INCR), *size); |
| erase_buffer_head(*size + sizeof(INCR)); |
| return true; |
| } |
| |
| adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0}; |
| auto res = adb_poll(&pfd, 1, blocking ? -1 : 0); |
| if (res != 1) { |
| if (res < 0) { |
| fprintf(stderr, "Failed to poll: %s\n", strerror(errno)); |
| return false; |
| } |
| *size = 0; |
| return true; |
| } |
| |
| auto bsize = buffer_.size(); |
| buffer_.resize(kReadBufferSize); |
| int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize); |
| if (r > 0) { |
| buffer_.resize(bsize + r); |
| continue; |
| } |
| |
| if (r == -1) { |
| fprintf(stderr, "Failed to read from fd %d: %d. Exit\n", adb_fd_.get(), errno); |
| return false; |
| } |
| |
| // socket is closed |
| return false; |
| } |
| } |
| |
| std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) { |
| uint8_t commandBuf[sizeof(RequestCommand)]; |
| auto size = sizeof(commandBuf); |
| if (!SkipToRequest(&commandBuf, &size, blocking)) { |
| return {{EXIT}}; |
| } |
| if (size < sizeof(RequestCommand)) { |
| return {}; |
| } |
| RequestCommand request; |
| request.request_type = readBigEndian<RequestType>(&commandBuf[0]); |
| request.file_id = readBigEndian<FileId>(&commandBuf[2]); |
| request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]); |
| return request; |
| } |
| |
| auto IncrementalServer::SendBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult { |
| auto& file = files_[fileId]; |
| if (blockIdx >= static_cast<long>(file.sentBlocks.size())) { |
| fprintf(stderr, "Failed to read file %s at block %" PRId32 " (past end).\n", file.filepath, |
| blockIdx); |
| return SendResult::Error; |
| } |
| if (file.sentBlocks[blockIdx]) { |
| return SendResult::Skipped; |
| } |
| std::string error; |
| char raw[sizeof(ResponseHeader) + kBlockSize]; |
| bool isZipCompressed = false; |
| const int64_t bytesRead = file.ReadBlock(blockIdx, &raw, &isZipCompressed, &error); |
| if (bytesRead < 0) { |
| fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%s).\n", file.filepath, blockIdx, |
| error.c_str()); |
| return SendResult::Error; |
| } |
| |
| ResponseHeader* header = nullptr; |
| char data[sizeof(ResponseHeader) + kCompressBound]; |
| char* compressed = data + sizeof(*header); |
| int16_t compressedSize = 0; |
| if (!isZipCompressed) { |
| compressedSize = |
| LZ4_compress_default(raw + sizeof(*header), compressed, bytesRead, kCompressBound); |
| } |
| int16_t blockSize; |
| if (compressedSize > 0 && compressedSize < kCompressedSizeMax) { |
| ++compressed_; |
| blockSize = compressedSize; |
| header = reinterpret_cast<ResponseHeader*>(data); |
| header->compression_type = toBigEndian(kCompressionLZ4); |
| } else { |
| ++uncompressed_; |
| blockSize = bytesRead; |
| header = reinterpret_cast<ResponseHeader*>(raw); |
| header->compression_type = toBigEndian(kCompressionNone); |
| } |
| |
| header->file_id = toBigEndian(fileId); |
| header->block_size = toBigEndian(blockSize); |
| header->block_idx = toBigEndian(blockIdx); |
| |
| file.sentBlocks[blockIdx] = true; |
| file.sentBlocksCount += 1; |
| Send(header, sizeof(*header) + blockSize, flush); |
| return SendResult::Sent; |
| } |
| |
| bool IncrementalServer::SendDone() { |
| ResponseHeader header; |
| header.file_id = -1; |
| header.compression_type = 0; |
| header.block_idx = 0; |
| header.block_size = 0; |
| Send(&header, sizeof(header), true); |
| return true; |
| } |
| |
| void IncrementalServer::RunPrefetching() { |
| constexpr auto kPrefetchBlocksPerIteration = 128; |
| |
| int blocksToSend = kPrefetchBlocksPerIteration; |
| while (!prefetches_.empty() && blocksToSend > 0) { |
| auto& prefetch = prefetches_.front(); |
| const auto& file = *prefetch.file; |
| for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) { |
| if (auto res = SendBlock(file.id, i); res == SendResult::Sent) { |
| --blocksToSend; |
| } else if (res == SendResult::Error) { |
| fprintf(stderr, "Failed to send block %" PRId32 "\n", i); |
| } |
| } |
| if (prefetch.done()) { |
| prefetches_.pop_front(); |
| } |
| } |
| } |
| |
| void IncrementalServer::Send(const void* data, size_t size, bool flush) { |
| constexpr auto kChunkFlushSize = 31 * kBlockSize; |
| |
| if (pendingBlocks_.empty()) { |
| pendingBlocks_.resize(sizeof(ChunkHeader)); |
| } |
| pendingBlocks_.insert(pendingBlocks_.end(), static_cast<const char*>(data), |
| static_cast<const char*>(data) + size); |
| if (flush || pendingBlocks_.size() > kChunkFlushSize) { |
| Flush(); |
| } |
| } |
| |
| void IncrementalServer::Flush() { |
| if (pendingBlocks_.empty()) { |
| return; |
| } |
| |
| *(ChunkHeader*)pendingBlocks_.data() = |
| toBigEndian<int32_t>(pendingBlocks_.size() - sizeof(ChunkHeader)); |
| if (!WriteFdExactly(adb_fd_, pendingBlocks_.data(), pendingBlocks_.size())) { |
| fprintf(stderr, "Failed to write %d bytes\n", int(pendingBlocks_.size())); |
| } |
| sentSize_ += pendingBlocks_.size(); |
| pendingBlocks_.clear(); |
| } |
| |
| bool IncrementalServer::Exit(std::optional<TimePoint> startTime, int missesCount, int missesSent) { |
| using namespace std::chrono; |
| auto endTime = high_resolution_clock::now(); |
| fprintf(stderr, |
| "Connection failed or received exit command. Exit.\n" |
| "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: " |
| "%d, mb: %.3f\n" |
| "Total time taken: %.3fms\n", |
| missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0, |
| duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / |
| 1000.0); |
| return true; |
| } |
| |
| bool IncrementalServer::Serve() { |
| // Initial handshake to verify connection is still alive |
| if (!SendOkay(adb_fd_)) { |
| fprintf(stderr, "Connection is dead. Abort.\n"); |
| return false; |
| } |
| |
| std::unordered_set<FileId> prefetchedFiles; |
| bool doneSent = false; |
| int missesCount = 0; |
| int missesSent = 0; |
| |
| using namespace std::chrono; |
| std::optional<TimePoint> startTime; |
| |
| while (true) { |
| if (!doneSent && prefetches_.empty() && |
| std::all_of(files_.begin(), files_.end(), [](const File& f) { |
| return f.sentBlocksCount == NumBlocks(f.sentBlocks.size()); |
| })) { |
| fprintf(stdout, "All files should be loaded. Notifying the device.\n"); |
| SendDone(); |
| doneSent = true; |
| } |
| |
| const bool blocking = prefetches_.empty(); |
| if (blocking) { |
| // We've no idea how long the blocking call is, so let's flush whatever is still unsent. |
| Flush(); |
| } |
| auto request = ReadRequest(blocking); |
| |
| if (!startTime) { |
| startTime = high_resolution_clock::now(); |
| } |
| |
| if (request) { |
| FileId fileId = request->file_id; |
| BlockIdx blockIdx = request->block_idx; |
| |
| switch (request->request_type) { |
| case EXIT: { |
| // Stop everything. |
| return Exit(startTime, missesCount, missesSent); |
| } |
| case BLOCK_MISSING: { |
| ++missesCount; |
| // Sends one single block ASAP. |
| if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 || |
| blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) { |
| fprintf(stderr, |
| "Received invalid data request for file_id %" PRId16 |
| " block_idx %" PRId32 ".\n", |
| fileId, blockIdx); |
| break; |
| } |
| // fprintf(stderr, "\treading file %d block %04d\n", (int)fileId, |
| // (int)blockIdx); |
| if (auto res = SendBlock(fileId, blockIdx, true); res == SendResult::Error) { |
| fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx); |
| } else if (res == SendResult::Sent) { |
| ++missesSent; |
| // Make sure we send more pages from this place onward, in case if the OS is |
| // reading a bigger block. |
| prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7); |
| } |
| break; |
| } |
| case PREFETCH: { |
| // Start prefetching for a file |
| if (fileId < 0) { |
| fprintf(stderr, |
| "Received invalid prefetch request for file_id %" PRId16 "\n", |
| fileId); |
| break; |
| } |
| if (!prefetchedFiles.insert(fileId).second) { |
| fprintf(stderr, |
| "Received duplicate prefetch request for file_id %" PRId16 "\n", |
| fileId); |
| break; |
| } |
| D("Received prefetch request for file_id %" PRId16 ".\n", fileId); |
| prefetches_.emplace_back(files_[fileId]); |
| break; |
| } |
| default: |
| fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n", |
| request->request_type, fileId, blockIdx); |
| break; |
| } |
| } |
| |
| RunPrefetching(); |
| } |
| } |
| |
| bool serve(int adb_fd, int argc, const char** argv) { |
| auto connection_fd = unique_fd(adb_fd); |
| if (argc <= 0) { |
| error_exit("inc-server: must specify at least one file."); |
| } |
| |
| std::vector<File> files; |
| files.reserve(argc); |
| for (int i = 0; i < argc; ++i) { |
| auto filepath = argv[i]; |
| |
| struct stat st; |
| if (stat(filepath, &st)) { |
| fprintf(stderr, "Failed to stat input file %s. Abort.\n", filepath); |
| return {}; |
| } |
| |
| unique_fd fd(adb_open(filepath, O_RDONLY)); |
| if (fd < 0) { |
| error_exit("inc-server: failed to open file '%s'.", filepath); |
| } |
| files.emplace_back(filepath, i, st.st_size, std::move(fd)); |
| } |
| |
| IncrementalServer server(std::move(connection_fd), std::move(files)); |
| printf("Serving...\n"); |
| fclose(stdin); |
| fclose(stdout); |
| return server.Serve(); |
| } |
| |
| } // namespace incremental |