| # Copyright 2020 The gRPC Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from concurrent.futures import ThreadPoolExecutor |
| import logging |
| import threading |
| from typing import Iterator |
| |
| import grpc |
| |
| import phone_pb2 |
| import phone_pb2_grpc |
| |
| |
| class CallMaker: |
| def __init__( |
| self, |
| executor: ThreadPoolExecutor, |
| channel: grpc.Channel, |
| phone_number: str, |
| ) -> None: |
| self._executor = executor |
| self._channel = channel |
| self._stub = phone_pb2_grpc.PhoneStub(self._channel) |
| self._phone_number = phone_number |
| self._session_id = None |
| self._audio_session_link = None |
| self._call_state = None |
| self._peer_responded = threading.Event() |
| self._call_finished = threading.Event() |
| self._consumer_future = None |
| |
| def _response_watcher( |
| self, response_iterator: Iterator[phone_pb2.StreamCallResponse] |
| ) -> None: |
| try: |
| for response in response_iterator: |
| # NOTE: All fields in Proto3 are optional. This is the recommended way |
| # to check if a field is present or not, or to exam which one-of field is |
| # fulfilled by this message. |
| if response.HasField("call_info"): |
| self._on_call_info(response.call_info) |
| elif response.HasField("call_state"): |
| self._on_call_state(response.call_state.state) |
| else: |
| raise RuntimeError( |
| "Received StreamCallResponse without call_info and" |
| " call_state" |
| ) |
| except Exception as e: |
| self._peer_responded.set() |
| raise |
| |
| def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None: |
| self._session_id = call_info.session_id |
| self._audio_session_link = call_info.media |
| |
| def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None: |
| logging.info( |
| "Call toward [%s] enters [%s] state", |
| self._phone_number, |
| phone_pb2.CallState.State.Name(call_state), |
| ) |
| self._call_state = call_state |
| if call_state == phone_pb2.CallState.State.ACTIVE: |
| self._peer_responded.set() |
| if call_state == phone_pb2.CallState.State.ENDED: |
| self._peer_responded.set() |
| self._call_finished.set() |
| |
| def call(self) -> None: |
| request = phone_pb2.StreamCallRequest() |
| request.phone_number = self._phone_number |
| response_iterator = self._stub.StreamCall(iter((request,))) |
| # Instead of consuming the response on current thread, spawn a consumption thread. |
| self._consumer_future = self._executor.submit( |
| self._response_watcher, response_iterator |
| ) |
| |
| def wait_peer(self) -> bool: |
| logging.info("Waiting for peer to connect [%s]...", self._phone_number) |
| self._peer_responded.wait(timeout=None) |
| if self._consumer_future.done(): |
| # If the future raises, forwards the exception here |
| self._consumer_future.result() |
| return self._call_state == phone_pb2.CallState.State.ACTIVE |
| |
| def audio_session(self) -> None: |
| assert self._audio_session_link is not None |
| logging.info("Consuming audio resource [%s]", self._audio_session_link) |
| self._call_finished.wait(timeout=None) |
| logging.info("Audio session finished [%s]", self._audio_session_link) |
| |
| |
| def process_call( |
| executor: ThreadPoolExecutor, channel: grpc.Channel, phone_number: str |
| ) -> None: |
| call_maker = CallMaker(executor, channel, phone_number) |
| call_maker.call() |
| if call_maker.wait_peer(): |
| call_maker.audio_session() |
| logging.info("Call finished!") |
| else: |
| logging.info("Call failed: peer didn't answer") |
| |
| |
| def run(): |
| executor = ThreadPoolExecutor() |
| with grpc.insecure_channel("localhost:50051") as channel: |
| future = executor.submit( |
| process_call, executor, channel, "555-0100-XXXX" |
| ) |
| future.result() |
| |
| |
| if __name__ == "__main__": |
| logging.basicConfig(level=logging.INFO) |
| run() |