use buf::SendBuf;
use {flush, Body, RecvBody};
use tower::MakeService;
use tower_service::Service;
use futures::future::{Either, Executor, Join, MapErr};
use futures::{Async, Future, Poll, Stream};
use h2;
use h2::server::{Connection as Accept, Handshake, SendResponse};
use http::{Request, Response};
use tokio_io::{AsyncRead, AsyncWrite};
use std::marker::PhantomData;
use std::{error, fmt, mem};
/// Factory for HTTP/2 server connections.
///
/// Bundles everything needed to start serving a new I/O object: a
/// `MakeService` that yields the per-connection service, the `h2` server
/// builder used to begin the handshake, and an executor handle that is cloned
/// into every `Connection` created by `serve` / `serve_modified`.
///
/// `B` is the body type of the responses produced by the service.
pub struct Server<S, E, B>
where
S: MakeService<(), Request<RecvBody>>,
B: Body,
{
/// Produces the service that handles the requests of one connection.
new_service: S,
/// `h2` server builder used to start the handshake on each new I/O object.
builder: h2::server::Builder,
/// Executor handle; a clone is handed to every `Connection`.
executor: E,
/// Marker tying the server to the response body type `B` without storing one.
_p: PhantomData<B>,
}
/// Future that drives a single HTTP/2 connection over the I/O object `T`.
///
/// It first waits for the `h2` handshake and the `MakeService` future, then
/// repeatedly accepts requests, runs them through the `Modify` hook `F`, calls
/// the service, and hands each response future to the executor `E` wrapped in
/// a `Background` task.
pub struct Connection<T, S, E, B, F>
where
T: AsyncRead + AsyncWrite,
S: MakeService<(), Request<RecvBody>>,
B: Body,
{
/// Current stage of the connection state machine.
state: State<T, S, B>,
/// Executor that receives one `Background` task per accepted request.
executor: E,
/// Hook applied to every request head before the service is called.
modify: F,
}
/// Hook for adjusting a received request before it reaches the service.
///
/// `Connection` calls `modify` once per accepted request, passing the request
/// with its body temporarily replaced by `()`; the real body is re-attached
/// afterwards, so only the request head can be changed here.
pub trait Modify {
/// Mutates the (body-less) request in place.
fn modify(&mut self, request: &mut Request<()>);
}
/// Stages of the `Connection` state machine.
enum State<T, S, B>
where
T: AsyncRead + AsyncWrite,
S: MakeService<(), Request<RecvBody>>,
B: Body,
{
/// Waiting for both the `h2` handshake and the `MakeService` future to finish.
Init(Init<T, SendBuf<B::Data>, S::Future, S::MakeError>),
/// Handshake done and service obtained; requests are being accepted and dispatched.
Ready {
connection: Accept<T, SendBuf<B::Data>>,
service: S::Service,
},
/// A graceful shutdown was started because of `error`; once the connection
/// has closed, `error` is returned from the `Connection` future.
GoAway {
connection: Accept<T, SendBuf<B::Data>>,
error: Error<S>,
},
/// Terminal state; polling yields `Ready(())`.
Done,
}
/// Future held by `State::Init`: the `h2` handshake joined with the
/// `MakeService` future, with both error types unified into
/// `Either<h2::Error, E>` (`A` = handshake error, `B` = make-service error).
type Init<T, B, S, E> = Join<MapErr<Handshake<T, B>, MapErrA<E>>, MapErr<S, MapErrB<E>>>;
/// Function-pointer type that lifts a handshake error into `Either::A`.
type MapErrA<E> = fn(h2::Error) -> Either<h2::Error, E>;
/// Function-pointer type that lifts a make-service error into `Either::B`.
type MapErrB<E> = fn(E) -> Either<h2::Error, E>;
/// Task spawned on the executor for every accepted request.
///
/// `T` is the service's response future and `B` the response body type. The
/// task waits for the response, sends its head on the stream, and then (unless
/// the body is already at end-of-stream) drives a `flush::Flush` for the body.
pub struct Background<T, B>
where
B: Body,
{
/// Current stage of the task.
state: BackgroundState<T, B>,
}
/// Stages of a `Background` task.
enum BackgroundState<T, B>
where
B: Body,
{
/// Waiting for the service's response future; `respond` is the `h2` handle
/// used to send the response head (or a reset) once it resolves.
Respond {
respond: SendResponse<SendBuf<B::Data>>,
response: T,
},
/// Response head has been sent; polling is delegated to `flush::Flush`.
Flush(flush::Flush<B>),
}
/// Errors produced while driving a `Connection`.
pub enum Error<S>
where
S: MakeService<(), Request<RecvBody>>,
{
/// The `h2` handshake future failed.
Handshake(h2::Error),
/// Polling the established `h2` connection (for requests or for close) failed.
Protocol(h2::Error),
/// The `MakeService` future failed to produce a service.
NewService(S::MakeError),
/// The service returned an error from `poll_ready`.
Service(S::Error),
/// The executor refused to accept a `Background` task.
Execute,
}
/// Outcome of one `poll_main` pass, telling `poll2` what to do next.
enum PollMain {
/// The state changed; run the state-machine loop again.
Again,
/// The connection has finished; move to `State::Done`.
Done,
}
impl<S, E, B> Server<S, E, B>
where
    S: MakeService<(), Request<RecvBody>, Response = Response<B>>,
    S::Error: Into<Box<dyn std::error::Error>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn std::error::Error>>,
    E: Clone + Executor<Background<<S::Service as Service<Request<RecvBody>>>::Future, B>>,
{
    /// Creates a server from its three parts.
    ///
    /// * `new_service` - yields the service used for each connection.
    /// * `builder` - `h2` server builder used to start every handshake.
    /// * `executor` - cloned into each connection to spawn `Background` tasks.
    pub fn new(new_service: S, builder: h2::server::Builder, executor: E) -> Self {
        Self {
            new_service,
            builder,
            executor,
            _p: PhantomData,
        }
    }
}
impl<S, E, B> Server<S, E, B>
where
    S: MakeService<(), Request<RecvBody>, Response = Response<B>>,
    B: Body,
    B::Data: 'static,
    B::Error: Into<Box<dyn std::error::Error>>,
    E: Clone,
{
    /// Starts serving `io` without any request modification.
    ///
    /// Equivalent to `serve_modified(io, ())`, where `()` is the no-op
    /// `Modify` implementation.
    pub fn serve<T>(&mut self, io: T) -> Connection<T, S, E, B, ()>
    where
        T: AsyncRead + AsyncWrite,
    {
        self.serve_modified(io, ())
    }

    /// Starts serving `io`, applying `modify` to every received request.
    ///
    /// Requests a new service from the `MakeService` and begins the `h2`
    /// handshake on `io`; the returned `Connection` starts in its `Init`
    /// state, waiting for both to complete.
    pub fn serve_modified<T, F>(&mut self, io: T, modify: F) -> Connection<T, S, E, B, F>
    where
        T: AsyncRead + AsyncWrite,
        F: Modify,
    {
        let executor = self.executor.clone();

        // Both futures get their error lifted into the same `Either` so they
        // can be joined; the casts pin the closures to plain fn pointers.
        let make_future = self
            .new_service
            .make_service(())
            .map_err(Either::B as MapErrB<S::MakeError>);
        let handshaking = self
            .builder
            .handshake(io)
            .map_err(Either::A as MapErrA<S::MakeError>);

        let state = State::Init(handshaking.join(make_future));
        Connection {
            state,
            executor,
            modify,
        }
    }
}
impl<S, E, B> Clone for Server<S, E, B>
where
    S: MakeService<(), Request<RecvBody>> + Clone,
    E: Clone,
    B: Body,
{
    /// Clones the service factory, executor and builder; `B` itself is only a
    /// marker and needs no `Clone` bound.
    fn clone(&self) -> Self {
        let Server {
            ref new_service,
            ref executor,
            ref builder,
            ..
        } = *self;

        Self {
            new_service: new_service.clone(),
            executor: executor.clone(),
            builder: builder.clone(),
            _p: PhantomData,
        }
    }
}
impl<T, S, E, B, F> Future for Connection<T, S, E, B, F>
where
    T: AsyncRead + AsyncWrite,
    S: MakeService<(), Request<RecvBody>, Response = Response<B>>,
    S::Error: Into<Box<dyn std::error::Error>>,
    E: Executor<Background<<S::Service as Service<Request<RecvBody>>>::Future, B>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn std::error::Error>>,
    F: Modify,
{
    type Item = ();
    type Error = Error<S>;

    /// Drives the connection state machine.
    ///
    /// Any error moves the connection to `State::Done` before it is returned,
    /// so later polls resolve immediately instead of re-entering a broken
    /// state.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.poll2() {
            Ok(progress) => Ok(progress),
            Err(failure) => {
                self.state = State::Done;
                Err(failure)
            }
        }
    }
}
impl<T, S, E, B, F> Connection<T, S, E, B, F>
where
T: AsyncRead + AsyncWrite,
S: MakeService<(), Request<RecvBody>, Response = Response<B>>,
S::Error: Into<Box<dyn std::error::Error>>,
E: Executor<Background<<S::Service as Service<Request<RecvBody>>>::Future, B>>,
B: Body + 'static,
B::Error: Into<Box<dyn std::error::Error>>,
F: Modify,
{
pub fn graceful_shutdown(&mut self) {
match self.state {
State::Init(_) => {
}
State::Ready {
ref mut connection, ..
} => {
connection.graceful_shutdown();
return;
}
State::GoAway { .. } => return,
State::Done => return,
}
self.state = State::Done;
}
fn poll2(&mut self) -> Poll<(), Error<S>> {
loop {
match self.state {
State::Init(..) => try_ready!(self.poll_init()),
State::Ready { .. } => match try_ready!(self.poll_main()) {
PollMain::Again => continue,
PollMain::Done => {
self.state = State::Done;
return Ok(().into());
}
},
State::GoAway { .. } => try_ready!(self.poll_goaway()),
State::Done => return Ok(().into()),
}
}
}
fn poll_init(&mut self) -> Poll<(), Error<S>> {
use self::State::*;
let (connection, service) = match self.state {
Init(ref mut join) => try_ready!(join.poll().map_err(Error::from_init)),
_ => unreachable!(),
};
self.state = Ready {
connection,
service,
};
Ok(().into())
}
fn poll_main(&mut self) -> Poll<PollMain, Error<S>> {
let error = match self.state {
State::Ready {
ref mut connection,
ref mut service,
} => loop {
match service.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
let next = connection.poll_close().map_err(Error::Protocol);
try_ready!(next);
return Ok(PollMain::Done.into());
}
Err(err) => {
trace!("service closed");
break Error::Service(err);
}
}
let next = connection.poll().map_err(Error::Protocol);
let (request, respond) = match try_ready!(next) {
Some(next) => next,
None => return Ok(PollMain::Done.into()),
};
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, ());
self.modify.modify(&mut request);
let (parts, _) = request.into_parts();
let request = Request::from_parts(parts, RecvBody::new(body));
let response = service.call(request);
if let Err(_) = self.executor.execute(Background::new(respond, response)) {
break Error::Execute;
}
},
_ => unreachable!(),
};
match mem::replace(&mut self.state, State::Done) {
State::Ready { mut connection, .. } => {
connection.graceful_shutdown();
self.state = State::GoAway { connection, error };
Ok(Async::Ready(PollMain::Again))
}
_ => unreachable!(),
}
}
fn poll_goaway(&mut self) -> Poll<(), Error<S>> {
match self.state {
State::GoAway {
ref mut connection, ..
} => {
try_ready!(connection.poll_close().map_err(Error::Protocol));
}
_ => unreachable!(),
}
match mem::replace(&mut self.state, State::Done) {
State::GoAway { error, .. } => {
trace!("goaway completed");
Err(error)
}
_ => unreachable!(),
}
}
}
/// Any `FnMut(&mut Request<()>)` closure or function can be used as a
/// `Modify` hook.
impl<T> Modify for T
where
    T: FnMut(&mut Request<()>),
{
    /// Invokes the wrapped callable on the request.
    fn modify(&mut self, request: &mut Request<()>) {
        self(request)
    }
}
/// `()` is the no-op hook used by `Server::serve`.
impl Modify for () {
    /// Leaves the request untouched.
    fn modify(&mut self, _request: &mut Request<()>) {}
}
impl<T, B> Background<T, B>
where
    T: Future,
    B: Body,
{
    /// Creates a task that waits on `response` and answers through `respond`.
    fn new(respond: SendResponse<SendBuf<B::Data>>, response: T) -> Self {
        let state = BackgroundState::Respond { respond, response };
        Background { state }
    }
}
impl<T, B> Future for Background<T, B>
where
T: Future<Item = Response<B>>,
T::Error: Into<Box<dyn std::error::Error>>,
B: Body,
B::Error: Into<Box<dyn std::error::Error>>,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
use self::BackgroundState::*;
loop {
let flush = match self.state {
Respond {
ref mut respond,
ref mut response,
} => {
use flush::Flush;
match respond.poll_reset() {
Ok(Async::Ready(reason)) => {
debug!("stream received RST_FRAME: {:?}", reason);
return Ok(().into());
}
Ok(Async::NotReady) => {
}
Err(err) => {
debug!("stream poll_reset received error: {}", err);
return Err(());
}
}
let response = try_ready!(response.poll().map_err(|err| {
let err = err.into();
debug!("user service error: {}", err);
let reason = ::error::reason_from_dyn_error(&*err);
respond.send_reset(reason);
}));
let (parts, body) = response.into_parts();
let eos = body.is_end_stream();
let response = Response::from_parts(parts, ());
match respond.send_response(response, eos) {
Ok(stream) => {
if eos {
return Ok(().into());
}
Flush::new(body, stream)
}
Err(err) => {
warn!("error sending response: {:?}", err);
return Ok(().into());
}
}
}
Flush(ref mut flush) => return flush.poll(),
};
self.state = Flush(flush);
}
}
}
impl<S> Error<S>
where
S: MakeService<(), Request<RecvBody>>,
{
fn from_init(err: Either<h2::Error, S::MakeError>) -> Self {
match err {
Either::A(err) => Error::Handshake(err),
Either::B(err) => Error::NewService(err),
}
}
}
impl<S> fmt::Debug for Error<S>
where
S: MakeService<(), Request<RecvBody>>,
S::MakeError: fmt::Debug,
S::Error: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Handshake(ref why) => f.debug_tuple("Handshake").field(why).finish(),
Error::Protocol(ref why) => f.debug_tuple("Protocol").field(why).finish(),
Error::NewService(ref why) => f.debug_tuple("NewService").field(why).finish(),
Error::Service(ref why) => f.debug_tuple("Service").field(why).finish(),
Error::Execute => f.debug_tuple("Execute").finish(),
}
}
}
impl<S> fmt::Display for Error<S>
where
S: MakeService<(), Request<RecvBody>>,
S::MakeError: fmt::Display,
S::Error: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Handshake(ref why) => {
write!(f, "Error occurred during HTTP/2.0 handshake: {}", why)
}
Error::Protocol(ref why) => write!(f, "Error produced by HTTP/2.0 stream: {}", why),
Error::NewService(ref why) => {
write!(f, "Error occurred while obtaining service: {}", why)
}
Error::Service(ref why) => write!(f, "Error returned by service: {}", why),
Error::Execute => write!(f, "Error occurred while attempting to spawn a task"),
}
}
}
impl<S> error::Error for Error<S>
where
S: MakeService<(), Request<RecvBody>>,
S::MakeError: error::Error,
S::Error: error::Error,
{
fn cause(&self) -> Option<&error::Error> {
match *self {
Error::Handshake(ref why) => Some(why),
Error::Protocol(ref why) => Some(why),
Error::NewService(ref why) => Some(why),
Error::Service(ref why) => Some(why),
Error::Execute => None,
}
}
fn description(&self) -> &str {
match *self {
Error::Handshake(_) => "error occurred during HTTP/2.0 handshake",
Error::Protocol(_) => "error produced by HTTP/2.0 stream",
Error::NewService(_) => "error occured while obtaining service",
Error::Service(_) => "error returned by service",
Error::Execute => "error occurred while attempting to spawn a task",
}
}
}