blob: d2930a680db157acdcc9da4a21afca617da431eb [file] [log] [blame]
from datetime import datetime
import os
import pathlib
import re
import shutil
import tempfile
from collections import defaultdict
# strptime layout of the timestamp captured by _EVENT_TIME_PATTERN,
# e.g. "2023/01/31 12:34:56.789".
_EVENT_TIME_FORMAT = "%Y/%m/%d %H:%M:%S.%f"
# Captures the timestamp that directly precedes "Read:" on a line.
_EVENT_TIME_PATTERN = re.compile(r"(\d+(?:\/\d+){2}\s\d{2}(?::\d{2}){2}\.\d+)\sRead:")
# Matches the header line that opens a "GnssClock:" section.
_GNSS_CLOCK_START_LOG_PATTERN = re.compile(r"^GnssClock:")
# Field lines of a GnssClock section: leading whitespace, the field name, "=",
# then an optionally negative integer captured as group 1. The capture may be
# empty (or a lone "-") when no digits follow the "=".
# These are raw strings (not f-strings) so that \s and \d reach the regex
# engine without being treated as (invalid) string escape sequences.
_GNSS_CLOCK_TIME_NANOS_PATTERN = re.compile(r"^\s+TimeNanos\s*=\s*([-]?\d*)")
_GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN = re.compile(r"^\s+FullBiasNanos\s*=\s*([-]?\d*)")
_GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN = re.compile(r"^\s+elapsedRealtimeNanos\s*=\s*([-]?\d*)")
class AdrInfo:
    """Represent one ADR value

    An ADR value is a decimal number range from 0 - 31

    How to parse the ADR value:
        Read the number as a bit field. The five lowest bits mean, from the
        most significant to the least significant:

        0                  0                  0          0      0
        HalfCycleReported  HalfCycleResolved  CycleSlip  Reset  Valid

    Special rule:
        For an ADR value in binary fits the pattern: * * 0 0 1
        (valid, not reset, no cycle slip), we call it a usable ADR

    More insight of ADR value:
        go/adrstates

    Attributes:
        count: the count given to the constructor, stored unchanged
        is_valid: (bool)
        is_reset: (bool)
        is_cycle_slip: (bool)
        is_half_cycle_resolved: (bool)
        is_half_cycle_reported: (bool)
        is_usable: (bool)
    """

    def __init__(self, adr_value: int, count: int):
        """Decode the flag bits of one ADR value

        Args:
            adr_value: the ADR value; anything int() accepts, so a decimal
                string such as "17" works as well as an int
            count: number of occurrences this object stands for

        Raises:
            ValueError: adr_value can not be converted by int()
        """
        bits = int(adr_value)
        # Shift and mask instead of indexing the text produced by bin():
        # bin() does not zero-pad, so for any value below 16 a negative index
        # landed on the "0b" prefix and int("b") raised ValueError.
        self._valid = bits & 1
        self._reset = (bits >> 1) & 1
        self._cycle_slip = (bits >> 2) & 1
        self._half_cycle_resolved = (bits >> 3) & 1
        self._half_cycle_reported = (bits >> 4) & 1
        self.count = count

    @property
    def is_usable(self):
        return self.is_valid and not self.is_reset and not self.is_cycle_slip

    @property
    def is_valid(self):
        return bool(self._valid)

    @property
    def is_reset(self):
        return bool(self._reset)

    @property
    def is_cycle_slip(self):
        return bool(self._cycle_slip)

    @property
    def is_half_cycle_resolved(self):
        return bool(self._half_cycle_resolved)

    @property
    def is_half_cycle_reported(self):
        return bool(self._half_cycle_reported)
