blob: 5af26a562cddb43c0d0a665e5dd29edf2c86a88c [file] [log] [blame]
#![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)]
use failure::Error;
use failure::err_msg;
use futures::channel::mpsc;
use futures::{Stream};
use futures::task::Poll;
use futures::task::LocalWaker;
use core::pin::Pin;
use std::sync::Arc;
use parking_lot::RwLock;
use fuchsia_async as fasync;
pub trait Actor {
type Message: Send + Sync;
/*
Could have this be Envelope<Message>, to allow some standard headers.
Could do it via message traits ala Actix What about types responders?
What do I send as the message target? Perhaps just route via oneshot channels?
*/
fn update(&mut self, msg: Self::Message, context: ActorContext<Self>);
// TODO - handle errors by returning a result?
// Then can terminate actors if they return an error?
// Or terminate the whole system?
// fn update(&mut self, msg: Message, system: System) -> Result<(),Error>;
}
pub struct ActorContext<A: Actor + ?Sized> {
system: System,
handle: ActorHandle<A::Message>,
}
impl<A: Actor + ?Sized> Clone for ActorContext<A> {
fn clone(&self) -> Self {
ActorContext{ system: self.system.clone(), handle: self.handle.clone() }
}
}
struct ActorCell<A: Actor> {
actor: A,
inbox: mpsc::UnboundedReceiver<A::Message>,
cx: ActorContext<A>
}
trait ActorProc {
fn run_next(&mut self, lw: &LocalWaker) -> Poll<Option<()>>;
}
struct ActorStream {
inner: Box<dyn ActorProc>
}
impl Stream for ActorStream {
type Item = ();
fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Option<Self::Item>> {
Pin::get_mut(self).inner.run_next(lw)
}
}
impl<A: Actor> ActorProc for ActorCell<A> {
fn run_next(&mut self, lw: &LocalWaker) -> Poll<Option<()>> {
let inbox = Pin::new(&mut self.inbox);
match inbox.poll_next(lw) {
Poll::Ready(Some(msg)) => Poll::Ready(Some(self.actor.update(msg, self.cx.clone()))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending
}
}
}
/*
An Actor System, which contains multiple actors which can all be
executed via a single executor.
This should include being executed in a multi-threaded scenerio, in which case
we want to ensure that each actor is only executed on a single thread at a time.
* In theory each actor being processed is a stream of incoming messages
* for_each-ing the stream gives a future per actor
* which should become a task per actor in the executor
* The system can just for-each each actor and then join the futures
* But what about dynamic spawning?
* With a handle to the system, actors can spawn more actors
*
*/
#[derive(Clone)]
pub struct System {
inner: Arc<RwLock<SystemInner>>
}
impl System {
pub fn spawn<A: Actor + 'static>(&mut self, actor: A) -> ActorHandle<A::Message> {
let (sender, inbox) = mpsc::unbounded::<A::Message>();
let cx = ActorContext{ system: self.clone(), handle: ActorHandle{ channel: sender.clone() }};
let dyn_actor: Box<dyn ActorProc> = Box::new(ActorCell{ actor, inbox, cx });
self.inner.write().spawn(dyn_actor);
ActorHandle{ channel: sender }
}
pub fn run(self, executor: fasync::Executor) -> Result<(),Error> {
self.inner.write().run(executor)
}
pub fn new() -> System {
System { inner: Arc::new(RwLock::new(SystemInner::new())) }
}
}
struct SystemInner {
actors: Vec<ActorStream>,
executor: Option<fasync::Executor>
}
impl SystemInner {
fn new() -> SystemInner {
SystemInner {
actors: vec![],
executor: None
}
}
fn spawn(&mut self, actor: Box<dyn ActorProc>) {
/*
if let Some(executor) = self.executor {
executor.spawn(runActor(a, self));
}
*/
self.actors.push(ActorStream{ inner: actor })
}
fn run(&mut self, executor: fasync::Executor) -> Result<(),Error> {
if self.executor.is_some() {
return Err(err_msg("Actor System is already running"));
}
self.executor = Some(executor);
if let Some(ref _ex) = self.executor {
for _a in self.actors.iter() {
//ex.spawn(runActor(a, self));
}
//ex.run_until_stalled();
}
Ok(())
}
}
/*
fn main() {
println!("Hello, world!");
let mut system = System::new();
let hello = system.spawn(Hello{});
let executor = Executor{};
let _ = hello.unbounded_send(String::from("World"));
let _ = system.run(executor);
}
*/
struct Hello {}
impl Actor for Hello {
type Message = String;
fn update(&mut self, msg: Self::Message, _cx: ActorContext<Hello>) {
println!("Hello, {}", msg);
}
}
/*
struct Executor {
}
impl Executor {
fn run_until_stalled(&self) {}
}
*/
/// A mailbox that can receive messages of type `Msg`
pub struct ActorHandle<Msg> {
channel: mpsc::UnboundedSender<Msg>
}
impl<Msg: Send + Sync> ActorHandle<Msg> {
pub fn send(&mut self, message: Msg) {
self.channel.unbounded_send(message);
}
}
impl<Msg> Clone for ActorHandle<Msg> {
fn clone(&self) -> ActorHandle<Msg> {
ActorHandle{ channel: self.channel.clone() }
}
}