Check DUT authenticated/associated/authorized state on AP side.
This change gives tests the ability to see whether a DUT is
authenticated, associated, and authorized on an AP. This is useful
in particular for tests like roaming, which cause a DUT to move
through these states on multiple APs.
Change-Id: Ic5fe7e1eb05cc1d93671b34ad1e37846556bb5fa
Reviewed-on: https://fuchsia-review.googlesource.com/c/antlion/+/918193
Reviewed-by: Sam Balana <sbalana@google.com>
Commit-Queue: Karl Ward <karlward@google.com>
diff --git a/packages/antlion/controllers/access_point.py b/packages/antlion/controllers/access_point.py
index 28477a3..96a63cf 100755
--- a/packages/antlion/controllers/access_point.py
+++ b/packages/antlion/controllers/access_point.py
@@ -718,6 +718,27 @@
raise ValueError(f"Invalid identifier {identifier} given")
return instance.hostapd.get_stas()
+ def sta_authenticated(self, identifier: str, sta_mac: str) -> bool:
+ """Is STA authenticated?"""
+ instance = self._aps.get(identifier)
+ if instance is None:
+ raise ValueError(f"Invalid identifier {identifier} given")
+ return instance.hostapd.sta_authenticated(sta_mac)
+
+ def sta_associated(self, identifier: str, sta_mac: str) -> bool:
+ """Is STA associated?"""
+ instance = self._aps.get(identifier)
+ if instance is None:
+ raise ValueError(f"Invalid identifier {identifier} given")
+ return instance.hostapd.sta_associated(sta_mac)
+
+ def sta_authorized(self, identifier: str, sta_mac: str) -> bool:
+ """Is STA authorized (802.1X controlled port open)?"""
+ instance = self._aps.get(identifier)
+ if instance is None:
+ raise ValueError(f"Invalid identifier {identifier} given")
+ return instance.hostapd.sta_authorized(sta_mac)
+
def get_sta_extended_capabilities(
self, identifier: str, sta_mac: str
) -> ExtendedCapabilities:
diff --git a/packages/antlion/controllers/ap_lib/hostapd.py b/packages/antlion/controllers/ap_lib/hostapd.py
index 75de9e6..ef9c5c5 100644
--- a/packages/antlion/controllers/ap_lib/hostapd.py
+++ b/packages/antlion/controllers/ap_lib/hostapd.py
@@ -201,6 +201,48 @@
except ValueError:
raise Error(f"ext_capab contains invalid hex string repr {raw_ext_capab}")
+ def sta_authenticated(self, sta_mac: str) -> bool:
+ """Is the given STA authenticated?
+
+ Args:
+ sta_mac: MAC address of the STA in question.
+ Returns:
+ True if AP sees that the STA is authenticated, False otherwise.
+ Raises:
+ Error if authenticated status for the STA cannot be obtained.
+ """
+ sta_result = self._sta(sta_mac)
+ m = re.search(r"flags=.*\[AUTH\]", sta_result.stdout, re.MULTILINE)
+ return bool(m)
+
+ def sta_associated(self, sta_mac: str) -> bool:
+ """Is the given STA associated?
+
+ Args:
+ sta_mac: MAC address of the STA in question.
+ Returns:
+ True if AP sees that the STA is associated, False otherwise.
+ Raises:
+ Error if associated status for the STA cannot be obtained.
+ """
+ sta_result = self._sta(sta_mac)
+ m = re.search(r"flags=.*\[ASSOC\]", sta_result.stdout, re.MULTILINE)
+ return bool(m)
+
+ def sta_authorized(self, sta_mac: str) -> bool:
+ """Is the given STA authorized (802.1X controlled port open)?
+
+ Args:
+ sta_mac: MAC address of the STA in question.
+ Returns:
+ True if AP sees that the STA is 802.1X authorized, False otherwise.
+ Raises:
+ Error if authorized status for the STA cannot be obtained.
+ """
+ sta_result = self._sta(sta_mac)
+ m = re.search(r"flags=.*\[AUTHORIZED\]", sta_result.stdout, re.MULTILINE)
+ return bool(m)
+
def _bss_tm_req(
self, client_mac: str, request: BssTransitionManagementRequest
) -> Result:
diff --git a/packages/antlion/unit_tests/controllers/ap_lib/hostapd_test.py b/packages/antlion/unit_tests/controllers/ap_lib/hostapd_test.py
new file mode 100644
index 0000000..239c3fe
--- /dev/null
+++ b/packages/antlion/unit_tests/controllers/ap_lib/hostapd_test.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python3
+#
+# Copyright 2023 The Fuchsia Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from unittest.mock import Mock
+
+from antlion.controllers.ap_lib import hostapd
+from antlion.libs.proc.job import Result
+
+# MAC address that will be used in these tests.
+STA_MAC = "aa:bb:cc:dd:ee:ff"
+
+# Abbreviated output of hostapd_cli STA commands, showing various AUTH/ASSOC/AUTHORIZED states.
+STA_OUTPUT_WITHOUT_STA_AUTHENTICATED = b"""aa:bb:cc:dd:ee:ff
+flags=[WMM][HT][VHT]"""
+
+STA_OUTPUT_WITH_STA_AUTHENTICATED = b"""aa:bb:cc:dd:ee:ff
+flags=[AUTH][WMM][HT][VHT]"""
+
+STA_OUTPUT_WITH_STA_ASSOCIATED = b"""aa:bb:cc:dd:ee:ff
+flags=[AUTH][ASSOC][WMM][HT][VHT]
+aid=42"""
+
+STA_OUTPUT_WITH_STA_AUTHORIZED = b"""aa:bb:cc:dd:ee:ff
+flags=[AUTH][ASSOC][AUTHORIZED][WMM][HT][VHT]
+aid=42"""
+
+
+class HostapdTest(unittest.TestCase):
+ def test_sta_authenticated_true_for_authenticated_sta(self):
+ hostapd_mock = hostapd.Hostapd("mock_runner", "wlan0")
+ hostapd_mock._run_hostapd_cli_cmd = Mock(
+ return_value=Result(
+ command=list(), stdout=STA_OUTPUT_WITH_STA_AUTHENTICATED, exit_status=0
+ )
+ )
+ self.assertTrue(hostapd_mock.sta_authenticated(STA_MAC))
+
+ def test_sta_authenticated_false_for_unauthenticated_sta(self):
+ hostapd_mock = hostapd.Hostapd("mock_runner", "wlan0")
+ hostapd_mock._run_hostapd_cli_cmd = Mock(
+ return_value=Result(
+ command=list(),
+ stdout=STA_OUTPUT_WITHOUT_STA_AUTHENTICATED,
+ exit_status=0,
+ )
+ )
+ self.assertFalse(hostapd_mock.sta_authenticated(STA_MAC))
+
+ def test_sta_associated_true_for_associated_sta(self):
+ hostapd_mock = hostapd.Hostapd("mock_runner", "wlan0")
+ hostapd_mock._run_hostapd_cli_cmd = Mock(
+ return_value=Result(
+ command=list(), stdout=STA_OUTPUT_WITH_STA_ASSOCIATED, exit_status=0
+ )
+ )
+ self.assertTrue(hostapd_mock.sta_associated(STA_MAC))
+
+ def test_sta_associated_false_for_unassociated_sta(self):
+ hostapd_mock = hostapd.Hostapd("mock_runner", "wlan0")
+ # Uses the authenticated-only CLI output.
+ hostapd_mock._run_hostapd_cli_cmd = Mock(
+ return_value=Result(
+ command=list(), stdout=STA_OUTPUT_WITH_STA_AUTHENTICATED, exit_status=0
+ )
+ )
+ self.assertFalse(hostapd_mock.sta_associated(STA_MAC))
+
+ def test_sta_authorized_true_for_authorized_sta(self):
+ hostapd_mock = hostapd.Hostapd("mock_runner", "wlan0")
+ hostapd_mock._run_hostapd_cli_cmd = Mock(
+ return_value=Result(
+ command=list(), stdout=STA_OUTPUT_WITH_STA_AUTHORIZED, exit_status=0
+ )
+ )
+ self.assertTrue(hostapd_mock.sta_authorized(STA_MAC))
+
+ def test_sta_associated_false_for_unassociated_sta(self):
+ hostapd_mock = hostapd.Hostapd("mock_runner", "wlan0")
+ # Uses the associated-only CLI output.
+ hostapd_mock._run_hostapd_cli_cmd = Mock(
+ return_value=Result(
+ command=list(), stdout=STA_OUTPUT_WITH_STA_ASSOCIATED, exit_status=0
+ )
+ )
+ self.assertFalse(hostapd_mock.sta_authorized(STA_MAC))
+
+
+if __name__ == "__main__":
+ unittest.main()