/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/trace_processor/dynamic/thread_state_generator.h"
#include <memory>
#include <set>
#include "src/trace_processor/types/trace_processor_context.h"
namespace perfetto {
namespace trace_processor {
ThreadStateGenerator::ThreadStateGenerator(TraceProcessorContext* context)
: running_string_id_(context->storage->InternString("Running")),
runnable_string_id_(context->storage->InternString("R")),
context_(context) {}
ThreadStateGenerator::~ThreadStateGenerator() = default;
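
// Any combination of constraints is accepted: the table is fully materialized
// in ComputeTable and filtering/sorting on the result is handled by the
// caller.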
base::Status ThreadStateGenerator::ValidateConstraints(
const QueryConstraints&) {
return base::OkStatus();
}
base::Status ThreadStateGenerator::ComputeTable(
const std::vector<Constraint>&,
const std::vector<Order>&,
const BitVector&,
std::unique_ptr<Table>& table_return) {
if (!unsorted_thread_state_table_) {
int64_t trace_end_ts =
context_->storage->GetTraceTimestampBoundsNs().second;
unsorted_thread_state_table_ = ComputeThreadStateTable(trace_end_ts);
    // We explicitly sort by ts here as ComputeThreadStateTable does not insert
    // rows in sorted order but we expect our clients to always want to sort
    // on ts. Rewriting ComputeThreadStateTable to insert in sorted order would
    // be more trouble than it's worth.
sorted_thread_state_table_ = unsorted_thread_state_table_->Sort(
{unsorted_thread_state_table_->ts().ascending()});
}
// TODO(rsavitski): return base::ErrStatus instead?
PERFETTO_CHECK(sorted_thread_state_table_);
table_return =
std::unique_ptr<Table>(new Table(sorted_thread_state_table_->Copy()));
return base::OkStatus();
}
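
// Builds the unsorted thread_state table by merging three event streams
// (sched slices, sched_waking/sched_wakeup instants and sched_blocked_reason
// instants) in timestamp order, tracking per-thread pending state as it goes.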
std::unique_ptr<tables::ThreadStateTable>
ThreadStateGenerator::ComputeThreadStateTable(int64_t trace_end_ts) {
std::unique_ptr<tables::ThreadStateTable> table(new tables::ThreadStateTable(
context_->storage->mutable_string_pool(), nullptr));
const auto& raw_sched = context_->storage->sched_slice_table();
const auto& instants = context_->storage->instant_table();
  // In all the tables below, exclude utid == 0, which represents the idle
  // thread.
Table sched = raw_sched.Filter({raw_sched.utid().ne(0)},
RowMap::OptimizeFor::kLookupSpeed);
Table waking = instants.Filter(
{instants.name().eq("sched_waking"), instants.ref().ne(0)},
RowMap::OptimizeFor::kLookupSpeed);
  // We prefer to use sched_waking events where available and fall back to
  // sched_wakeup events otherwise.
if (waking.row_count() == 0) {
waking = instants.Filter(
{instants.name().eq("sched_wakeup"), instants.ref().ne(0)},
RowMap::OptimizeFor::kLookupSpeed);
}
Table sched_blocked_reason = instants.Filter(
{instants.name().eq("sched_blocked_reason"), instants.ref().ne(0)},
RowMap::OptimizeFor::kLookupSpeed);
const auto& sched_ts_col = sched.GetTypedColumnByName<int64_t>("ts");
const auto& waking_ts_col = waking.GetTypedColumnByName<int64_t>("ts");
const auto& blocked_ts_col =
sched_blocked_reason.GetTypedColumnByName<int64_t>("ts");
uint32_t sched_idx = 0;
uint32_t waking_idx = 0;
uint32_t blocked_idx = 0;
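  // Tracks, per thread, the deschedule/runnable/blocked-reason information
  // seen so far but not yet flushed to the output table.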
TidInfoMap state_map(/*initial_capacity=*/1024);
while (sched_idx < sched.row_count() || waking_idx < waking.row_count() ||
blocked_idx < sched_blocked_reason.row_count()) {
int64_t sched_ts = sched_idx < sched.row_count()
? sched_ts_col[sched_idx]
: std::numeric_limits<int64_t>::max();
int64_t waking_ts = waking_idx < waking.row_count()
? waking_ts_col[waking_idx]
: std::numeric_limits<int64_t>::max();
int64_t blocked_ts = blocked_idx < sched_blocked_reason.row_count()
? blocked_ts_col[blocked_idx]
: std::numeric_limits<int64_t>::max();
    // Walk the three tables in lockstep: on each iteration, process the event
    // with the earliest timestamp.
int64_t min_ts = std::min({sched_ts, waking_ts, blocked_ts});
if (min_ts == sched_ts) {
AddSchedEvent(sched, sched_idx++, state_map, trace_end_ts, table.get());
} else if (min_ts == waking_ts) {
AddWakingEvent(waking, waking_idx++, state_map);
} else /* (min_ts == blocked_ts) */ {
AddBlockedReasonEvent(sched_blocked_reason, blocked_idx++, state_map);
}
}
// At the end, go through and flush any remaining pending events.
for (auto it = state_map.GetIterator(); it; ++it) {
UniqueTid utid = it.key();
const ThreadSchedInfo& pending_info = it.value();
FlushPendingEventsForThread(utid, pending_info, table.get(), base::nullopt);
}
return table;
}
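
// Handles a single sched slice: flushes the pending descheduled/runnable
// intervals for the thread, inserts a "Running" row for the slice and records
// when the thread will be descheduled so the next event can close that
// interval.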
void ThreadStateGenerator::AddSchedEvent(const Table& sched,
uint32_t sched_idx,
TidInfoMap& state_map,
int64_t trace_end_ts,
tables::ThreadStateTable* table) {
int64_t ts = sched.GetTypedColumnByName<int64_t>("ts")[sched_idx];
UniqueTid utid = sched.GetTypedColumnByName<uint32_t>("utid")[sched_idx];
ThreadSchedInfo* info = &state_map[utid];
// Due to races in the kernel, it is possible for the same thread to be
// scheduled on different CPUs at the same time. This will manifest itself
// here by having |info->desched_ts| in the future of this scheduling slice
// (i.e. there was a scheduling slice in the past which ended after the start
// of the current scheduling slice).
//
// We work around this problem by truncating the previous slice to the start
// of this slice and not adding the descheduled slice (i.e. we don't call
// |FlushPendingEventsForThread| which adds this slice).
//
// See b/186509316 for details and an example on when this happens.
if (info->desched_ts && info->desched_ts.value() > ts) {
uint32_t prev_sched_row = info->scheduled_row.value();
int64_t prev_sched_start = table->ts()[prev_sched_row];
    // Double check that the pending descheduled slice would have started
    // exactly when the previous scheduling slice ended.
PERFETTO_DCHECK(prev_sched_start + table->dur()[prev_sched_row] ==
info->desched_ts.value());
// Truncate the duration of the old slice to end at the start of this
// scheduling slice.
table->mutable_dur()->Set(prev_sched_row, ts - prev_sched_start);
} else {
FlushPendingEventsForThread(utid, *info, table, ts);
}
// Reset so we don't have any leftover data on the next round.
*info = {};
  // Undo the expansion of the final sched slice for each CPU to the end of
  // the trace by setting the duration back to -1. This counteracts the code
  // in SchedEventTracker::FlushPendingEvents.
  // TODO(lalitm): remove this hack when we stop expanding the last slice to
  // the end of the trace.
int64_t dur = sched.GetTypedColumnByName<int64_t>("dur")[sched_idx];
if (ts + dur == trace_end_ts) {
dur = -1;
}
// Now add the sched slice itself as "Running" with the other fields
// unchanged.
tables::ThreadStateTable::Row sched_row;
sched_row.ts = ts;
sched_row.dur = dur;
sched_row.cpu = sched.GetTypedColumnByName<uint32_t>("cpu")[sched_idx];
sched_row.state = running_string_id_;
sched_row.utid = utid;
auto id_and_row = table->Insert(sched_row);
// If the sched row had a negative duration, don't add any descheduled slice
// because it would be meaningless.
if (sched_row.dur == -1) {
return;
}
// This will be flushed to the table on the next sched slice (or the very end
// of the big loop).
info->desched_ts = ts + dur;
info->desched_end_state =
sched.GetTypedColumnByName<StringId>("end_state")[sched_idx];
info->scheduled_row = id_and_row.row;
}
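
// Handles a sched_waking/sched_wakeup instant by recording the timestamp at
// which the thread became runnable.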
void ThreadStateGenerator::AddWakingEvent(const Table& waking,
uint32_t waking_idx,
TidInfoMap& state_map) {
int64_t ts = waking.GetTypedColumnByName<int64_t>("ts")[waking_idx];
UniqueTid utid = static_cast<UniqueTid>(
waking.GetTypedColumnByName<int64_t>("ref")[waking_idx]);
ThreadSchedInfo* info = &state_map[utid];
// Occasionally, it is possible to get a waking event for a thread
// which is already in a runnable state. When this happens, we just
// ignore the waking event.
// See b/186509316 for details and an example on when this happens.
if (info->desched_end_state &&
*info->desched_end_state == runnable_string_id_) {
return;
}
  // As counter-intuitive as it seems, occasionally we can get a waking
  // event for a thread which is currently running.
  //
  // There are two cases when this can happen:
  // 1. The kernel legitimately sends a waking event for a "running" thread
  //    because the thread was woken up before the kernel switched away
  //    from it. In this case, the waking timestamp will be in the past
  //    because we added the descheduled slice when we processed the sched
  //    event.
  // 2. We're close to the end of the trace or had data-loss and we missed
  //    the switch-out event for a thread but we see a waking afterwards.
  //
  // The check below handles case 1: the waking happened while the thread was
  // still running (i.e. before its pending deschedule timestamp), so we drop
  // the waking entirely.
if (info->desched_ts && *info->desched_ts > ts) {
return;
}
  // For case 2 and in the normal case, just note the fact that the thread
  // became runnable at this time. Note that we cannot assert that
  // |runnable_ts| is unset here because data-loss can lead to back-to-back
  // waking events for a single thread.
info->runnable_ts = ts;
}
Table::Schema ThreadStateGenerator::CreateSchema() {
auto schema = tables::ThreadStateTable::Schema();
  // Because we expect our users to generally want results ordered by ts, we
  // set the ordering for the schema to match the forced sort pass in
  // ComputeTable.
auto ts_it = std::find_if(
schema.columns.begin(), schema.columns.end(),
[](const Table::Schema::Column& col) { return col.name == "ts"; });
ts_it->is_sorted = true;
auto id_it = std::find_if(
schema.columns.begin(), schema.columns.end(),
[](const Table::Schema::Column& col) { return col.name == "id"; });
id_it->is_sorted = false;
return schema;
}
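
// Writes the pending descheduled and runnable intervals for |utid| into
// |table|. |end_ts| (when present) closes the final interval; when absent,
// the interval's duration is left as -1.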
void ThreadStateGenerator::FlushPendingEventsForThread(
UniqueTid utid,
const ThreadSchedInfo& info,
tables::ThreadStateTable* table,
base::Optional<int64_t> end_ts) {
// First, let's flush the descheduled period (if any) to the table.
if (info.desched_ts) {
PERFETTO_DCHECK(info.desched_end_state);
int64_t dur;
if (end_ts) {
int64_t desched_end_ts = info.runnable_ts ? *info.runnable_ts : *end_ts;
dur = desched_end_ts - *info.desched_ts;
} else {
dur = -1;
}
tables::ThreadStateTable::Row row;
row.ts = *info.desched_ts;
row.dur = dur;
row.state = *info.desched_end_state;
row.utid = utid;
row.io_wait = info.io_wait;
row.blocked_function = info.blocked_function;
table->Insert(row);
}
// Next, flush the runnable period (if any) to the table.
if (info.runnable_ts) {
tables::ThreadStateTable::Row row;
row.ts = *info.runnable_ts;
row.dur = end_ts ? *end_ts - row.ts : -1;
row.state = runnable_string_id_;
row.utid = utid;
table->Insert(row);
}
}
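
// Extracts the "io_wait" and "function" args from a sched_blocked_reason
// instant and stashes them on the thread's pending state so they can be
// attached to the next descheduled interval.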
void ThreadStateGenerator::AddBlockedReasonEvent(const Table& blocked_reason,
uint32_t blocked_idx,
TidInfoMap& state_map) {
const auto& utid_col = blocked_reason.GetTypedColumnByName<int64_t>("ref");
const auto& arg_set_id_col =
blocked_reason.GetTypedColumnByName<uint32_t>("arg_set_id");
UniqueTid utid = static_cast<UniqueTid>(utid_col[blocked_idx]);
uint32_t arg_set_id = arg_set_id_col[blocked_idx];
ThreadSchedInfo& info = state_map[utid];
base::Optional<Variadic> opt_value;
base::Status status =
context_->storage->ExtractArg(arg_set_id, "io_wait", &opt_value);
// We can't do anything better than ignoring any errors here.
// TODO(lalitm): see if there's a better way to handle this.
if (status.ok() && opt_value) {
PERFETTO_CHECK(opt_value->type == Variadic::Type::kBool);
info.io_wait = opt_value->bool_value;
}
status = context_->storage->ExtractArg(arg_set_id, "function", &opt_value);
if (status.ok() && opt_value) {
PERFETTO_CHECK(opt_value->type == Variadic::Type::kString);
info.blocked_function = opt_value->string_value;
}
}
std::string ThreadStateGenerator::TableName() {
return "thread_state";
}
uint32_t ThreadStateGenerator::EstimateRowCount() {
return context_->storage->sched_slice_table().row_count();
}
} // namespace trace_processor
} // namespace perfetto