Add Snippet Client V2 for Android, Step 2 (#808)

diff --git a/mobly/controllers/android_device_lib/ b/mobly/controllers/android_device_lib/
index ac7bb38..78e6b70 100644
--- a/mobly/controllers/android_device_lib/
+++ b/mobly/controllers/android_device_lib/
@@ -13,10 +13,14 @@
 # limitations under the License.
 """Snippet Client V2 for Interacting with Snippet Server on Android Device."""
+import enum
+import json
 import re
+import socket
 from mobly import utils
 from mobly.controllers.android_device_lib import adb
+from mobly.controllers.android_device_lib import callback_handler
 from mobly.controllers.android_device_lib import errors as android_device_lib_errors
 from mobly.snippet import client_base
 from mobly.snippet import errors
@@ -58,6 +62,31 @@
 _NOHUP_COMMAND = 'nohup'
+# UID of the 'unknown' JSON RPC session. Will cause creation of a new session
+# in the snippet server.
+# Maximum time to wait for the socket to open on the device.
+# Maximum time to wait for a response message on the socket.
+class ConnectionHandshakeCommand(enum.Enum):
+  """Commands to send to the server when sending the handshake request.
+  After creating the socket connection, the client must send a handshake request
+  to the server. When receiving the handshake request, the server will prepare
+  to communicate with the client. According to the command in the request,
+  the server will create a new session or reuse the current session.
+  INIT: Initiates a new session and makes a connection with this session.
+  CONTINUE: Makes a connection with the current session.
+  """
+  INIT = 'initiate'
+  CONTINUE = 'continue'
 class SnippetClientV2(client_base.ClientBase):
   """Snippet client V2 for interacting with snippet server on Android Device.
@@ -72,6 +101,9 @@
     host_port: int, the host port used for communicating with the snippet
     device_port: int, the device port listened by the snippet server.
+    uid: int, the uid of the server session with which this client communicates.
+      Default is `UNKNOWN_UID` and it will be set to a positive number after
+      the connection to the server is made successfully.
   def __init__(self, package, ad):
@@ -84,9 +116,13 @@
     super().__init__(package=package, device=ad)
     self.host_port = None
     self.device_port = None
+    self.uid = UNKNOWN_UID
     self._adb = ad.adb
     self._user_id = None
     self._proc = None
+    self._client = None  # keep it to prevent close errors on connect failure
+    self._conn = None
+    self._event_client = None
   def user_id(self):
@@ -110,6 +146,11 @@
       self._user_id = self._adb.current_user_id
     return self._user_id
+  @property
+  def is_alive(self):
+    """Does the client have an active connection to the snippet server."""
+    return self._conn is not None
   def before_starting_server(self):
     """Performs the preparation steps before starting the remote server.
@@ -276,10 +317,245 @@
       self.log.debug('Discarded line from instrumentation output: "%s"', line)
+  def make_connection(self):
+    """Makes a connection to the snippet server on the remote device.
+    This function makes a persistent connection to the server. This connection
+    will be used for all the RPCs, and must be closed when deconstructing.
+    To connect to the Android device, it first forwards the device port to a
+    host port. Then, it creates a socket connection to the server on the device.
+    Finally, it sends a handshake request to the server, which requests the
+    server to prepare for the communication with the client.
+    This function uses self.host_port for communicating with the server. If
+    self.host_port is 0 or None, this function finds an available host port to
+    make the connection and set self.host_port to the found port.
+    """
+    self._forward_device_port()
+    self.create_socket_connection()
+    self.send_handshake_request()
+  def _forward_device_port(self):
+    """Forwards the device port to a host port."""
+    if not self.host_port:
+      self.host_port = utils.get_available_host_port()
+    self._adb.forward([f'tcp:{self.host_port}', f'tcp:{self.device_port}'])
+  def create_socket_connection(self):
+    """Creates a socket connection to the server.
+    After creating the connection successfully, it sets two attributes:
+    * `self._conn`: the created socket object, which will be used when it needs
+      to close the connection.
+    * `self._client`: the socket file, which will be used to send and receive
+      messages.
+    This function only creates a socket connection without sending any message
+    to the server.
+    """
+    try:
+      self.log.debug(
+          'Snippet client is creating socket connection to the snippet server '
+          'of %s through host port %d.', self.package, self.host_port)
+      self._conn = socket.create_connection(('localhost', self.host_port),
+                                            _SOCKET_CONNECTION_TIMEOUT)
+    except ConnectionRefusedError as err:
+      # Retry using '' for IPv4 enabled machines that only resolve
+      # 'localhost' to '[::1]'.
+      self.log.debug('Failed to connect to localhost, trying %s',
+                     str(err))
+      self._conn = socket.create_connection(('', self.host_port),
+                                            _SOCKET_CONNECTION_TIMEOUT)
+    self._conn.settimeout(_SOCKET_READ_TIMEOUT)
+    self._client = self._conn.makefile(mode='brw')
+  def send_handshake_request(self,
+                             uid=UNKNOWN_UID,
+                             cmd=ConnectionHandshakeCommand.INIT):
+    """Sends a handshake request to the server to prepare for the communication.
+    Through the handshake response, this function checks whether the server
+    is ready for the communication. If ready, it sets `self.uid` to the
+    server session id. Otherwise, it sets `self.uid` to `UNKNOWN_UID`.
+    Args:
+      uid: int, the uid of the server session to continue. It will be ignored
+        if the `cmd` requires the server to create a new session.
+      cmd: ConnectionHandshakeCommand, the handshake command Enum for the
+        server, which requires the server to create a new session or use the
+        current session.
+    Raises:
+      errors.ProtocolError: something went wrong when sending the handshake
+        request.
+    """
+    request = json.dumps({'cmd': cmd.value, 'uid': uid})
+    self.log.debug('Sending handshake request %s.', request)
+    self._client_send(request)
+    response = self._client_receive()
+    if not response:
+      raise errors.ProtocolError(
+          self._device, errors.ProtocolError.NO_RESPONSE_FROM_HANDSHAKE)
+    response = self._decode_socket_response_bytes(response)
+    result = json.loads(response)
+    if result['status']:
+      self.uid = result['uid']
+    else:
+      self.uid = UNKNOWN_UID
+  def check_server_proc_running(self):
+    """See base class.
+    This client does nothing at this stage.
+    """
+  def send_rpc_request(self, request):
+    """Sends an RPC request to the server and receives a response.
+    Args:
+      request: str, the request to send the server.
+    Returns:
+      The string of the RPC response.
+    Raises:
+      errors.Error: if failed to send the request or receive a response.
+      errors.ProtocolError: if received an empty response from the server.
+      UnicodeError: if failed to decode the received response.
+    """
+    self._client_send(request)
+    response = self._client_receive()
+    if not response:
+      raise errors.ProtocolError(self._device,
+                                 errors.ProtocolError.NO_RESPONSE_FROM_SERVER)
+    return self._decode_socket_response_bytes(response)
+  def _client_send(self, message):
+    """Sends an RPC message through the connection.
+    Args:
+      message: str, the message to send.
+    Raises:
+      errors.Error: if a socket error occurred during the send.
+    """
+    try:
+      self._client.write(f'{message}\n'.encode('utf8'))
+      self._client.flush()
+    except socket.error as e:
+      raise errors.Error(
+          self._device,
+          f'Encountered socket error "{e}" sending RPC message "{message}"'
+      ) from e
+  def _client_receive(self):
+    """Receives the server's response of an RPC message.
+    Returns:
+      Raw bytes of the response.
+    Raises:
+      errors.Error: if a socket error occurred during the read.
+    """
+    try:
+      return self._client.readline()
+    except socket.error as e:
+      raise errors.Error(
+          self._device,
+          f'Encountered socket error "{e}" reading RPC response') from e
+  def _decode_socket_response_bytes(self, response):
+    """Returns a string decoded from the socket response bytes.
+    Args:
+      response: bytes, the response to be decoded.
+    Returns:
+      The string decoded from the given bytes.
+    Raises:
+      UnicodeError: if failed to decode the given bytes using encoding utf8.
+    """
+    try:
+      return str(response, encoding='utf8')
+    except UnicodeError:
+      self.log.error(
+          'Failed to decode socket response bytes using encoding '
+          'utf8: %s', response)
+      raise
+  def handle_callback(self, callback_id, ret_value, rpc_func_name):
+    """Creates the callback handler object.
+    If the client doesn't have an event client, it will start an event client
+    before creating a callback handler.
+    Args:
+      callback_id: see base class.
+      ret_value: see base class.
+      rpc_func_name: see base class.
+    Returns:
+      The callback handler object.
+    """
+    if self._event_client is None:
+      self._create_event_client()
+    return callback_handler.CallbackHandler(callback_id=callback_id,
+                                            event_client=self._event_client,
+                                            ret_value=ret_value,
+                                            method_name=rpc_func_name,
+                                            ad=self._device)
+  def _create_event_client(self):
+    """Creates a separate client to the same session for propagating events.
+    As the server is already started by the snippet server on which this
+    function is called, the created event client connects to the same session
+    as the snippet server. It also reuses the same host port and device port.
+    """
+    self._event_client = SnippetClientV2(package=self.package, ad=self._device)
+    self._event_client.make_connection_with_forwarded_port(
+        self.host_port, self.device_port, self.uid,
+        ConnectionHandshakeCommand.CONTINUE)
+  def make_connection_with_forwarded_port(self,
+                                          host_port,
+                                          device_port,
+                                          uid=UNKNOWN_UID,
+                                          cmd=ConnectionHandshakeCommand.INIT):
+    """Makes a connection to the server with the given forwarded port.
+    This process assumes that a device port has already been forwarded to a
+    host port, and it only makes a connection to the snippet server based on
+    the forwarded port. This is typically used by clients that share the same
+    snippet server, e.g. the snippet client and its event client.
+    Args:
+      host_port: int, the host port which has already been forwarded.
+      device_port: int, the device port listened by the snippet server.
+      uid: int, the uid of the server session to continue. It will be ignored
+        if the `cmd` requires the server to create a new session.
+      cmd: ConnectionHandshakeCommand, the handshake command Enum for the
+        server, which requires the server to create a new session or use the
+        current session.
+    """
+    self.host_port = host_port
+    self.device_port = device_port
+    self._counter = self._id_counter()
+    self.create_socket_connection()
+    self.send_handshake_request(uid, cmd)
   def stop(self):
     """Releases all the resources acquired in `initialize`.
     This function releases following resources:
+    * Close the socket connection.
+    * Stop forwarding the device port to host.
     * Stop the standing server subprocess running on the host side.
     * Stop the snippet server running on the device side.
@@ -287,13 +563,31 @@
       android_device_lib_errors.DeviceError: if the server exited with errors on
         the device side.
-    # TODO(mhaoli): This function is only partially implemented because we
-    # have not implemented the functionality of making connections in this
-    # class.
     self.log.debug('Stopping snippet package %s.', self.package)
+    self.close_connection()
     self.log.debug('Snippet package %s stopped.', self.package)
+  def close_connection(self):
+    """Closes the connection to the snippet server on the device.
+    This function closes the socket connection and stops forwarding the device
+    port to host.
+    """
+    try:
+      if self._conn:
+        self._conn.close()
+        self._conn = None
+    finally:
+      # Always clear the host port as part of the close step
+      self._stop_port_forwarding()
+  def _stop_port_forwarding(self):
+    """Stops the adb port forwarding used by this client."""
+    if self.host_port:
+      self._device.adb.forward(['--remove', f'tcp:{self.host_port}'])
+      self.host_port = None
   def _stop_server(self):
     """Releases all the resources acquired in `start_server`.
@@ -322,27 +616,71 @@
           f'Failed to stop existing apk. Unexpected output: {out}.')
