#![cfg(all(unix, feature = "os-poll", feature = "uds"))]

use mio::net::UnixStream;
use mio::{Interest, Token};
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::Shutdown;
use std::os::unix::net;
use std::path::Path;
use std::sync::mpsc::channel;
use std::sync::{Arc, Barrier};
use std::thread;

#[macro_use]
mod util;
use util::{
    assert_send, assert_socket_close_on_exec, assert_socket_non_blocking, assert_sync,
    assert_would_block, expect_events, expect_no_events, init, init_with_poll, temp_file,
    ExpectEvent, Readiness,
};

// First payload exchanged between the streams under test.
const DATA1: &[u8] = b"Hello same host!";
// Second payload exchanged between the streams under test.
const DATA2: &[u8] = b"Why hello mio!";
// Payload lengths are derived from the payloads themselves so that they can
// never drift out of sync when a payload is edited.
const DATA1_LEN: usize = DATA1.len();
const DATA2_LEN: usize = DATA2.len();
// Size of the scratch buffers used for reading; larger than either payload.
const DEFAULT_BUF_SIZE: usize = 64;
// Token used when registering the stream under test (the first stream in
// `unix_stream_pair`).
const TOKEN_1: Token = Token(0);
// Token used for the second stream registered in `unix_stream_pair`.
const TOKEN_2: Token = Token(1);

/// Runs the `util` thread-safety assertions (`assert_sync`, `assert_send`)
/// against `UnixStream`.
#[test]
fn unix_stream_send_and_sync() {
    assert_sync::<UnixStream>();
    assert_send::<UnixStream>();
}

/// Runs the shared smoke test with a stream created by `UnixStream::connect`.
#[test]
fn unix_stream_smoke() {
    // The connector is deliberately kept as an explicit closure, which is why
    // the lint is silenced here.
    // NOTE(review): presumably a bare `UnixStream::connect` path does not
    // satisfy the `FnOnce(&Path)` bound of `smoke_test` — confirm before
    // removing the allowance.
    #[allow(clippy::redundant_closure)]
    let connect = |path: &Path| UnixStream::connect(path);
    smoke_test(connect, "unix_stream_smoke");
}

/// Connects to a std listener and checks the events seen by the client: first
/// a writable event after registering, then a readable event once the remote
/// side has dropped the accepted connection.
#[test]
fn unix_stream_connect() {
    let (mut poll, mut events) = init_with_poll();
    let socket_path = temp_file("unix_stream_connect");

    let listener = net::UnixListener::bind(&socket_path).unwrap();
    let mut stream = UnixStream::connect(&socket_path).unwrap();

    // The barrier holds the accepted connection open until the client has
    // observed its first (writable) event.
    let barrier = Arc::new(Barrier::new(2));
    let remote_barrier = Arc::clone(&barrier);
    let remote = thread::spawn(move || {
        let (accepted, _) = listener.accept().unwrap();
        remote_barrier.wait();
        drop(accepted);
    });

    let interests = Interest::READABLE.add(Interest::WRITABLE);
    poll.registry()
        .register(&mut stream, TOKEN_1, interests)
        .unwrap();

    let writable = vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // Release the remote thread so that it drops its end of the connection.
    barrier.wait();
    let readable = vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    remote.join().unwrap();
}

/// Runs the shared smoke test with a stream converted from a
/// `std::os::unix::net::UnixStream` via `UnixStream::from_std`.
#[test]
fn unix_stream_from_std() {
    let connect = |path: &Path| -> io::Result<UnixStream> {
        let std_stream = net::UnixStream::connect(path).unwrap();
        // Streams from `std::os::unix::net` are blocking by default, so
        // switch to non-blocking mode before wrapping in the Mio equivalent.
        std_stream.set_nonblocking(true).unwrap();
        Ok(UnixStream::from_std(std_stream))
    };
    smoke_test(connect, "unix_stream_from_std");
}

