Add WlanDriverRestartTest to wlan functional tests

This test verifies the iface list contains only one iface with the
correct iface id after iwlwifi driver is restarted.

Change-Id: I832e86c2e9d09aa93afe34d4b4e3fbcbfbad183a
Reviewed-on: https://fuchsia-review.googlesource.com/c/antlion/+/917576
Reviewed-by: Sam Balana <sbalana@google.com>
Commit-Queue: Zhiyi Chen <zhiyichen@google.com>
diff --git a/tests/wlan/functional/BUILD.gn b/tests/wlan/functional/BUILD.gn
index 40e51a3..886ad04 100644
--- a/tests/wlan/functional/BUILD.gn
+++ b/tests/wlan/functional/BUILD.gn
@@ -46,36 +46,33 @@
   environments = []
 }
 
+antlion_host_test("wlan_driver_restart_test") {
+  main_source = "WlanDriverRestartTest.py"
+  environments = [ nuc_env ]
+}
+
 antlion_host_test("wlan_reboot_ap_test") {
   main_source = "WlanRebootTest.py"
   environments = display_ap_iperf_envs
-  test_cases = [
-    "test_.+_ap_.+",
-  ]
+  test_cases = [ "test_.+_ap_.+" ]
 }
 
 antlion_host_test("wlan_reboot_ap_test_quick") {
   main_source = "WlanRebootTest.py"
   environments = display_ap_iperf_envs
-  test_cases = [
-    "test_soft_reboot_dut_5g_open_ipv4",
-  ]
+  test_cases = [ "test_soft_reboot_dut_5g_open_ipv4" ]
 }
 
 antlion_host_test("wlan_reboot_dut_test") {
   main_source = "WlanRebootTest.py"
   environments = display_ap_iperf_envs
-  test_cases = [
-    "test_.+_dut_.+",
-  ]
+  test_cases = [ "test_.+_dut_.+" ]
 }
 
 antlion_host_test("wlan_reboot_dut_test_quick") {
   main_source = "WlanRebootTest.py"
   environments = display_ap_iperf_envs
-  test_cases = [
-    "test_soft_reboot_ap_5g_open_ipv4",
-  ]
+  test_cases = [ "test_soft_reboot_ap_5g_open_ipv4" ]
 }
 
 antlion_host_test("wlan_scan_test") {
@@ -111,6 +108,7 @@
   testonly = true
   public_deps = [
     ":ping_stress_test($host_toolchain)",
+    ":wlan_driver_restart_test($host_toolchain)",
     ":wlan_reboot_ap_test_quick($host_toolchain)",
     ":wlan_reboot_dut_test_quick($host_toolchain)",
   ]
diff --git a/tests/wlan/functional/WlanDriverRestartTest.py b/tests/wlan/functional/WlanDriverRestartTest.py
new file mode 100644
index 0000000..2dfca22
--- /dev/null
+++ b/tests/wlan/functional/WlanDriverRestartTest.py
@@ -0,0 +1,128 @@
+#!/usr/bin/env python3
+#
+# Copyright 2023 The Fuchsia Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+
+from mobly import asserts, signals, test_runner
+
+from antlion.controllers.fuchsia_lib.wlan_lib import WlanError, WlanMacRole
+from antlion.test_utils.wifi import base_test
+
+DELAY_FOR_DRIVER_RESTART_SEC = 2
+
+
+class WlanDriverRestartTest(base_test.WifiBaseTest):
+    def setup_class(self):
+        """Example Config in yaml Format:
+          driver_restart_test_params:
+            iterations: 10
+
+        If no configuration is provided, the test will run with a default iteration count,
+        which is 10.
+        """
+        super().setup_class()
+        self.log = logging.getLogger()
+
+        if not self.fuchsia_devices:
+            raise signals.TestFailure("No test devices are available.")
+
+        test_params = self.user_params.get("driver_restart_test_params", {})
+        self.iterations = test_params.get("iterations", 10)
+        self.fd = self.fuchsia_devices[0]
+
+        response = self.fd.ffx.run("driver list")
+        driver_list = str(response.stdout, "UTF-8")
+        if not driver_list.find("iwlwifi"):
+            raise signals.TestSkip(
+                "No intel WiFi driver found on this device, skipping test"
+            )
+
+        response = self.fd.sl4f.wlan_policy_lib.wlanCreateClientController()
+        asserts.assert_is_none(
+            response.get("error", None),
+            f"Failed to create client controller. Err: {response['error']}",
+        )
+
+        response = self.fd.sl4f.wlan_policy_lib.wlanStopClientConnections()
+        asserts.assert_is_none(
+            response.get("error", None),
+            f"Failed to stop client connections. Err: {response['error']}",
+        )
+
+    def test_driver_restart(self):
+        """Test that the WLAN interface gets cleaned up when after driver restarts.
+
+        Steps:
+        1. Restart iwlwifi driver
+        2. Get the iface list
+        3. Go to step (1) until it's done for 10 iterations
+
+        Expected Result:
+        Verify the interface gets shutdown in each iteration and restarted with a +1 number.
+
+        Returns:
+          signals.TestPass if no errors
+          signals.TestFailure if there are any errors during the test.
+
+        TAGS: WLAN
+        Priority: 2
+        """
+        for i in range(self.iterations):
+            self.fd.ffx.run(
+                "driver restart fuchsia-pkg://fuchsia.com/iwlwifi#meta/iwlwifi.cm"
+            )
+            time.sleep(DELAY_FOR_DRIVER_RESTART_SEC)
+
+            response = self.fd.sl4f.wlan_lib.wlanGetIfaceIdList()
+            asserts.assert_is_none(
+                response.get("error", None),
+                f"Failed to get iface list: {response['error']}",
+            )
+
+            asserts.assert_equal(
+                response.get("result", None), [], "Iface was not deleted properly."
+            )
+
+            response = self.fd.sl4f.wlan_lib.wlanPhyIdList()
+            asserts.assert_is_none(
+                response.get("error", None),
+                f"Failed to get phy id list. Err: {response['error']}",
+            )
+
+            self.phy_id_list = response.get("result", [])
+            asserts.assert_equal(len(self.phy_id_list), 1, "Only expect one phy_id.")
+
+            try:
+                response = self.fd.sl4f.wlan_lib.wlanCreateIface(
+                    self.phy_id_list[0], WlanMacRole.CLIENT
+                )
+            except WlanError as e:
+                self.log.warn(f"Create iface failed: {e}")
+
+            response = self.fd.sl4f.wlan_lib.wlanGetIfaceIdList()
+            asserts.assert_is_none(
+                response.get("error", None),
+                f"Failed to get iface list: {response['error']}",
+            )
+
+            asserts.assert_not_equal(
+                response.get("result", None), [], "Iface was not created properly."
+            )
+
+
+if __name__ == "__main__":
+    test_runner.main()