-  # TODO(mhaoli): Temporally override these abstract methods so that we can
-  # initialize the instances in unit tests. We are implementing these functions
-  # in the next PR as soon as possible.
-  def make_connection(self):
-    raise NotImplementedError('To be implemented.')
-  def close_connection(self):
-    raise NotImplementedError('To be implemented.')
-  def __del__(self):
-    # Override the destructor to not call close_connection for now.
-    pass
-  def send_rpc_request(self, request):
-    raise NotImplementedError('To be implemented.')
-  def check_server_proc_running(self):
-    raise NotImplementedError('To be implemented.')
-  def handle_callback(self, callback_id, ret_value, rpc_func_name):
-    raise NotImplementedError('To be implemented.')
   def restore_server_connection(self, port=None):
-    raise NotImplementedError('To be implemented.')
+    """Restores the server after the device got reconnected.
+    Instead of creating a new instance of the client:
+      - Uses the given port (or find a new available host port if none is
+      given).
+      - Tries to connect to the remote server with the selected port.
+    Args:
+      port: int, if given, this is the host port from which to connect to the
+        remote device port. If not provided, find a new available port as host
+        port.
+    Raises:
+      errors.ServerRestoreConnectionError: when failed to restore the connection
+        to the snippet server.
+    """
+    try:
+      # If self.host_port is None, self._make_connection finds a new available
+      # port.
+      self.host_port = port
+      self._make_connection()
+    except Exception as e:
+      # Log the original error and raise ServerRestoreConnectionError.
+      self.log.error('Failed to re-connect to the server.')
+      raise errors.ServerRestoreConnectionError(
+          self._device,
+          (f'Failed to restore server connection for {self.package} at '
+           f'host port {self.host_port}, device port {self.device_port}.'
+          )) from e
+    # Because the previous connection was lost, update self._proc
+    self._proc = None
+    self._restore_event_client()
+  def _restore_event_client(self):
+    """Restores the previously created event client or creates a new one.
+    This function restores the connection of the previously created event
+    client, or creates a new client and makes a connection if it didn't
+    exist before.
+    The event client to restore reuses the same host port and device port
+    with the client on which function is called.
+    """
+    if self._event_client:
+      self._event_client.make_connection_with_forwarded_port(
+          self.host_port, self.device_port)
+  def help(self, print_output=True):
+    """Calls the help RPC, which returns the list of RPC calls available.
+    This RPC should normally be used in an interactive console environment
+    where the output should be printed instead of returned. Otherwise,
+    newlines will be escaped, which will make the output difficult to read.
+    Args:
+      print_output: bool, for whether the output should be printed.
+    Returns:
+      A string containing the help output otherwise None if `print_output`
+        wasn't set.
+    """
+    help_text = self._rpc('help')
+    if print_output:
+      print(help_text)
+    else:
+      return help_text
diff --git a/tests/mobly/controllers/android_device_lib/ b/tests/mobly/controllers/android_device_lib/
index b97774b..b674ce0 100644
--- a/tests/mobly/controllers/android_device_lib/
+++ b/tests/mobly/controllers/android_device_lib/
@@ -13,6 +13,7 @@
 # limitations under the License.
 """Unit tests for mobly.controllers.android_device_lib.snippet_client_v2."""