/// Exercises `UnixStream::pair`: registers both ends, then sends `DATA1` from
/// the first end to the second and `DATA2` back again, checking after each
/// transfer that no unexpected data is left to read.
#[test]
fn unix_stream_pair() {
    let (mut poll, mut events) = init_with_poll();

    let (mut s1, mut s2) = UnixStream::pair().unwrap();
    poll.registry()
        .register(&mut s1, TOKEN_1, Interest::READABLE | Interest::WRITABLE)
        .unwrap();
    poll.registry()
        .register(&mut s2, TOKEN_2, Interest::READABLE | Interest::WRITABLE)
        .unwrap();
    expect_events(
        &mut poll,
        &mut events,
        vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)],
    );

    // Nothing has been written yet, so reading must not succeed.
    let mut buf = [0; DEFAULT_BUF_SIZE];
    assert_would_block(s1.read(&mut buf));

    checked_write!(s1.write(&DATA1));
    s1.flush().unwrap();

    expect_read!(s2.read(&mut buf), DATA1);
    assert_would_block(s2.read(&mut buf));

    checked_write!(s2.write(&DATA2));
    s2.flush().unwrap();

    expect_read!(s1.read(&mut buf), DATA2);
    // `s1` has just consumed everything `s2` sent, so it must be drained now;
    // previously only `s2` was re-checked here, leaving `s1` unverified.
    assert_would_block(s1.read(&mut buf));
    // `s2` must still have nothing to read either.
    assert_would_block(s2.read(&mut buf));
}

/// Checks that `peer_addr` reports the listener's path and that the
/// connecting side's `local_addr` has no pathname.
#[test]
fn unix_stream_peer_addr() {
    init();
    let (echo_thread, listener_addr) = new_echo_listener(1, "unix_stream_peer_addr");
    let listener_path = listener_addr.as_pathname().expect("failed to get pathname");

    let stream = UnixStream::connect(listener_path).unwrap();

    let peer = stream.peer_addr().unwrap();
    assert_eq!(peer.as_pathname().unwrap(), listener_path);

    let local = stream.local_addr().unwrap();
    assert!(local.as_pathname().is_none());

    // Dropping our end lets the echo listener finish.
    drop(stream);
    echo_thread.join().unwrap();
}

/// Shuts down the read half of a connected stream and expects a read-closed
/// event for it.
#[test]
fn unix_stream_shutdown_read() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, echo_addr) = new_echo_listener(1, "unix_stream_shutdown_read");
    let socket_path = echo_addr.as_pathname().expect("failed to get pathname");

    let mut stream = UnixStream::connect(socket_path).unwrap();
    let interests = Interest::READABLE | Interest::WRITABLE;
    poll.registry()
        .register(&mut stream, TOKEN_1, interests)
        .unwrap();

    let writable = vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // The echo listener sends the payload back, making the stream readable.
    checked_write!(stream.write(&DATA1));
    let readable = vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    stream.shutdown(Shutdown::Read).unwrap();
    let read_closed = vec![ExpectEvent::new(TOKEN_1, Readiness::READ_CLOSED)];
    expect_events(&mut poll, &mut events, read_closed);

    // What a read returns after shutting down the reading side differs per
    // platform (on Linux based systems we can still read), so the empty read
    // is only asserted on the platforms listed below.
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    {
        let mut buf = [0; DEFAULT_BUF_SIZE];
        expect_read!(stream.read(&mut buf), &[]);
    }

    // Dropping our end lets the echo listener finish.
    drop(stream);
    echo_thread.join().unwrap();
}

