Client V2 cleans event client when it stops (#825)

diff --git a/mobly/controllers/android_device_lib/snippet_client_v2.py b/mobly/controllers/android_device_lib/snippet_client_v2.py
index 78e6b70..0c9540a 100644
--- a/mobly/controllers/android_device_lib/snippet_client_v2.py
+++ b/mobly/controllers/android_device_lib/snippet_client_v2.py
@@ -558,6 +558,7 @@
     * Stop forwarding the device port to host.
     * Stop the standing server subprocess running on the host side.
     * Stop the snippet server running on the device side.
+    * Stop the event client and set `self._event_client` to None.
 
     Raises:
       android_device_lib_errors.DeviceError: if the server exited with errors on
@@ -566,6 +567,7 @@
     self.log.debug('Stopping snippet package %s.', self.package)
     self.close_connection()
     self._stop_server()
+    self._destroy_event_client()
     self.log.debug('Snippet package %s stopped.', self.package)
 
   def close_connection(self):
@@ -616,6 +618,17 @@
           self._device,
           f'Failed to stop existing apk. Unexpected output: {out}.')
 
+  def _destroy_event_client(self):
+    """Releases all the resources acquired in `_create_event_client`."""
+    if self._event_client:
+      # Without cleaning host_port of event_client first, the close_connection
+      # will try to stop the port forwarding, which should only be stopped by
+      # the corresponding snippet client.
+      self._event_client.host_port = None
+      self._event_client.device_port = None
+      self._event_client.close_connection()
+      self._event_client = None
+
   def restore_server_connection(self, port=None):
     """Restores the server after the device got reconnected.
 
diff --git a/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py b/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py
index 534ba81..c7e8ef1 100644
--- a/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py
+++ b/tests/mobly/controllers/android_device_lib/snippet_client_v2_test.py
@@ -158,10 +158,11 @@
         mock_start_subprocess.return_value)
     self.assertFalse(self.client.is_alive)
     self.assertIs(self.client._conn, None)
-    self.socket_conn.close.assert_called_once_with()
+    self.socket_conn.close.assert_called()
     self.assertIs(self.client.host_port, None)
     self.adb.mock_forward_func.assert_any_call(
         ['--remove', f'tcp:{mock_get_port.return_value}'])
+    self.assertIsNone(self.client._event_client)
 
   @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
               'utils.get_available_host_port',
@@ -234,6 +235,7 @@
 
     self.client.initialize()
     rpc_result = self.client.some_async_rpc(1, 2, 'async')
+    event_client = self.client._event_client
     self.client.stop()
 
     self._assert_client_resources_released(mock_start_subprocess,
@@ -242,13 +244,14 @@
 
     self.assertListEqual(self.mock_socket_file.write.call_args_list,
                          expected_socket_writes)
-    mock_callback_class.assert_called_with(
-        callback_id='1-0',
-        event_client=self.client._event_client,
-        ret_value=123,
-        method_name='some_async_rpc',
-        ad=self.device)
+    mock_callback_class.assert_called_with(callback_id='1-0',
+                                           event_client=event_client,
+                                           ret_value=123,
+                                           method_name='some_async_rpc',
+                                           ad=self.device)
     self.assertIs(rpc_result, mock_callback_class.return_value)
+    self.assertIsNone(event_client.host_port, None)
+    self.assertIsNone(event_client.device_port, None)
 
   @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
               'utils.get_available_host_port',
@@ -299,17 +302,18 @@
     rpc_results.append(self.client.some_async_rpc(3, 4, 'async'))
     rpc_results.append(self.client.some_sync_rpc(5, 'hello'))
     rpc_results.append(self.client.some_async_rpc(6, 'async'))
+    event_client = self.client._event_client
     self.client.stop()
 
     # Assertions
     mock_callback_class_calls_expected = [
         mock.call(callback_id='1-0',
-                  event_client=self.client._event_client,
+                  event_client=event_client,
                   ret_value=456,
                   method_name='some_async_rpc',
                   ad=self.device),
         mock.call(callback_id='2-0',
-                  event_client=self.client._event_client,
+                  event_client=event_client,
                   ret_value=321,
                   method_name='some_async_rpc',
                   ad=self.device),
@@ -319,6 +323,8 @@
     self._assert_client_resources_released(mock_start_subprocess,
                                            mock_stop_standing_subprocess,
                                            mock_get_port)
+    self.assertIsNone(event_client.host_port, None)
+    self.assertIsNone(event_client.device_port, None)
 
   def test_check_app_installed_normally(self):
     """Tests that app checker runs normally when app installed correctly."""
@@ -589,6 +595,7 @@
     self.assertIs(self.client.host_port, None)
     self.device.adb.mock_forward_func.assert_called_once_with(
         ['--remove', 'tcp:12345'])
+    self.assertIsNone(self.client._event_client)
 
   @mock.patch('mobly.utils.stop_standing_subprocess')
   def test_stop_when_server_is_already_cleaned(self,
@@ -683,6 +690,68 @@
     self.device.adb.mock_forward_func.assert_called_once_with(
         ['--remove', 'tcp:12345'])
 
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  def test_stop_with_event_client(self, mock_send_handshake_func,
+                                  mock_create_socket_conn_func,
+                                  mock_stop_standing_subprocess):
+    """Tests that stopping with an event client works normally."""
+    del mock_send_handshake_func
+    del mock_create_socket_conn_func
+    del mock_stop_standing_subprocess
+    self._make_client()
+    self.client.host_port = 12345
+    self.client.device_port = 45678
+    snippet_client_conn = mock.Mock()
+    self.client._conn = snippet_client_conn
+    self.client._create_event_client()
+    event_client = self.client._event_client
+    event_client_conn = mock.Mock()
+    event_client._conn = event_client_conn
+
+    self.client.stop()
+
+    # The snippet client called close method once
+    snippet_client_conn.close.assert_called_once_with()
+    # The event client called close method once
+    event_client_conn.close.assert_called_once_with()
+    self.assertIsNone(event_client._conn)
+    self.assertIsNone(event_client.host_port)
+    self.assertIsNone(event_client.device_port)
+    self.assertIsNone(self.client._event_client)
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  def test_stop_with_event_client_stops_port_forwarding_once(
+      self, mock_send_handshake_func, mock_create_socket_conn_func,
+      mock_stop_standing_subprocess):
+    """Tests that client with an event client stops port forwarding once."""
+    del mock_send_handshake_func
+    del mock_create_socket_conn_func
+    del mock_stop_standing_subprocess
+    self._make_client()
+    self.client.host_port = 12345
+    self.client.device_port = 45678
+    self.client._create_event_client()
+    event_client = self.client._event_client
+
+    self.client.stop()
+    event_client.__del__()
+    self.client.__del__()
+
+    self.assertIsNone(self.client._event_client)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+
   def test_close_connection_normally(self):
     """Tests that closing connection works normally."""
     self._make_client()