+import socket
 import unittest
 from unittest import mock
@@ -25,19 +26,79 @@
+class _MockAdbProxy(mock_android_device.MockAdbProxy):
+  """Mock class of adb proxy which covers all the calls used by snippet clients.
+  To enable testing snippet clients, this class extends the functionality of
+  base class from the following aspects:
+  * Records the arguments of all the calls to the shell method and forward
+    method.
+  * Handles the adb calls to stop the snippet server in the shell function
+    properly.
+  Attributes:
+    mock_shell_func: mock.Mock, used for recording the calls to the shell
+      method.
+    mock_forward_func: mock.Mock, used for recording the calls to the forward
+      method.
+  """
+  def __init__(self, *args, **kwargs):
+    """Initializes the instance of _MockAdbProxy."""
+    super().__init__(*args, **kwargs)
+    self.mock_shell_func = mock.Mock()
+    self.mock_forward_func = mock.Mock()
+  def shell(self, *args, **kwargs):
+    """Mock `shell` of mobly.controllers.android_device_lib.adb.AdbProxy."""
+    # Record all the call args
+    self.mock_shell_func(*args, **kwargs)
+    # Handle the server stop command properly
+    if f'am instrument --user 0 -w -e action stop {MOCK_SERVER_PATH}' in args:
+      return b'OK (0 tests)'
+    # For other commands, hand it over to the base class.
+    return super().shell(*args, **kwargs)
+  def forward(self, *args, **kwargs):
+    """Mock `forward` of mobly.controllers.android_device_lib.adb.AdbProxy."""
+    self.mock_forward_func(*args, **kwargs)
+def _setup_mock_socket_file(mock_socket_create_conn, resp):
+  """Sets up a mock socket file from the mock connection.
+  Args:
+    mock_socket_create_conn: The mock method for creating a socket connection.
+    resp: iterable, the side effect of the `readline` function of the mock
+      socket file.
+  Returns:
+    The mock socket file that will be injected into the code.
+  """
+  fake_file = mock.Mock()
+  fake_file.readline.side_effect = resp
+  fake_conn = mock.Mock()
+  fake_conn.makefile.return_value = fake_file
+  mock_socket_create_conn.return_value = fake_conn
+  return fake_file
 class SnippetClientV2Test(unittest.TestCase):
   """Unit tests for SnippetClientV2."""
   def _make_client(self, adb_proxy=None, mock_properties=None):
-    adb_proxy = adb_proxy or mock_android_device.MockAdbProxy(
-        instrumented_packages=[
-            (MOCK_PACKAGE_NAME,
-             snippet_client_v2._INSTRUMENTATION_RUNNER_PACKAGE,
-             MOCK_PACKAGE_NAME)
-        ],
-        mock_properties=mock_properties)
+    adb_proxy = adb_proxy or _MockAdbProxy(instrumented_packages=[
+    ],
+                                           mock_properties=mock_properties)
+    self.adb = adb_proxy
     device = mock.Mock()
     device.adb = adb_proxy
@@ -48,6 +109,7 @@
+    self.device = device
     self.client = snippet_client_v2.SnippetClientV2(MOCK_PACKAGE_NAME, device)
@@ -56,11 +118,208 @@
-  def _mock_server_process_starting_response(self, mock_start_subprocess,
-                                             resp_lines):
+  def _mock_server_process_starting_response(self,
+                                             mock_start_subprocess,
+                                             resp_lines=None):
+    resp_lines = resp_lines or [
+    ]
     mock_proc = mock_start_subprocess.return_value
     mock_proc.stdout.readline.side_effect = resp_lines