/// Shuts down the write half of a connected stream: further writes must fail
/// with `BrokenPipe` while data that was already echoed back stays readable.
#[test]
fn unix_stream_shutdown_write() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, echo_addr) = new_echo_listener(1, "unix_stream_shutdown_write");
    let socket_path = echo_addr.as_pathname().expect("failed to get pathname");

    let mut stream = UnixStream::connect(socket_path).unwrap();
    let interests = Interest::WRITABLE | Interest::READABLE;
    poll.registry()
        .register(&mut stream, TOKEN_1, interests)
        .unwrap();

    let writable = vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // The echo listener sends the payload back, making the stream readable.
    checked_write!(stream.write(&DATA1));
    let readable = vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    stream.shutdown(Shutdown::Write).unwrap();

    // A write-closed event is only expected on the platforms listed here.
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    expect_events(
        &mut poll,
        &mut events,
        vec![ExpectEvent::new(TOKEN_1, Readiness::WRITE_CLOSED)],
    );

    let write_err = stream.write(DATA2).unwrap_err();
    assert_eq!(write_err.kind(), io::ErrorKind::BrokenPipe);

    // The read half is untouched, so the echoed payload can still be read.
    let mut buf = [0; DEFAULT_BUF_SIZE];
    expect_read!(stream.read(&mut buf), DATA1);

    // Dropping our end lets the echo listener finish.
    drop(stream);
    echo_thread.join().unwrap();
}

/// Shuts down both halves of a connected stream: a write-closed event is
/// expected and further writes must fail with `BrokenPipe`.
#[test]
fn unix_stream_shutdown_both() {
    let (mut poll, mut events) = init_with_poll();
    let (handle, remote_addr) = new_echo_listener(1, "unix_stream_shutdown_both");
    let path = remote_addr.as_pathname().expect("failed to get pathname");

    let mut stream = UnixStream::connect(path).unwrap();
    poll.registry()
        .register(
            &mut stream,
            TOKEN_1,
            Interest::WRITABLE.add(Interest::READABLE),
        )
        .unwrap();
    expect_events(
        &mut poll,
        &mut events,
        vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)],
    );

    // The echo listener sends the payload back, making the stream readable.
    checked_write!(stream.write(&DATA1));
    expect_events(
        &mut poll,
        &mut events,
        vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)],
    );

    stream.shutdown(Shutdown::Both).unwrap();
    expect_events(
        &mut poll,
        &mut events,
        vec![ExpectEvent::new(TOKEN_1, Readiness::WRITE_CLOSED)],
    );

    // Shutting down the reading side is different on each platform. For example
    // on Linux based systems we can still read.
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    {
        let mut buf = [0; DEFAULT_BUF_SIZE];
        expect_read!(stream.read(&mut buf), &[]);
    }

    // This file is compiled on Unix only (see the crate-level `cfg`), so the
    // error kind is asserted unconditionally. The former `#[cfg(window)]`
    // branch used a misspelt predicate and a misspelt error variant and could
    // never be compiled.
    let err = stream.write(DATA2).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::BrokenPipe);

    // Close the connection to allow the remote to shutdown
    drop(stream);
    handle.join().unwrap();
}

/// Has the remote (listener) side shut down its write half and expects the
/// client stream to receive a read-closed event.
#[test]
fn unix_stream_shutdown_listener_write() {
    let (mut poll, mut events) = init_with_poll();
    let barrier = Arc::new(Barrier::new(2));
    let test_name = "unix_stream_shutdown_listener_write";
    let (remote, remote_addr) = new_noop_listener(1, Arc::clone(&barrier), test_name);
    let socket_path = remote_addr.as_pathname().expect("failed to get pathname");

    let mut stream = UnixStream::connect(socket_path).unwrap();
    let interests = Interest::READABLE | Interest::WRITABLE;
    poll.registry()
        .register(&mut stream, TOKEN_1, interests)
        .unwrap();

    let writable = vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // First rendezvous: lets the remote shut down its write half.
    barrier.wait();
    let read_closed = vec![ExpectEvent::new(TOKEN_1, Readiness::READ_CLOSED)];
    expect_events(&mut poll, &mut events, read_closed);

    // Second rendezvous: lets the remote drop its stream and exit.
    barrier.wait();
    remote.join().unwrap();
}

