// Copyright (c) 2016 The Rouille developers
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use std::io;
use std::io::Write;
use std::mem;
use std::sync::mpsc::Sender;

use ReadWrite;
use Upgrade;
use websocket::low_level;
/// A successful websocket. An open channel of communication. Implements `Read` and `Write`. | |
pub struct Websocket { | |
// The socket. `None` if closed. | |
socket: Option<Box<ReadWrite + Send>>, | |
// The websocket state machine. | |
state_machine: low_level::StateMachine, | |
// True if the fragmented message currently being processed is binary. False if string. Pings | |
// are excluded. | |
current_message_binary: bool, | |
// Buffer for the fragmented message currently being processed. Pings are excluded. | |
current_message_payload: Vec<u8>, | |
// Opcode of the fragment currently being processed. | |
current_frame_opcode: u8, | |
// Fin flag of the fragment currently being processed. | |
current_frame_fin: bool, | |
// Data of the fragment currently being processed. | |
current_frame_payload: Vec<u8>, | |
// Queue of the messages that are going to be returned by `next()`. | |
messages_in_queue: Vec<Message>, | |
} | |
/// A message produced by a websocket connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
    /// Text data. If the client is in Javascript, this happens when the client called `send()`
    /// with a string.
    Text(String),

    /// Binary data. If the client is in Javascript, this happens when the client called `send()`
    /// with a blob or an arraybuffer.
    Binary(Vec<u8>),
}
/// Error that can happen when sending a message to the client.
#[derive(Debug)]
pub enum SendError {
    /// Failed to transfer the message on the socket.
    IoError(io::Error),

    /// The websocket connection is closed.
    Closed,
}

// Human-readable description, so the error can be logged or shown directly.
impl ::std::fmt::Display for SendError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        match *self {
            SendError::IoError(ref err) => {
                write!(f, "failed to send websocket message: {}", err)
            },
            SendError::Closed => write!(f, "the websocket connection is closed"),
        }
    }
}