+  def _make_client_and_mock_socket_conn(self,
+                                        mock_socket_create_conn,
+                                        socket_resp=None,
+                                        device_port=MOCK_DEVICE_PORT,
+                                        adb_proxy=None,
+                                        mock_properties=None,
+                                        set_counter=True):
+    """Makes the snippet client and mocks the socket connection."""
+    self._make_client(adb_proxy, mock_properties)
+    if socket_resp is None:
+      socket_resp = [b'{"status": true, "uid": 1}']
+    self.mock_socket_file = _setup_mock_socket_file(mock_socket_create_conn,
+                                                    socket_resp)
+    self.client.device_port = device_port
+    self.socket_conn = mock_socket_create_conn.return_value
+    if set_counter:
+      self.client._counter = self.client._id_counter()
+  def _assert_client_resources_released(self, mock_start_subprocess,
+                                        mock_stop_standing_subprocess,
+                                        mock_get_port):
+    """Asserts the resources had been released before the client stopped."""
+    self.assertIs(self.client._proc, None)
+    self.adb.mock_shell_func.assert_any_call(
+        f'am instrument --user {MOCK_USER_ID} -w -e action stop '
+        f'{MOCK_SERVER_PATH}')
+    mock_stop_standing_subprocess.assert_called_once_with(
+        mock_start_subprocess.return_value)
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    self.socket_conn.close.assert_called_once_with()
+    self.assertIs(self.client.host_port, None)
+    self.adb.mock_forward_func.assert_any_call(
+        ['--remove', f'tcp:{mock_get_port.return_value}'])
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_the_whole_lifecycle_with_a_sync_rpc(self, mock_start_subprocess,
+                                               mock_stop_standing_subprocess,
+                                               mock_socket_create_conn,
+                                               mock_get_port):
+    """Tests the whole lifecycle of the client with sending a sync RPC."""
+    socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": null}',
+    ]
+    expected_socket_writes = [
+'{"cmd": "initiate", "uid": -1}\n'),
+'{"id": 0, "method": "some_sync_rpc", '
+                  b'"params": [1, 2, "hello"]}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           socket_resp,
+                                           set_counter=False)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.initialize()
+    rpc_result = self.client.some_sync_rpc(1, 2, 'hello')
+    self.client.stop()
+    self._assert_client_resources_released(mock_start_subprocess,
+                                           mock_stop_standing_subprocess,
+                                           mock_get_port)
+    self.assertListEqual(self.mock_socket_file.write.call_args_list,
+                         expected_socket_writes)
+    self.assertEqual(rpc_result, 123)
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.callback_handler.'
+              'CallbackHandler')
+  def test_the_whole_lifecycle_with_an_async_rpc(self, mock_callback_class,
+                                                 mock_start_subprocess,
+                                                 mock_stop_standing_subprocess,
+                                                 mock_socket_create_conn,
+                                                 mock_get_port):
+    """Tests the whole lifecycle of the client with sending an async RPC."""
+    mock_socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": "1-0"}',
+        b'{"status": true, "uid": 1}',
+    ]
+    expected_socket_writes = [
+'{"cmd": "initiate", "uid": -1}\n'),
+'{"id": 0, "method": "some_async_rpc", '
+                  b'"params": [1, 2, "async"]}\n'),
+'{"cmd": "continue", "uid": 1}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           mock_socket_resp,
+                                           set_counter=False)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.initialize()
+    rpc_result = self.client.some_async_rpc(1, 2, 'async')
+    self.client.stop()
+    self._assert_client_resources_released(mock_start_subprocess,
+                                           mock_stop_standing_subprocess,
+                                           mock_get_port)
+    self.assertListEqual(self.mock_socket_file.write.call_args_list,
+                         expected_socket_writes)
+    mock_callback_class.assert_called_with(
+        callback_id='1-0',
+        event_client=self.client._event_client,
+        ret_value=123,
+        method_name='some_async_rpc',
+        ad=self.device)
+    self.assertIs(rpc_result, mock_callback_class.return_value)
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.callback_handler.'
+              'CallbackHandler')
+  def test_the_whole_lifecycle_with_multiple_rpcs(self, mock_callback_class,
+                                                  mock_start_subprocess,
+                                                  mock_stop_standing_subprocess,
+                                                  mock_socket_create_conn,
+                                                  mock_get_port):
+    """Tests the whole lifecycle of the client with sending multiple RPCs."""
+    # Prepare the test
+    mock_socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": null}',
+        b'{"id": 1, "result": 456, "error": null, "callback": "1-0"}',
+        # Response for starting the event client
+        b'{"status": true, "uid": 1}',
+        b'{"id": 2, "result": 789, "error": null, "callback": null}',
+        b'{"id": 3, "result": 321, "error": null, "callback": "2-0"}',
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           mock_socket_resp,
+                                           set_counter=False)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    rpc_results_expected = [
+        123,
+        mock.Mock(),
+        789,
+        mock.Mock(),
+    ]
+    # Extract the two mock objects to use as return values of callback handler
+    # class
+    mock_callback_class.side_effect = [
+        rpc_results_expected[1], rpc_results_expected[3]
+    ]
+    # Run tests
+    rpc_results = []
+    self.client.initialize()
+    rpc_results.append(self.client.some_sync_rpc(1, 2, 'hello'))
+    rpc_results.append(self.client.some_async_rpc(3, 4, 'async'))
+    rpc_results.append(self.client.some_sync_rpc(5, 'hello'))
+    rpc_results.append(self.client.some_async_rpc(6, 'async'))
+    self.client.stop()
+    # Assertions
+    mock_callback_class_calls_expected = [
+                  event_client=self.client._event_client,
+                  ret_value=456,
+                  method_name='some_async_rpc',
+                  ad=self.device),
+                  event_client=self.client._event_client,
+                  ret_value=321,
+                  method_name='some_async_rpc',
+                  ad=self.device),
+    ]
+    self.assertListEqual(rpc_results, rpc_results_expected)
+    mock_callback_class.assert_has_calls(mock_callback_class_calls_expected)
+    self._assert_client_resources_released(mock_start_subprocess,
+                                           mock_stop_standing_subprocess,
+                                           mock_get_port)
   def test_check_app_installed_normally(self):
     """Tests that app checker runs normally when app installed correctly."""
@@ -68,16 +327,14 @@
   def test_check_app_installed_fail_app_not_installed(self):
     """Tests that app checker fails without installing app."""
-    self._make_client(mock_android_device.MockAdbProxy())
+    self._make_client(_MockAdbProxy())
     expected_msg = f'.* {MOCK_PACKAGE_NAME} is not installed.'
     with self.assertRaisesRegex(errors.ServerStartPreCheckError, expected_msg):
   def test_check_app_installed_fail_not_instrumented(self):
     """Tests that app checker fails without instrumenting app."""
-    self._make_client(
-        mock_android_device.MockAdbProxy(
-            installed_packages=[MOCK_PACKAGE_NAME]))
+    self._make_client(_MockAdbProxy(installed_packages=[MOCK_PACKAGE_NAME]))
     expected_msg = (
         f'.* {MOCK_PACKAGE_NAME} is installed, but it is not instrumented.')
     with self.assertRaisesRegex(errors.ServerStartPreCheckError, expected_msg):
@@ -86,7 +343,7 @@
   def test_check_app_installed_fail_instrumentation_not_installed(self):
     """Tests that app checker fails without installing instrumentation."""