/// Registers a connected stream for readable interest only and expects no
/// events, as nothing has been sent to it.
#[test]
fn unix_stream_register() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, echo_addr) = new_echo_listener(1, "unix_stream_register");
    let socket_path = echo_addr.as_pathname().expect("failed to get pathname");

    let mut stream = UnixStream::connect(socket_path).unwrap();
    let registry = poll.registry();
    registry
        .register(&mut stream, TOKEN_1, Interest::READABLE)
        .unwrap();
    expect_no_events(&mut poll, &mut events);

    // Dropping our end lets the echo listener finish.
    drop(stream);
    echo_thread.join().unwrap();
}

/// Registers a stream for readable interest, re-registers it for writable
/// interest and expects the writable event to be delivered.
#[test]
fn unix_stream_reregister() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, echo_addr) = new_echo_listener(1, "unix_stream_reregister");
    let socket_path = echo_addr.as_pathname().expect("failed to get pathname");

    let mut stream = UnixStream::connect(socket_path).unwrap();
    let registry = poll.registry();
    registry
        .register(&mut stream, TOKEN_1, Interest::READABLE)
        .unwrap();
    registry
        .reregister(&mut stream, TOKEN_1, Interest::WRITABLE)
        .unwrap();

    let writable = vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // Dropping our end lets the echo listener finish.
    drop(stream);
    echo_thread.join().unwrap();
}

/// Registers a stream for writable interest, deregisters it again and expects
/// no events to be delivered afterwards.
#[test]
fn unix_stream_deregister() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, echo_addr) = new_echo_listener(1, "unix_stream_deregister");
    let socket_path = echo_addr.as_pathname().expect("failed to get pathname");

    let mut stream = UnixStream::connect(socket_path).unwrap();
    let registry = poll.registry();
    registry
        .register(&mut stream, TOKEN_1, Interest::WRITABLE)
        .unwrap();
    registry.deregister(&mut stream).unwrap();
    expect_no_events(&mut poll, &mut events);

    // Dropping our end lets the echo listener finish.
    drop(stream);
    echo_thread.join().unwrap();
}