// Lets `SendError` be used wherever a standard error type is expected (boxed errors, `?` into
// wider error types, ...).
impl ::std::error::Error for SendError {}
impl From<io::Error> for SendError { | |
#[inline] | |
fn from(err: io::Error) -> SendError { | |
SendError::IoError(err) | |
} | |
} | |
impl Websocket { | |
/// Sends text data over the websocket. | |
/// | |
/// Returns an error if the message didn't send correctly or if the connection is closed. | |
/// | |
/// If the client is in javascript, the message will contain a string. | |
#[inline] | |
pub fn send_text(&mut self, data: &str) -> Result<(), SendError> { | |
let socket = match self.socket { | |
Some(ref mut s) => s, | |
None => return Err(SendError::Closed), | |
}; | |
try!(send(data.as_bytes(), Write::by_ref(socket), 0x1)); | |
Ok(()) | |
} | |
/// Sends binary data over the websocket. | |
/// | |
/// Returns an error if the message didn't send correctly or if the connection is closed. | |
/// | |
/// If the client is in javascript, the message will contain a blob or an arraybuffer. | |
#[inline] | |
pub fn send_binary(&mut self, data: &[u8]) -> Result<(), SendError> { | |
let socket = match self.socket { | |
Some(ref mut s) => s, | |
None => return Err(SendError::Closed), | |
}; | |
try!(send(data, Write::by_ref(socket), 0x2)); | |
Ok(()) | |
} | |
/// Returns `true` if the websocket has been closed by either the client (voluntarily or not) | |
/// or by the server (if the websocket protocol was violated). | |
#[inline] | |
pub fn is_closed(&self) -> bool { | |
self.socket.is_none() | |
} | |
// TODO: give access to close reason | |
} | |
impl Upgrade for Sender<Websocket> { | |
fn build(&mut self, socket: Box<ReadWrite + Send>) { | |
let websocket = Websocket { | |
socket: Some(socket), | |
state_machine: low_level::StateMachine::new(), | |
current_message_binary: false, | |
current_message_payload: Vec::new(), | |
current_frame_opcode: 0, | |
current_frame_fin: false, | |
current_frame_payload: Vec::new(), | |
messages_in_queue: Vec::new(), | |
}; | |
let _ = self.send(websocket); | |
} | |
} | |
impl Iterator for Websocket { | |
type Item = Message; | |
fn next(&mut self) -> Option<Message> { | |
loop { | |
// If the socket is `None`, the connection has been closed. | |
if self.socket.is_none() { | |
return None; | |
} | |
// There may be some messages waiting to be processed. | |
if !self.messages_in_queue.is_empty() { | |
return Some(self.messages_in_queue.remove(0)); | |
} | |
// Read `n` bytes in `buf`. | |
let mut buf = [0; 256]; | |
let n = match self.socket.as_mut().unwrap().read(&mut buf) { | |
Ok(n) => n, | |
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => 0, | |
Err(_) => { | |
self.socket = None; | |
return None; | |
}, | |
}; | |
// Fill `messages_in_queue` by analyzing the packets. | |
for element in self.state_machine.feed(&buf[0 .. n]) { | |
match element { | |
low_level::Element::FrameStart { fin, opcode, .. } => { | |
debug_assert!(self.current_frame_payload.is_empty()); | |
self.current_frame_fin = fin; | |
self.current_frame_opcode = opcode; | |
}, | |
low_level::Element::Data { data, last_in_frame } => { | |
// Under normal circumstances we just handle data by pushing it to | |
// `current_frame_payload`. | |
self.current_frame_payload.extend(data); | |
// But if the frame is finished we additionnally need to dispatch it. | |
if last_in_frame { | |
match self.current_frame_opcode { | |
// Frame is a continuation of the current message. | |
0x0 => { | |
self.current_message_payload.append(&mut self.current_frame_payload); | |
// If the message is finished, dispatch it. | |
if self.current_frame_fin { | |
let binary = mem::replace(&mut self.current_message_payload, Vec::new()); | |
if self.current_message_binary { | |
self.messages_in_queue.push(Message::Binary(binary)); | |
} else { | |
let string = match String::from_utf8(binary) { | |
Ok(s) => s, | |
Err(_) => { | |
// Closing connection because text wasn't UTF-8 | |
let _ = send(b"1007 Invalid UTF-8 encoding", | |
Write::by_ref(self.socket.as_mut().unwrap()), 0x8); | |
self.socket = None; | |
return None; | |
}, | |
}; | |
self.messages_in_queue.push(Message::Text(string)); | |
} | |
} | |
}, | |
// Frame is an individual text frame. | |
0x1 => { | |
// If we're in the middle of a message, this frame is invalid | |
// and we need to close. | |
if !self.current_message_payload.is_empty() { | |
let _ = send(b"1002 Expected continuation frame", | |
Write::by_ref(self.socket.as_mut().unwrap()), 0x8); | |
self.socket = None; | |
return None; | |
} | |
if self.current_frame_fin { | |
// There's only one frame in this message. | |
let binary = mem::replace(&mut self.current_frame_payload, | |
Vec::new()); | |
let string = match String::from_utf8(binary) { | |
Ok(s) => s, | |
Err(_err) => { | |
// Closing connection because text wasn't UTF-8 | |
let _ = send(b"1007 Invalid UTF-8 encoding", | |
Write::by_ref(self.socket.as_mut().unwrap()), | |
0x8); | |
self.socket = None; | |
return None; | |
}, | |
}; | |
self.messages_in_queue.push(Message::Text(string)); | |
} else { | |
// Start of a fragmented message. | |
self.current_message_binary = false; | |
self.current_message_payload.append(&mut self.current_frame_payload); | |
} | |
}, | |
// Frame is an individual binary frame. | |
0x2 => { | |
// If we're in the middle of a message, this frame is invalid | |
// and we need to close. | |
if !self.current_message_payload.is_empty() { | |
let _ = send(b"1002 Expected continuation frame", | |
Write::by_ref(self.socket.as_mut().unwrap()), 0x8); | |
self.socket = None; | |
return None; | |
} | |
if self.current_frame_fin { | |
let binary = mem::replace(&mut self.current_frame_payload, | |
Vec::new()); | |
self.messages_in_queue.push(Message::Binary(binary)); | |
} else { | |
// Start of a fragmented message. | |
self.current_message_binary = true; | |
self.current_message_payload.append(&mut self.current_frame_payload); | |
} | |
}, | |
// Close request. | |
0x8 => { | |
// We need to send a confirmation. | |
let _ = send(&self.current_frame_payload, | |
Write::by_ref(self.socket.as_mut().unwrap()), 0x8); | |
// Since the packets are always received in order, and since | |
// the server is considered dead as soon as it sends the | |
// confirmation, we have no risk of losing packets. | |
self.socket = None; | |
return None; | |
}, | |
// Ping. | |
0x9 => { | |
// Send the pong. | |
let _ = send(&self.current_frame_payload, | |
Write::by_ref(self.socket.as_mut().unwrap()), 0xA); | |
}, | |
// Pong. We ignore this as there's nothing to do. | |
0xA => {}, | |
// Unknown opcode means error and close. | |
_ => { | |
let _ = send(b"Unknown opcode", | |
Write::by_ref(self.socket.as_mut().unwrap()), 0x8); | |
self.socket = None; | |
return None; | |
}, | |
} | |
self.current_frame_payload.clear(); | |
} | |
}, | |
low_level::Element::Error { desc } => { | |
// The low level layer signaled an error. Sending it to client and closing. | |
let _ = send(desc.as_bytes(), Write::by_ref(self.socket.as_mut().unwrap()), 0x8); | |
self.socket = None; | |
return None; | |
}, | |
} | |
} | |
} | |
} | |
} | |
// Sends a message to a websocket as a single, unmasked frame with the FIN bit set.
//
// `opcode` is the 4-bit frame opcode (0x1 text, 0x2 binary, 0x8 close, 0xA pong, ...).
// The frame header is assembled in a small stack buffer and written in one go, followed by the
// payload and a flush. Any I/O error is returned to the caller.
//
// Panics if `opcode` doesn't fit in 4 bits or if the payload length doesn't fit in 63 bits.
// TODO: message fragmentation?
fn send<W: Write>(data: &[u8], mut dest: W, opcode: u8) -> io::Result<()> {
    assert!(opcode <= 0xf);

    let len = data.len() as u64;

    // Largest possible header: 1 byte flags/opcode, 1 byte length marker, 8 bytes of length.
    let mut header = [0u8; 10];
    // 0x80 is the FIN flag: the message is never fragmented.
    header[0] = 0x80 | opcode;

    let header_len = if len >= 65536 {
        // The most significant bit of the 64 bits length must be zero.
        assert!(len < 0x8000_0000_0000_0000);
        header[1] = 127;
        // Length as a big-endian 64 bits integer.
        for i in 0..8 {
            header[2 + i] = (len >> (8 * (7 - i))) as u8;
        }
        10
    } else if len >= 126 {
        header[1] = 126;
        // Length as a big-endian 16 bits integer.
        header[2] = (len >> 8) as u8;
        header[3] = len as u8;
        4
    } else {
        // Short payloads store their length directly in the second byte.
        header[1] = len as u8;
        2
    };

    dest.write_all(&header[..header_len])?;
    dest.write_all(data)?;
    dest.flush()
}