class AdrStatistic:
    """Running totals over the ADR records added so far

    Every record contributes its ``count`` to ``total_count`` and to the
    counter of each flag that is set on it.

    Attributes:
        usable_count: (int)
        valid_count: (int)
        reset_count: (int)
        cycle_slip_count: (int)
        half_cycle_resolved_count: (int)
        half_cycle_reported_count: (int)
        total_count: (int)
        usable_rate: (float)
            usable_count / total_count, rounded to 3 decimals
        valid_rate: (float)
            valid_count / total_count, rounded to 3 decimals
    """

    # (flag read from the record, counter on this object that it feeds)
    _FLAG_TO_COUNTER = (
        ("is_valid", "valid_count"),
        ("is_reset", "reset_count"),
        ("is_cycle_slip", "cycle_slip_count"),
        ("is_half_cycle_resolved", "half_cycle_resolved_count"),
        ("is_half_cycle_reported", "half_cycle_reported_count"),
        ("is_usable", "usable_count"),
    )

    def __init__(self):
        self.usable_count = 0
        self.valid_count = 0
        self.reset_count = 0
        self.cycle_slip_count = 0
        self.half_cycle_resolved_count = 0
        self.half_cycle_reported_count = 0
        self.total_count = 0

    def _share_of_total(self, amount):
        """Return amount / total_count rounded to 3 decimals

        The divisor is clamped to at least 1 so that an empty statistic
        does not divide by zero.
        """
        return round(amount / max(1, self.total_count), 3)

    @property
    def usable_rate(self):
        return self._share_of_total(self.usable_count)

    @property
    def valid_rate(self):
        return self._share_of_total(self.valid_count)

    def add_adr_info(self, adr_info: AdrInfo):
        """Fold one ADR record into the totals

        Args:
            adr_info: AdrInfo object
        """
        for flag_name, counter_name in self._FLAG_TO_COUNTER:
            if getattr(adr_info, flag_name):
                running = getattr(self, counter_name)
                setattr(self, counter_name, running + adr_info.count)
        self.total_count += adr_info.count
class GnssClockSubEvent:
    """Represent one "GnssClock:" section of a measurement file

    The three clock fields only exist on the instance after parse() has seen
    the matching line; until then reading them raises AttributeError.

    Attributes:
        event_time: the value given to the constructor
        time_nanos: (int) value of the "TimeNanos" line
        full_bias_nanos: (int) value of the "FullBiasNanos" line
        elapsed_real_time_nanos: (int) value of the "elapsedRealtimeNanos" line
        gps_elapsed_realtime_diff: (int)
            time_nanos - full_bias_nanos - elapsed_real_time_nanos
    """
    time_nanos: int
    full_bias_nanos: int
    elapsed_real_time_nanos: int

    def __init__(self, event_time):
        self.event_time = event_time

    def parse(self, line):
        """Store the clock field carried by line, if there is one

        The line is tried against the TimeNanos, FullBiasNanos and
        elapsedRealtimeNanos patterns in that order; the first pattern that
        matches decides which attribute is set. Lines matching none of them
        are ignored.

        Args:
            line: one line of the measurement file

        Raises:
            ValueError: a field line matched but its value is not an integer
                (the patterns also match when no digits follow the "=")
        """
        # Built per call so the module-level patterns are looked up lazily.
        field_patterns = (
            (_GNSS_CLOCK_TIME_NANOS_PATTERN, "time_nanos"),
            (_GNSS_CLOCK_FULL_BIAS_NANOS_PATTERN, "full_bias_nanos"),
            (_GNSS_CLOCK_ELAPSED_TIME_NANOS_PATTERN, "elapsed_real_time_nanos"),
        )
        for pattern, attribute in field_patterns:
            # Search once and reuse the match instead of searching twice.
            match = pattern.search(line)
            if match:
                setattr(self, attribute, int(match.group(1)))
                return

    def __repr__(self) -> str:
        # Fields that were never parsed are shown as None; repr() is used for
        # logging/debugging and must not raise on a partially parsed event.
        time_nanos = getattr(self, "time_nanos", None)
        full_bias_nanos = getattr(self, "full_bias_nanos", None)
        elapsed_real_time_nanos = getattr(self, "elapsed_real_time_nanos", None)
        return (f"event time: {self.event_time}, "
                f"timenanos: {time_nanos}, "
                f"full_bias: {full_bias_nanos}, "
                f"elapsed_realtime: {elapsed_real_time_nanos}")

    @property
    def gps_elapsed_realtime_diff(self):
        """time_nanos - full_bias_nanos - elapsed_real_time_nanos

        Raises:
            AttributeError: one of the three fields has not been parsed yet
        """
        return self.time_nanos - self.full_bias_nanos - self.elapsed_real_time_nanos