/// Shared end-to-end check used by the smoke tests.
///
/// Starts an echo listener named after `test_name`, obtains a stream through
/// `connect_stream`, and then checks the socket flags, the readiness events,
/// and both plain and vectored writes/reads against the echoed data.
fn smoke_test<F>(connect_stream: F, test_name: &'static str)
where
    F: FnOnce(&Path) -> io::Result<UnixStream>,
{
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, echo_addr) = new_echo_listener(1, test_name);
    let socket_path = echo_addr.as_pathname().expect("failed to get pathname");

    let mut stream = connect_stream(socket_path).unwrap();

    assert_socket_non_blocking(&stream);
    assert_socket_close_on_exec(&stream);

    let interests = Interest::WRITABLE | Interest::READABLE;
    poll.registry()
        .register(&mut stream, TOKEN_1, interests)
        .unwrap();
    let writable = vec![ExpectEvent::new(TOKEN_1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // Nothing has been echoed yet, so a read must not succeed.
    let mut buf = [0; DEFAULT_BUF_SIZE];
    assert_would_block(stream.read(&mut buf));

    // Plain write followed by a plain read of the echo.
    checked_write!(stream.write(&DATA1));
    stream.flush().unwrap();
    let readable = vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    expect_read!(stream.read(&mut buf), DATA1);

    assert!(stream.take_error().unwrap().is_none());

    // Vectored write of both payloads in a single call.
    let total = DATA1_LEN + DATA2_LEN;
    let out_slices = [IoSlice::new(&DATA1), IoSlice::new(&DATA2)];
    let wrote = stream.write_vectored(&out_slices).unwrap();
    assert_eq!(wrote, total);
    let readable = vec![ExpectEvent::new(TOKEN_1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    // Vectored read of the echo. The second buffer is one byte larger than
    // `DATA2` so that we can verify nothing past the payload is written.
    let mut head = [1; DATA1_LEN];
    let mut tail = [2; DATA2_LEN + 1];
    let mut in_slices = [IoSliceMut::new(&mut head), IoSliceMut::new(&mut tail)];
    let read = stream.read_vectored(&mut in_slices).unwrap();
    assert_eq!(read, total);
    assert_eq!(&head, DATA1);

    let (echoed, untouched) = tail.split_at(DATA2.len());
    assert_eq!(echoed, DATA2);
    // The trailing byte must still hold its fill value.
    assert_eq!(untouched[0], 2);

    // Dropping our end lets the echo listener finish.
    drop(stream);
    echo_thread.join().unwrap();
}

/// Spawns a thread running a std `UnixListener` that echoes back everything
/// it reads.
///
/// The listener is bound to `temp_file(test_name)` and accepts exactly
/// `connections` connections, one after the other. For every connection the
/// thread asserts that it wrote as many bytes as it read.
///
/// Returns the thread's join handle together with the listener's address,
/// which is sent back over a channel once the listener is bound.
fn new_echo_listener(
    connections: usize,
    test_name: &'static str,
) -> (thread::JoinHandle<()>, net::SocketAddr) {
    let (addr_tx, addr_rx) = channel();
    let handle = thread::spawn(move || {
        let listener = net::UnixListener::bind(temp_file(test_name)).unwrap();
        addr_tx.send(listener.local_addr().unwrap()).unwrap();

        for _ in 0..connections {
            let (mut stream, _) = listener.accept().unwrap();

            let mut total_read = 0;
            let mut total_written = 0;
            let mut buf = [0; DEFAULT_BUF_SIZE];
            loop {
                // On Linux based systems shutting down the reading side of
                // the peer connection causes a connection reset error here;
                // we don't consider that an actual error and simply stop.
                let n = match stream.read(&mut buf) {
                    // A zero-length read ends the connection.
                    Ok(0) => break,
                    Ok(n) => n,
                    Err(err) => match err.kind() {
                        io::ErrorKind::WouldBlock => continue,
                        io::ErrorKind::ConnectionReset => break,
                        _ => panic!("{}", err),
                    },
                };
                total_read += n;

                match stream.write(&buf[..n]) {
                    Ok(n) => total_written += n,
                    Err(err) => match err.kind() {
                        io::ErrorKind::WouldBlock => continue,
                        io::ErrorKind::BrokenPipe => break,
                        _ => panic!("{}", err),
                    },
                }
            }
            assert_eq!(total_read, total_written, "unequal reads and writes");
        }
    });
    let addr = addr_rx.recv().unwrap();
    (handle, addr)
}

/// Spawns a thread running a std `UnixListener` that never reads or writes.
///
/// The listener is bound to `temp_file(test_name)` and accepts exactly
/// `connections` connections. For each one it waits on `barrier`, shuts down
/// the write half of the accepted stream, waits on `barrier` a second time
/// and then drops the stream.
///
/// Returns the thread's join handle together with the listener's address,
/// which is sent back over a channel once the listener is bound.
fn new_noop_listener(
    connections: usize,
    barrier: Arc<Barrier>,
    test_name: &'static str,
) -> (thread::JoinHandle<()>, net::SocketAddr) {
    let (addr_tx, addr_rx) = channel();
    let handle = thread::spawn(move || {
        let listener = net::UnixListener::bind(temp_file(test_name)).unwrap();
        addr_tx.send(listener.local_addr().unwrap()).unwrap();

        for _ in 0..connections {
            let (accepted, _) = listener.accept().unwrap();
            // First rendezvous: the test signals that the shutdown may happen.
            barrier.wait();
            accepted.shutdown(Shutdown::Write).unwrap();
            // Second rendezvous: the test signals that it is done, after
            // which `accepted` is dropped at the end of this iteration.
            barrier.wait();
        }
    });
    let addr = addr_rx.recv().unwrap();
    (handle, addr)
}
