| #![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)] |
| use failure::Error; |
| use failure::err_msg; |
| use futures::channel::mpsc; |
| use futures::{Stream}; |
| use futures::task::Poll; |
| use futures::task::LocalWaker; |
| use core::pin::Pin; |
| use std::sync::Arc; |
| use parking_lot::RwLock; |
| use fuchsia_async as fasync; |
| |
| pub trait Actor { |
| type Message: Send + Sync; |
| |
| /* |
| Could have this be Envelope<Message>, to allow some standard headers. |
| |
| Could do it via message traits ala Actix What about types responders? |
| What do I send as the message target? Perhaps just route via oneshot channels? |
| */ |
| fn update(&mut self, msg: Self::Message, context: ActorContext<Self>); |
| // TODO - handle errors by returning a result? |
| // Then can terminate actors if they return an error? |
| // Or terminate the whole system? |
| // fn update(&mut self, msg: Message, system: System) -> Result<(),Error>; |
| } |
| |
| pub struct ActorContext<A: Actor + ?Sized> { |
| system: System, |
| handle: ActorHandle<A::Message>, |
| } |
| |
| impl<A: Actor + ?Sized> Clone for ActorContext<A> { |
| fn clone(&self) -> Self { |
| ActorContext{ system: self.system.clone(), handle: self.handle.clone() } |
| } |
| } |
| |
| struct ActorCell<A: Actor> { |
| actor: A, |
| inbox: mpsc::UnboundedReceiver<A::Message>, |
| cx: ActorContext<A> |
| } |
| |
| trait ActorProc { |
| fn run_next(&mut self, lw: &LocalWaker) -> Poll<Option<()>>; |
| } |
| |
| struct ActorStream { |
| inner: Box<dyn ActorProc> |
| } |
| |
| impl Stream for ActorStream { |
| type Item = (); |
| fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> { |
| Pin::get_mut(self).inner.run_next(lw) |
| } |
| } |
| |
| impl<A: Actor> ActorProc for ActorCell<A> { |
| fn run_next(&mut self, lw: &LocalWaker) -> Poll<Option<()>> { |
| let inbox = Pin::new(&mut self.inbox); |
| match inbox.poll_next(lw) { |
| Poll::Ready(Some(msg)) => Poll::Ready(Some(self.actor.update(msg, self.cx.clone()))), |
| Poll::Ready(None) => Poll::Ready(None), |
| Poll::Pending => Poll::Pending |
| } |
| } |
| } |
| |
| /* |
| An Actor System, which contains multiple actors which can all be |
| executed via a single executor. |
| |
| This should include being executed in a multi-threaded scenerio, in which case |
| we want to ensure that each actor is only executed on a single thread at a time. |
| |
| * In theory each actor being processed is a stream of incoming messages |
| * for_each-ing the stream gives a future per actor |
| * which should become a task per actor in the executor |
| * The system can just for-each each actor and then join the futures |
| * But what about dynamic spawning? |
| * With a handle to the system, actors can spawn more actors |
| * |
| |
| */ |
| |
| #[derive(Clone)] |
| pub struct System { |
| inner: Arc<RwLock<SystemInner>> |
| } |
| |
| impl System { |
| pub fn spawn<A: Actor + 'static>(&mut self, actor: A) -> ActorHandle<A::Message> { |
| let (sender, inbox) = mpsc::unbounded::<A::Message>(); |
| let cx = ActorContext{ system: self.clone(), handle: ActorHandle{ channel: sender.clone() }}; |
| let dyn_actor: Box<dyn ActorProc> = Box::new(ActorCell{ actor, inbox, cx }); |
| self.inner.write().spawn(dyn_actor); |
| ActorHandle{ channel: sender } |
| } |
| |
| pub fn run(self, executor: fasync::Executor) -> Result<(),Error> { |
| self.inner.write().run(executor) |
| } |
| |
| pub fn new() -> System { |
| System { inner: Arc::new(RwLock::new(SystemInner::new())) } |
| } |
| } |
| |
| struct SystemInner { |
| actors: Vec<ActorStream>, |
| executor: Option<fasync::Executor> |
| } |
| |
| impl SystemInner { |
| fn new() -> SystemInner { |
| SystemInner { |
| actors: vec![], |
| executor: None |
| } |
| } |
| fn spawn(&mut self, actor: Box<dyn ActorProc>) { |
| /* |
| if let Some(executor) = self.executor { |
| executor.spawn(runActor(a, self)); |
| } |
| */ |
| self.actors.push(ActorStream{ inner: actor }) |
| } |
| |
| fn run(&mut self, executor: fasync::Executor) -> Result<(),Error> { |
| if self.executor.is_some() { |
| return Err(err_msg("Actor System is already running")); |
| } |
| self.executor = Some(executor); |
| if let Some(ref _ex) = self.executor { |
| for _a in self.actors.iter() { |
| //ex.spawn(runActor(a, self)); |
| } |
| //ex.run_until_stalled(); |
| } |
| Ok(()) |
| } |
| } |
| |
| /* |
| fn main() { |
| println!("Hello, world!"); |
| |
| let mut system = System::new(); |
| let hello = system.spawn(Hello{}); |
| let executor = Executor{}; |
| let _ = hello.unbounded_send(String::from("World")); |
| let _ = system.run(executor); |
| } |
| */ |
| |
| struct Hello {} |
| |
| impl Actor for Hello { |
| type Message = String; |
| |
| fn update(&mut self, msg: Self::Message, _cx: ActorContext<Hello>) { |
| println!("Hello, {}", msg); |
| } |
| } |
| |
| /* |
| struct Executor { |
| } |
| |
| impl Executor { |
| fn run_until_stalled(&self) {} |
| } |
| */ |
| |
| /// A mailbox that can receive messages of type `Msg` |
| pub struct ActorHandle<Msg> { |
| channel: mpsc::UnboundedSender<Msg> |
| } |
| |
| impl<Msg: Send + Sync> ActorHandle<Msg> { |
| pub fn send(&mut self, message: Msg) { |
| self.channel.unbounded_send(message); |
| } |
| } |
| |
| impl<Msg> Clone for ActorHandle<Msg> { |
| fn clone(&self) -> ActorHandle<Msg> { |
| ActorHandle{ channel: self.channel.clone() } |
| } |
| } |
| |