-        mock_android_device.MockAdbProxy(instrumented_packages=[(
+        _MockAdbProxy(instrumented_packages=[(
@@ -94,53 +351,44 @@
     with self.assertRaisesRegex(errors.ServerStartPreCheckError, expected_msg):
-  @mock.patch.object(mock_android_device.MockAdbProxy, 'shell')
-  def test_disable_hidden_api_normally(self, mock_shell_func):
+  def test_disable_hidden_api_normally(self):
     """Tests the disabling hidden api process works normally."""
         '': 'S',
         '': '31',
-    self.client._device.is_rootable = True
+    self.device.is_rootable = True
-    mock_shell_func.assert_called_with(
+    self.adb.mock_shell_func.assert_called_with(
         'settings put global hidden_api_blacklist_exemptions "*"')
-  @mock.patch.object(mock_android_device.MockAdbProxy, 'shell')
-  def test_disable_hidden_api_low_sdk(self, mock_shell_func):
+  def test_disable_hidden_api_low_sdk(self):
     """Tests it doesn't disable hidden api with low SDK."""
         '': 'O',
         '': '26',
-    self.client._device.is_rootable = True
+    self.device.is_rootable = True
-    mock_shell_func.assert_not_called()
+    self.adb.mock_shell_func.assert_not_called()
-  @mock.patch.object(mock_android_device.MockAdbProxy, 'shell')
-  def test_disable_hidden_api_non_rootable(self, mock_shell_func):
+  def test_disable_hidden_api_non_rootable(self):
     """Tests it doesn't disable hidden api with non-rootable device."""
         '': 'S',
         '': '31',
-    self.client._device.is_rootable = False
+    self.device.is_rootable = False
-    mock_shell_func.assert_not_called()
+    self.adb.mock_shell_func.assert_not_called()
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'setsid')
+  @mock.patch.object(_MockAdbProxy, 'shell', return_value=b'setsid')
   def test_start_server_with_user_id(self, mock_adb, mock_start_subprocess):
     """Tests that `--user` is added to starting command with SDK >= 24."""
     self._make_client_with_extra_adb_properties({'': '30'})
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
     start_cmd_list = [
@@ -155,17 +403,11 @@
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'setsid')
+  @mock.patch.object(_MockAdbProxy, 'shell', return_value=b'setsid')
   def test_start_server_without_user_id(self, mock_adb, mock_start_subprocess):
     """Tests that `--user` is not added to starting command on SDK < 24."""
     self._make_client_with_extra_adb_properties({'': '21'})
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
     start_cmd_list = [
@@ -179,7 +421,7 @@
-  @mock.patch.object(mock_android_device.MockAdbProxy,
+  @mock.patch.object(_MockAdbProxy,
                      side_effect=adb.AdbError('cmd', 'stdout', 'stderr',
@@ -187,11 +429,7 @@
     """Checks the starting server command without persisting commands."""
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
     start_cmd_list = [
@@ -211,11 +449,7 @@
   def test_start_server_with_nohup(self, mock_start_subprocess):
     """Checks the starting server command with nohup."""
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
     def _mocked_shell(arg):
       if 'nohup' in arg:
@@ -239,11 +473,7 @@
   def test_start_server_with_setsid(self, mock_start_subprocess):
     """Checks the starting server command with setsid."""
-    self._mock_server_process_starting_response(
-        mock_start_subprocess,
-        resp_lines=[
-            b'SNIPPET START, PROTOCOL 1 234', b'SNIPPET SERVING, PORT 1234'
-        ])
+    self._mock_server_process_starting_response(mock_start_subprocess)
     def _mocked_shell(arg):
       if 'setsid' in arg:
@@ -337,57 +567,699 @@
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'OK (0 tests)')
-  def test_stop_server_normally(self, mock_android_device_shell,
-                                mock_stop_standing_subprocess):
+  def test_stop_normally(self, mock_stop_standing_subprocess):
     """Tests that stopping server process works normally."""
     mock_proc = mock.Mock()
     self.client._proc = mock_proc
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
     self.assertIs(self.client._proc, None)
-    mock_android_device_shell.assert_called_once_with(
+    self.adb.mock_shell_func.assert_called_once_with(
         f'am instrument --user {MOCK_USER_ID} -w -e action stop '
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    mock_conn.close.assert_called_once_with()
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'OK (0 tests)')
-  def test_stop_server_server_already_cleaned(self, mock_android_device_shell,
-                                              mock_stop_standing_subprocess):
-    """Tests stopping server process when subprocess is already cleaned."""
+  def test_stop_when_server_is_already_cleaned(self,
+                                               mock_stop_standing_subprocess):
+    """Tests that stop server process when subprocess is already cleaned."""
     self.client._proc = None
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
     self.assertIs(self.client._proc, None)
-    mock_android_device_shell.assert_called_once_with(
+    self.adb.assert_called_once_with(
         f'am instrument --user {MOCK_USER_ID} -w -e action stop '
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    mock_conn.close.assert_called_once_with()
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
-  @mock.patch.object(mock_android_device.MockAdbProxy,
-                     'shell',
-                     return_value=b'Closed with error.')
-  def test_stop_server_stop_with_error(self, mock_android_device_shell,
-                                       mock_stop_standing_subprocess):
-    """Tests all resources are cleaned even if stopping server has error."""
+  def test_stop_when_conn_is_already_cleaned(self,
+                                             mock_stop_standing_subprocess):
+    """Tests that stop server process when the connection is already closed."""
     mock_proc = mock.Mock()
     self.client._proc = mock_proc
+    self.client._conn = None
+    self.client.host_port = 12345
+    self.client.stop()
+    self.assertIs(self.client._proc, None)
+    mock_stop_standing_subprocess.assert_called_once_with(mock_proc)
+    self.adb.assert_called_once_with(
+        f'am instrument --user {MOCK_USER_ID} -w -e action stop '
+        f'{MOCK_SERVER_PATH}')
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  @mock.patch.object(_MockAdbProxy, 'shell', return_value=b'Closed with error.')
+  def test_stop_with_device_side_error(self, mock_adb_shell,
+                                       mock_stop_standing_subprocess):
+    """Tests all resources will be cleaned when server stop throws an error."""
+    self._make_client()
+    mock_proc = mock.Mock()
+    self.client._proc = mock_proc
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
     with self.assertRaisesRegex(android_device_lib_errors.DeviceError,
                                 'Closed with error'):
     self.assertIs(self.client._proc, None)
-    mock_android_device_shell.assert_called_once_with(
+    mock_adb_shell.assert_called_once_with(
         f'am instrument --user {MOCK_USER_ID} -w -e action stop '
+    self.assertFalse(self.client.is_alive)
+    self.assertIs(self.client._conn, None)
+    mock_conn.close.assert_called_once_with()
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+  @mock.patch('mobly.utils.stop_standing_subprocess')
+  def test_stop_with_conn_close_error(self, mock_stop_standing_subprocess):
+    """Tests port resource will be cleaned when socket close throws an error."""
+    del mock_stop_standing_subprocess
+    self._make_client()
+    mock_proc = mock.Mock()
+    self.client._proc = mock_proc
+    mock_conn = mock.Mock()
+    # The deconstructor will call this mock function again after tests, so
+    # only throw this error when it is called the first time.
+    mock_conn.close.side_effect = (OSError('Closed with error'), None)
+    self.client._conn = mock_conn
+    self.client.host_port = 12345
+    with self.assertRaisesRegex(OSError, 'Closed with error'):
+      self.client.stop()
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:12345'])
+  def test_close_connection_normally(self):
+    """Tests that closing connection works normally."""
+    self._make_client()
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = 123
+    self.client.close_connection()
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    mock_conn.close.assert_called_once_with()
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:123'])
+  def test_close_connection_when_host_port_has_been_released(self):
+    """Tests that close connection when the host port has been released."""
+    self._make_client()
+    mock_conn = mock.Mock()
+    self.client._conn = mock_conn
+    self.client.host_port = None
+    self.client.close_connection()
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    mock_conn.close.assert_called_once_with()
+    self.device.adb.mock_forward_func.assert_not_called()
+  def test_close_connection_when_conn_have_been_closed(self):
+    """Tests that close connection when the connection has been closed."""
+    self._make_client()
+    self.client._conn = None
+    self.client.host_port = 123
+    self.client.close_connection()
+    self.assertIs(self.client._conn, None)
+    self.assertIs(self.client.host_port, None)
+    self.device.adb.mock_forward_func.assert_called_once_with(
+        ['--remove', 'tcp:123'])
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_send_sync_rpc_normally(self, mock_start_subprocess,
+                                  mock_socket_create_conn):
+    """Tests that sending a sync RPC works normally."""
+    socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": null}',
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.make_connection()
+    rpc_result = self.client.some_rpc(1, 2, 'hello')
+    self.assertEqual(rpc_result, 123)
+    self.mock_socket_file.write.assert_called_with(
+        b'{"id": 0, "method": "some_rpc", "params": [1, 2, "hello"]}\n')
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.callback_handler.'
+              'CallbackHandler')
+  def test_async_rpc_start_event_client(self, mock_callback_class,
+                                        mock_start_subprocess,
+                                        mock_socket_create_conn):
+    """Tests that sending an async RPC starts the event client."""
+    socket_resp = [
+        b'{"status": true, "uid": 1}',
+        b'{"id": 0, "result": 123, "error": null, "callback": "1-0"}',
+        b'{"status": true, "uid": 1}',
+        b'{"id":1,"result":"async-rpc-event","callback":null,"error":null}',
+    ]
+    socket_write_expected = [
+'{"cmd": "initiate", "uid": -1}\n'),
+'{"id": 0, "method": "some_async_rpc", '
+                  b'"params": [1, 2, "hello"]}\n'),
+'{"cmd": "continue", "uid": 1}\n'),
+'{"id": 1, "method": "eventGetAll", '
+                  b'"params": ["1-0", "eventName"]}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           socket_resp,
+                                           set_counter=True)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.host_port = 12345
+    self.client.make_connection()
+    rpc_result = self.client.some_async_rpc(1, 2, 'hello')
+    mock_callback_class.assert_called_with(
+        callback_id='1-0',
+        event_client=self.client._event_client,
+        ret_value=123,
+        method_name='some_async_rpc',
+        ad=self.device)
+    self.assertIs(rpc_result, mock_callback_class.return_value)
+    # Ensure the event client is alive
+    self.assertTrue(self.client._event_client.is_alive)
+    # Ensure the event client shared the same ports and uid with main client
+    self.assertEqual(self.client._event_client.host_port, 12345)
+    self.assertEqual(self.client._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(self.client._event_client.uid, self.client.uid)
+    # Ensure the event client has reset its own RPC id counter
+    self.assertEqual(next(self.client._counter), 1)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+    # Ensure that event client can send RPCs
+    event_string = self.client._event_client.eventGetAll('1-0', 'eventName')
+    self.assertEqual(event_string, 'async-rpc-event')
+    self.assertListEqual(
+        self.mock_socket_file.write.call_args_list,
+        socket_write_expected,
+    )
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port')
+  def test_initialize_client_normally(self, mock_get_port,
+                                      mock_start_subprocess,
+                                      mock_socket_create_conn):
+    """Tests that initializing the client works normally."""
+    mock_get_port.return_value = 12345
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn,
+                                           socket_resp,
+                                           set_counter=True)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.initialize()
+    self.assertTrue(self.client.is_alive)
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._counter), 0)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port')
+  def test_restore_event_client(self, mock_get_port, mock_start_subprocess,
+                                mock_socket_create_conn):
+    """Tests restoring the event client."""
+    mock_get_port.return_value = 12345
+    socket_resp = [
+        # response of handshake when initializing the client
+        b'{"status": true, "uid": 1}',
+        # response of an async RPC
+        b'{"id": 0, "result": 123, "error": null, "callback": "1-0"}',
+        # response of starting event client
+        b'{"status": true, "uid": 1}',
+        # response of restoring server connection
+        b'{"status": true, "uid": 2}',
+        # response of restoring event client
+        b'{"status": true, "uid": 3}',
+        # response of restoring server connection
+        b'{"status": true, "uid": 4}',
+        # response of restoring event client
+        b'{"status": true, "uid": 5}',
+    ]
+    socket_write_expected = [
+        # request of handshake when initializing the client
+'{"cmd": "initiate", "uid": -1}\n'),
+        # request of an async RPC
+'{"id": 0, "method": "some_async_rpc", "params": []}\n'),
+        # request of starting event client
+'{"cmd": "continue", "uid": 1}\n'),
+        # request of restoring server connection
+'{"cmd": "initiate", "uid": -1}\n'),
+        # request of restoring event client
+'{"cmd": "initiate", "uid": -1}\n'),
+        # request of restoring server connection
+'{"cmd": "initiate", "uid": -1}\n'),
+        # request of restoring event client
+'{"cmd": "initiate", "uid": -1}\n'),
+    ]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.make_connection()
+    callback = self.client.some_async_rpc()
+    # before reconnect, clients use previously selected ports
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(callback._event_client.host_port, 12345)
+    self.assertEqual(callback._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+    # after reconnect, if host port specified, clients use specified port
+    self.client.restore_server_connection(port=54321)
+    self.assertEqual(self.client.host_port, 54321)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(callback._event_client.host_port, 54321)
+    self.assertEqual(callback._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+    # after reconnect, if host port not specified, clients use selected
+    # available port
+    mock_get_port.return_value = 56789
+    self.client.restore_server_connection()
+    self.assertEqual(self.client.host_port, 56789)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(callback._event_client.host_port, 56789)
+    self.assertEqual(callback._event_client.device_port, MOCK_DEVICE_PORT)
+    self.assertEqual(next(self.client._event_client._counter), 0)
+    # if unable to reconnect for any reason, a
+    # errors.ServerRestoreConnectionError is raised.
+    mock_socket_create_conn.side_effect = IOError('socket timed out')
+    with self.assertRaisesRegex(
+        errors.ServerRestoreConnectionError,
+        (f'Failed to restore server connection for {MOCK_PACKAGE_NAME} at '
+         f'host port 56789, device port {MOCK_DEVICE_PORT}')):
+      self.client.restore_server_connection()
+    self.assertListEqual(self.mock_socket_file.write.call_args_list,
+                         socket_write_expected)
+  @mock.patch.object(snippet_client_v2.SnippetClientV2, '_make_connection')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  def test_restore_server_connection_with_event_client(
+      self, mock_create_socket_conn_func, mock_send_handshake_func,
+      mock_make_connection):
+    """Tests restoring server connection when the event client is not None."""
+    self._make_client()
+    event_client = snippet_client_v2.SnippetClientV2('mock-package',
+                                                     mock.Mock())
+    self.client._event_client = event_client
+    self.client.device_port = 54321
+    self.client.uid = 5
+    self.client.restore_server_connection(port=12345)
+    mock_make_connection.assert_called_once_with()
+    self.assertEqual(event_client.host_port, 12345)
+    self.assertEqual(event_client.device_port, 54321)
+    self.assertEqual(next(event_client._counter), 0)
+    mock_create_socket_conn_func.assert_called_once_with()
+    mock_send_handshake_func.assert_called_once_with(
+        -1, snippet_client_v2.ConnectionHandshakeCommand.INIT)
+  @mock.patch('builtins.print')
+  def test_help_rpc_when_printing_by_default(self, mock_print):
+    """Tests the `help` method when it prints the output by default."""
+    self._make_client()
+    mock_rpc = mock.MagicMock()
+    self.client._rpc = mock_rpc
+    result =
+    mock_rpc.assert_called_once_with('help')
+    self.assertIsNone(result)
+    mock_print.assert_called_once_with(mock_rpc.return_value)
+  @mock.patch('builtins.print')
+  def test_help_rpc_when_not_printing(self, mock_print):
+    """Tests the `help` method when it was set not to print the output."""
+    self._make_client()
+    mock_rpc = mock.MagicMock()
+    self.client._rpc = mock_rpc
+    result =
+    mock_rpc.assert_called_once_with('help')
+    self.assertEqual(mock_rpc.return_value, result)
+    mock_print.assert_not_called()
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  def test_make_connection_normally(self, mock_get_port, mock_start_subprocess,
+                                    mock_socket_create_conn):
+    """Tests that making a connection works normally."""
+    del mock_get_port
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.adb.mock_forward_func.assert_called_once_with(
+        ['tcp:12345', f'tcp:{MOCK_DEVICE_PORT}'])
+    mock_socket_create_conn.assert_called_once_with(
+        ('localhost', 12345), snippet_client_v2._SOCKET_CONNECTION_TIMEOUT)
+    self.socket_conn.settimeout.assert_called_once_with(
+        snippet_client_v2._SOCKET_READ_TIMEOUT)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  def test_make_connection_with_preset_host_port(self, mock_get_port,
+                                                 mock_start_subprocess,
+                                                 mock_socket_create_conn):
+    """Tests that make a connection with the preset host port."""
+    del mock_get_port
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.host_port = 23456
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    # Test that the host port for forwarding is 23456 instead of 12345
+    self.adb.mock_forward_func.assert_called_once_with(
+        ['tcp:23456', f'tcp:{MOCK_DEVICE_PORT}'])
+    mock_socket_create_conn.assert_called_once_with(
+        ('localhost', 23456), snippet_client_v2._SOCKET_CONNECTION_TIMEOUT)
+    self.socket_conn.settimeout.assert_called_once_with(
+        snippet_client_v2._SOCKET_READ_TIMEOUT)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.get_available_host_port',
+              return_value=12345)
+  def test_make_connection_with_ip(self, mock_get_port, mock_start_subprocess,
+                                   mock_socket_create_conn):
+    """Tests that make a connection with instead of localhost."""
+    del mock_get_port
+    socket_resp = [b'{"status": true, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    mock_conn = mock_socket_create_conn.return_value
+    # Refuse creating socket connection with 'localhost', only accept
+    # '' as address
+    def _mock_create_conn_side_effect(address, *args, **kwargs):
+      del args, kwargs
+      if address[0] == '':
+        return mock_conn
+      raise ConnectionRefusedError(f'Refusing connection to {address[0]}.')
+    mock_socket_create_conn.side_effect = _mock_create_conn_side_effect
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, 1)
+    self.assertEqual(self.client.device_port, MOCK_DEVICE_PORT)
+    self.adb.mock_forward_func.assert_called_once_with(
+        ['tcp:12345', f'tcp:{MOCK_DEVICE_PORT}'])
+    mock_socket_create_conn.assert_any_call(
+        ('', 12345), snippet_client_v2._SOCKET_CONNECTION_TIMEOUT)
+    self.socket_conn.settimeout.assert_called_once_with(
+        snippet_client_v2._SOCKET_READ_TIMEOUT)
+  @mock.patch('socket.create_connection')
+  def test_make_connection_io_error(self, mock_socket_create_conn):
+    """Tests IOError occurred trying to create a socket connection."""
+    mock_socket_create_conn.side_effect = IOError()
+    with self.assertRaises(IOError):
+      self._make_client()
+      self.client.device_port = 123
+      self.client.make_connection()
+  @mock.patch('socket.create_connection')
+  def test_make_connection_timeout(self, mock_socket_create_conn):
+    """Tests timeout occurred trying to create a socket connection."""
+    mock_socket_create_conn.side_effect = socket.timeout
+    with self.assertRaises(socket.timeout):
+      self._make_client()
+      self.client.device_port = 123
+      self.client.make_connection()
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receives_none_handshake_response(
+      self, mock_start_subprocess, mock_socket_create_conn):
+    """Tests make_connection receives None as the handshake response."""
+    socket_resp = [None]
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    with self.assertRaisesRegex(
+        errors.ProtocolError, errors.ProtocolError.NO_RESPONSE_FROM_HANDSHAKE):
+      self.client.make_connection()
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receives_empty_handshake_response(
+      self, mock_start_subprocess, mock_socket_create_conn):
+    """Tests make_connection receives an empty handshake response."""
+    socket_resp = [b'']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    with self.assertRaisesRegex(
+        errors.ProtocolError, errors.ProtocolError.NO_RESPONSE_FROM_HANDSHAKE):
+      self.client.make_connection()
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receives_invalid_handshake_response(
+      self, mock_start_subprocess, mock_socket_create_conn):
+    """Tests make_connection receives an invalid handshake response."""
+    socket_resp = [b'{"status": false, "uid": 1}']
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn, socket_resp)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.make_connection()
+    self.assertEqual(self.client.uid, -1)
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_send_handshake_request_error(
+      self, mock_start_subprocess, mock_socket_create_conn):
+    """Tests that an error occurred trying to send a handshake request."""
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.mock_socket_file.write.side_effect = socket.error('Socket write error')
+    with self.assertRaisesRegex(errors.Error, 'Socket write error'):
+      self.client.make_connection()
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_receive_handshake_response_error(
+      self, mock_start_subprocess, mock_socket_create_conn):
+    """Tests that an error occurred trying to receive a handshake response."""
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.mock_socket_file.readline.side_effect = socket.error(
+        'Socket read error')
+    with self.assertRaisesRegex(errors.Error, 'Socket read error'):
+      self.client.make_connection()
+  @mock.patch('socket.create_connection')
+  @mock.patch('mobly.controllers.android_device_lib.snippet_client_v2.'
+              'utils.start_standing_subprocess')
+  def test_make_connection_decode_handshake_response_bytes_error(
+      self, mock_start_subprocess, mock_socket_create_conn):
+    """Tests that an error occurred trying to decode a handshake response."""
+    self._make_client_and_mock_socket_conn(mock_socket_create_conn)
+    self._mock_server_process_starting_response(mock_start_subprocess)
+    self.client.log = mock.Mock()
+    socket_response = bytes('{"status": false, "uid": 1}', encoding='cp037')
+    self.mock_socket_file.readline.side_effect = [socket_response]
+    with self.assertRaises(UnicodeError):
+      self.client.make_connection()
+    self.client.log.error.assert_has_calls([
+            'Failed to decode socket response bytes using encoding utf8: %s',
+            socket_response)
+    ])
+  def test_rpc_sending_and_receiving(self):
+    """Test RPC sending and receiving.
+    Tests that when sending and receiving an RPC the correct data is used.
+    """
+    self._make_client()
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    rpc_response_expected = ('{"id": 0, "result": 123, "error": null, '
+                             '"callback": null}')
+    socket_write_expected = [
+'{"id": 0, "method": "some_rpc", "params": []}\n')
+    ]
+    socket_response = (b'{"id": 0, "result": 123, "error": null, '
+                       b'"callback": null}')
+    mock_socket_file = mock.Mock()
+    mock_socket_file.readline.return_value = socket_response
+    self.client._client = mock_socket_file
+    rpc_response = self.client.send_rpc_request(rpc_request)
+    self.assertEqual(rpc_response, rpc_response_expected)
+    self.assertEqual(mock_socket_file.write.call_args_list,
+                     socket_write_expected)
+  def test_rpc_send_socket_write_error(self):
+    """Tests that an error occurred trying to write the socket file."""
+    self._make_client()
+    self.client._client = mock.Mock()
+    self.client._client.write.side_effect = socket.error('Socket write error')
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    with self.assertRaisesRegex(errors.Error, 'Socket write error'):
+      self.client.send_rpc_request(rpc_request)
+  def test_rpc_send_socket_read_error(self):
+    """Tests that an error occurred trying to read the socket file."""
+    self._make_client()
+    self.client._client = mock.Mock()
+    self.client._client.readline.side_effect = socket.error('Socket read error')
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    with self.assertRaisesRegex(errors.Error, 'Socket read error'):
+      self.client.send_rpc_request(rpc_request)
+  def test_rpc_send_decode_socket_response_bytes_error(self):
+    """Tests that an error occurred trying to decode the socket response."""
+    self._make_client()
+    self.client.log = mock.Mock()
+    self.client._client = mock.Mock()
+    socket_response = bytes(
+        '{"id": 0, "result": 123, "error": null, "callback": null}',
+        encoding='cp037')
+    self.client._client.readline.return_value = socket_response
+    rpc_request = '{"id": 0, "method": "some_rpc", "params": []}'
+    with self.assertRaises(UnicodeError):
+      self.client.send_rpc_request(rpc_request)
+    self.client.log.error.assert_has_calls([
+            'Failed to decode socket response bytes using encoding utf8: %s',
+            socket_response)
+    ])
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  def test_make_conn_with_forwarded_port_init(self,
+                                              mock_create_socket_conn_func,
+                                              mock_send_handshake_func):
+    """Tests make_connection_with_forwarded_port initiates a new session."""
+    self._make_client()
+    self.client._counter = None
+    self.client.make_connection_with_forwarded_port(12345, 54321)
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, 54321)
+    self.assertEqual(next(self.client._counter), 0)
+    mock_create_socket_conn_func.assert_called_once_with()
+    mock_send_handshake_func.assert_called_once_with(
+        -1, snippet_client_v2.ConnectionHandshakeCommand.INIT)
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'send_handshake_request')
+  @mock.patch.object(snippet_client_v2.SnippetClientV2,
+                     'create_socket_connection')
+  def test_make_conn_with_forwarded_port_continue(self,
+                                                  mock_create_socket_conn_func,
+                                                  mock_send_handshake_func):
+    """Tests make_connection_with_forwarded_port continues current session."""
+    self._make_client()
+    self.client._counter = None
+    self.client.make_connection_with_forwarded_port(
+        12345, 54321, 3, snippet_client_v2.ConnectionHandshakeCommand.CONTINUE)
+    self.assertEqual(self.client.host_port, 12345)
+    self.assertEqual(self.client.device_port, 54321)
+    self.assertEqual(next(self.client._counter), 0)
+    mock_create_socket_conn_func.assert_called_once_with()
+    mock_send_handshake_func.assert_called_once_with(
+        3, snippet_client_v2.ConnectionHandshakeCommand.CONTINUE)
 if __name__ == '__main__':