class GnssMeasurement:
    """Represent the content of measurement file generated by gps tool

    Attributes:
        ad: device object given by the caller. This class uses its
            ``log`` (info / debug / warn), ``adb.shell`` and ``pull_files``
            members.
    """

    # Shell glob, expanded on the device, for the measurement files.
    FILE_PATTERN = "/storage/emulated/0/Android/data/com.android.gpstool/files/MEAS*.txt"

    def __init__(self, ad):
        self.ad = ad

    def _generate_local_temp_path(self, file_name="file.txt"):
        """Generate a file path for temporarily usage

        A new temporary folder is created on every call; the file itself is
        not created. The caller is responsible for removing the folder.

        Args:
            file_name: name of the file inside the new folder

        Returns:
            string: local file path
        """
        folder = tempfile.mkdtemp()
        return os.path.join(folder, file_name)

    def _get_latest_measurement_file_path(self):
        """Get the latest measurement file path on device

        Returns:
            string: file path on device, as returned by the adb shell call
        """
        # "ls -tr" lists oldest first, so the last line is the newest file.
        command = f"ls -tr {self.FILE_PATTERN} | tail -1"
        return self.ad.adb.shell(command)

    def get_latest_measurement_file(self):
        """Pull the latest measurement file from device to local

        The file is placed in a new temporary folder which the caller should
        remove once done with the file.

        Returns:
            string: local file path to the measurement file

        Raise:
            FileNotFoundError: can't get measurement file from device
        """
        self.ad.log.info("Get measurement file from device")
        # Look the file up before creating the temporary folder so nothing is
        # left behind on disk when the device has no measurement file.
        src = self._get_latest_measurement_file_path()
        if not src:
            raise FileNotFoundError(f"Can not find measurement file: pattern {self.FILE_PATTERN}")
        dest = self._generate_local_temp_path(file_name="measurement.txt")
        try:
            self.ad.pull_files(src, dest)
        except Exception:
            # The caller never learns the path, so clean up here and re-raise.
            shutil.rmtree(os.path.dirname(dest), ignore_errors=True)
            raise
        return dest

    def _get_adr_src_value(self):
        """Get ADR value from measurement file

        Lines containing "AccumulatedDeltaRangeState" whose value can not be
        read are reported through the device log and skipped.

        Returns:
            dict: {<ADR_value>: count, <ADR_value>: count...}
                ADR values are kept as the digit strings found in the file

        Raise:
            FileNotFoundError: can't get measurement file from device
        """
        # Fetched outside the try block: if this fails there is no local path
        # to clean up, and the original error must reach the caller instead of
        # being replaced by an UnboundLocalError from the cleanup code.
        file_path = self.get_latest_measurement_file()
        try:
            adr_src = defaultdict(int)
            adr_src_regex = re.compile(r"=\s(\d*)")
            with open(file_path) as f:
                for line in f:
                    if "AccumulatedDeltaRangeState" not in line:
                        continue
                    value = adr_src_regex.search(line)
                    # The pattern also matches when no digit follows "= ";
                    # such an empty value can not be decoded later on.
                    if not value or not value.group(1):
                        self.ad.log.warn("Can't get ADR value %s" % line)
                        continue
                    adr_src[value.group(1)] += 1
            return adr_src
        finally:
            # os.path understands the local platform's separators; removing
            # the folder also removes the pulled file.
            shutil.rmtree(os.path.dirname(file_path), ignore_errors=True)

    def get_adr_static(self):
        """Get ADR statistic

        Summarize ADR value from measurement file

        Returns:
            AdrStatistic object

        Raise:
            FileNotFoundError: can't get measurement file from device
        """
        self.ad.log.info("Get ADR statistic")
        adr_src = self._get_adr_src_value()
        adr_static = AdrStatistic()
        for key, value in adr_src.items():
            self.ad.log.debug("ADR value: %s - count: %s" % (key, value))
            adr_static.add_adr_info(AdrInfo(key, value))
        return adr_static

    def get_gnss_clock_info(self):
        """Get the GnssClock sections of the latest measurement file

        The file is pulled into a temporary folder that is removed before
        returning. Each "GnssClock:" line starts a new sub event carrying the
        time of the latest timestamped "Read:" line seen so far (None when
        there was none); the following lines that start with a space are
        handed to that sub event for parsing.

        Returns:
            list: GnssClockSubEvent objects in file order
        """
        sub_events = []
        event_time = None
        with tempfile.TemporaryDirectory() as folder:
            local_measurement_file = os.path.join(folder, "measurement_file")
            self.ad.pull_files(self._get_latest_measurement_file_path(), local_measurement_file)
            with open(local_measurement_file) as f:
                for line in f:
                    time_match = _EVENT_TIME_PATTERN.search(line)
                    if time_match:
                        event_time = datetime.strptime(time_match.group(1), _EVENT_TIME_FORMAT)
                    elif _GNSS_CLOCK_START_LOG_PATTERN.search(line):
                        sub_events.append(GnssClockSubEvent(event_time))
                    elif line.startswith(" ") and sub_events:
                        # Indented lines seen before the first "GnssClock:"
                        # header belong to no sub event and are skipped.
                        sub_events[-1].parse(line)
        return sub_events