diff --git a/src/frontend/mosh-server.cc b/src/frontend/mosh-server.cc index 287fe62..3974157 100644 --- a/src/frontend/mosh-server.cc +++ b/src/frontend/mosh-server.cc @@ -251,12 +251,16 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network break; } + uint64_t now = Network::timestamp(); + if ( pollfds[ 0 ].revents & POLLIN ) { /* packet received from the network */ network.recv(); /* is new user input available for the terminal? */ if ( network.get_remote_state_num() != last_remote_num ) { + last_remote_num = network.get_remote_state_num(); + string terminal_to_host; Network::UserStream us; @@ -277,6 +281,9 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network } } + /* register input frame number for future echo ack */ + terminal.register_input_frame( last_remote_num, now ); + /* update client with new state of terminal */ if ( !network.shutdown_in_progress() ) { network.set_current_state( terminal ); @@ -381,7 +388,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network /* update utmp if has been more than 10 seconds since heard from client */ if ( connected_utmp ) { - if ( network.get_latest_remote_state().timestamp < Network::timestamp() - 10000 ) { + if ( network.get_latest_remote_state().timestamp < now - 10000 ) { utempter_remove_added_record(); char tmp[ 64 ]; @@ -392,6 +399,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network } } + terminal.set_echo_ack( now ); network.tick(); } catch ( Network::NetworkException e ) { fprintf( stderr, "%s: %s\n", e.function.c_str(), strerror( e.the_errno ) ); diff --git a/src/frontend/stmclient.cc b/src/frontend/stmclient.cc index 81a81d8..7e386f9 100644 --- a/src/frontend/stmclient.cc +++ b/src/frontend/stmclient.cc @@ -182,7 +182,7 @@ bool STMClient::process_network_input( void ) overlays.get_prediction_engine().set_local_frame_acked( network->get_sent_state_acked() ); overlays.get_prediction_engine().set_send_interval( network->send_interval() ); - overlays.get_prediction_engine().set_local_frame_late_acked( network->get_sent_state_late_acked() ); + overlays.get_prediction_engine().set_local_frame_late_acked( network->get_latest_remote_state().state.get_echo_ack() ); return true; } diff --git a/src/network/networktransport.cc b/src/network/networktransport.cc index 99eac97..2f324d6 100644 --- a/src/network/networktransport.cc +++ b/src/network/networktransport.cc @@ -33,7 +33,6 @@ Transport::Transport( MyState &initial_state, RemoteState sender( &connection, initial_state ), received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), - sent_state_late_acked( 0 ), fragments(), verbose( false ) { @@ -47,7 +46,6 @@ Transport::Transport( MyState &initial_state, RemoteState sender( &connection, initial_state ), received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), - sent_state_late_acked( 0 ), fragments(), verbose( false ) { @@ -68,9 +66,6 @@ void Transport::recv( void ) } sender.process_acknowledgment_through( inst.ack_num() ); - if ( inst.late_ack_num() > sent_state_late_acked ) { - sent_state_late_acked = inst.late_ack_num(); - } /* first, make sure we don't already have the new state */ for ( typename list< TimestampedState >::iterator i = received_states.begin(); diff --git a/src/network/networktransport.h b/src/network/networktransport.h index 877eb45..83fbeb6 100644 --- a/src/network/networktransport.h +++ b/src/network/networktransport.h @@ -47,7 +47,6 @@ namespace Network { /* simple receiver */ list< TimestampedState > received_states; RemoteState last_receiver_state; /* the state we were in when user last queried state */ - uint64_t sent_state_late_acked; FragmentAssembly fragments; bool verbose; @@ -97,7 +96,6 @@ namespace Network { uint64_t get_sent_state_acked( void ) const { return sender.get_sent_state_acked(); } uint64_t get_sent_state_last( void ) const { return sender.get_sent_state_last(); } - uint64_t get_sent_state_late_acked( void ) const { return sent_state_late_acked; } unsigned int send_interval( void ) const { return sender.send_interval(); } diff --git a/src/network/transportfragment.cc b/src/network/transportfragment.cc index e861ceb..55cd41d 100644 --- a/src/network/transportfragment.cc +++ b/src/network/transportfragment.cc @@ -142,7 +142,6 @@ vector Fragmenter::make_fragments( Instruction &inst, int MTU ) || (inst.new_num() != last_instruction.new_num()) || (inst.ack_num() != last_instruction.ack_num()) || (inst.throwaway_num() != last_instruction.throwaway_num()) - || (inst.late_ack_num() != last_instruction.late_ack_num()) || (inst.protocol_version() != last_instruction.protocol_version()) || (last_MTU != MTU) ) { next_instruction_id++; diff --git a/src/network/transportsender.cc b/src/network/transportsender.cc index e023396..c8e5c97 100644 --- a/src/network/transportsender.cc +++ b/src/network/transportsender.cc @@ -43,8 +43,6 @@ TransportSender::TransportSender( Connection *s_connection, MyState &in shutdown_tries( 0 ), ack_num( 0 ), pending_data_ack( false ), - ack_timestamp( 0 ), - ack_history(), SEND_MINDELAY( 15 ), last_heard( 0 ) { @@ -260,14 +258,11 @@ void TransportSender::send_in_fragments( string diff, uint64_t new_num { Instruction inst; - uint64_t now = timestamp(); - inst.set_protocol_version( MOSH_PROTOCOL_VERSION ); inst.set_old_num( assumed_receiver_state->num ); inst.set_new_num( new_num ); inst.set_ack_num( ack_num ); inst.set_throwaway_num( sent_states.front().num ); - inst.set_late_ack_num( get_late_ack( now ) ); inst.set_diff( diff ); if ( new_num == uint64_t(-1) ) { @@ -280,12 +275,11 @@ void TransportSender::send_in_fragments( string diff, uint64_t new_num connection->send( i->tostring() ); if ( verbose ) { - fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, late_ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d, srtt=%.1f age=%llu\n", + fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d, srtt=%.1f\n", (unsigned int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num, - (int)inst.ack_num(), (int)inst.late_ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(), + (int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(), 1000.0 / (double)send_interval(), - (int)connection->timeout(), connection->get_SRTT(), - (long long)(now - ack_timestamp) ); + (int)connection->timeout(), connection->get_SRTT() ); } } @@ -318,23 +312,4 @@ template void TransportSender::set_ack_num( uint64_t s_ack_num ) { ack_num = s_ack_num; - ack_timestamp = timestamp(); - ack_history.push_back( make_pair( ack_num, ack_timestamp ) ); -} - -/* The "late" ack is for the input state that has had enough time on the host to have been echoed */ -template -uint64_t TransportSender::get_late_ack( uint64_t now ) -{ - uint64_t newest_echo_ack = 0; - - for ( BOOST_AUTO( i, ack_history.begin() ); i != ack_history.end(); i++ ) { - if ( i->second < now - ECHO_TIMEOUT ) { - newest_echo_ack = i->first; - } - } - - ack_history.remove_if( (&_1)->*&pair::first < newest_echo_ack ); - - return newest_echo_ack; } diff --git a/src/network/transportsender.h b/src/network/transportsender.h index 127c3a7..e49e036 100644 --- a/src/network/transportsender.h +++ b/src/network/transportsender.h @@ -43,7 +43,6 @@ namespace Network { static const int ACK_INTERVAL = 3000; /* ms between empty acks */ static const int ACK_DELAY = 100; /* ms before delayed ack */ static const int SHUTDOWN_RETRIES = 3; /* number of shutdown packets to send before giving up */ - static const int ECHO_TIMEOUT = 50; /* for late ack */ static const int ACTIVE_RETRY_TIMEOUT = 10000; /* attempt to resend at frame rate */ /* helper methods for tick() */ @@ -82,10 +81,6 @@ namespace Network { /* information about receiver state */ uint64_t ack_num; bool pending_data_ack; - uint64_t ack_timestamp; - - list< pair > ack_history; - uint64_t get_late_ack( uint64_t now ); /* calculate delayed "echo" acknowledgment */ unsigned int SEND_MINDELAY; /* ms to collect all input */ diff --git a/src/protobufs/hostinput.proto b/src/protobufs/hostinput.proto index 1b27274..4258d75 100644 --- a/src/protobufs/hostinput.proto +++ b/src/protobufs/hostinput.proto @@ -19,7 +19,12 @@ message ResizeMessage { optional int32 height = 6; } +message EchoAck { + optional uint64 echo_ack_num = 8; +} + extend Instruction { optional HostBytes hostbytes = 2; optional ResizeMessage resize = 3; + optional EchoAck echoack = 7; } diff --git a/src/protobufs/transportinstruction.proto b/src/protobufs/transportinstruction.proto index 7907dc2..20009fd 100644 --- a/src/protobufs/transportinstruction.proto +++ b/src/protobufs/transportinstruction.proto @@ -9,7 +9,6 @@ message Instruction { optional uint64 new_num = 3; optional uint64 ack_num = 4; optional uint64 throwaway_num = 5; - optional uint64 late_ack_num = 7; optional bytes diff = 6; } diff --git a/src/statesync/completeterminal.cc b/src/statesync/completeterminal.cc index 7d64892..83a8d1e 100644 --- a/src/statesync/completeterminal.cc +++ b/src/statesync/completeterminal.cc @@ -16,6 +16,9 @@ along with this program. If not, see . */ +#include +#include + #include "completeterminal.h" #include "hostinput.pb.h" @@ -24,6 +27,7 @@ using namespace std; using namespace Parser; using namespace Terminal; using namespace HostBuffers; +using namespace boost::lambda; string Complete::act( const string &str ) { @@ -52,10 +56,16 @@ string Complete::act( const Action *act ) } /* interface for Network::Transport */ -string Complete::diff_from( const Complete &existing ) +string Complete::diff_from( const Complete &existing ) const { HostBuffers::HostMessage output; + if ( existing.get_echo_ack() != get_echo_ack() ) { + assert( get_echo_ack() >= existing.get_echo_ack() ); + Instruction *new_echo = output.add_instruction(); + new_echo->MutableExtension( echoack )->set_echo_ack_num( get_echo_ack() ); + } + if ( !(existing.get_fb() == get_fb()) ) { if ( (existing.get_fb().ds.get_width() != terminal.get_fb().ds.get_width()) || (existing.get_fb().ds.get_height() != terminal.get_fb().ds.get_height()) ) { @@ -82,6 +92,10 @@ void Complete::apply_string( string diff ) } else if ( input.instruction( i ).HasExtension( resize ) ) { act( new Resize( input.instruction( i ).GetExtension( resize ).width(), input.instruction( i ).GetExtension( resize ).height() ) ); + } else if ( input.instruction( i ).HasExtension( echoack ) ) { + uint64_t inst_echo_ack_num = input.instruction( i ).GetExtension( echoack ).echo_ack_num(); + assert( inst_echo_ack_num >= echo_ack ); + echo_ack = inst_echo_ack_num; } } } @@ -89,5 +103,25 @@ void Complete::apply_string( string diff ) bool Complete::operator==( Complete const &x ) const { // assert( parser == x.parser ); /* parser state is irrelevant for us */ - return terminal == x.terminal; + return (terminal == x.terminal) && (echo_ack == x.echo_ack); +} + +void Complete::set_echo_ack( uint64_t now ) +{ + uint64_t newest_echo_ack = 0; + + for ( BOOST_AUTO( i, input_history.begin() ); i != input_history.end(); i++ ) { + if ( i->second < now - ECHO_TIMEOUT ) { + newest_echo_ack = i->first; + } + } + + input_history.remove_if( (&_1)->*&pair::first < newest_echo_ack ); + + echo_ack = newest_echo_ack; +} + +void Complete::register_input_frame( uint64_t n, uint64_t now ) +{ + input_history.push_back( make_pair( n, now ) ); } diff --git a/src/statesync/completeterminal.h b/src/statesync/completeterminal.h index 5bc739f..273c5f3 100644 --- a/src/statesync/completeterminal.h +++ b/src/statesync/completeterminal.h @@ -19,6 +19,9 @@ #ifndef COMPLETE_TERMINAL_HPP #define COMPLETE_TERMINAL_HPP +#include +#include + #include "parser.h" #include "terminal.h" @@ -30,8 +33,14 @@ namespace Terminal { Parser::UTF8Parser parser; Terminal::Emulator terminal; + std::list< std::pair > input_history; + uint64_t echo_ack; + + static const int ECHO_TIMEOUT = 50; /* for late ack */ + public: - Complete( size_t width, size_t height ) : parser(), terminal( width, height ) {} + Complete( size_t width, size_t height ) : parser(), terminal( width, height ), + input_history(), echo_ack( 0 ) {} std::string act( const std::string &str ); std::string act( const Parser::Action *act ); @@ -39,9 +48,13 @@ namespace Terminal { const Framebuffer & get_fb( void ) const { return terminal.get_fb(); } bool parser_grounded( void ) const { return parser.is_grounded(); } + uint64_t get_echo_ack( void ) const { return echo_ack; } + void set_echo_ack( uint64_t now ); + void register_input_frame( uint64_t n, uint64_t now ); + /* interface for Network::Transport */ void subtract( const Complete * ) {} - std::string diff_from( const Complete &existing ); + std::string diff_from( const Complete &existing ) const; void apply_string( std::string diff ); bool operator==( const Complete &x ) const; }; diff --git a/src/statesync/user.cc b/src/statesync/user.cc index ed1d705..d32aca3 100644 --- a/src/statesync/user.cc +++ b/src/statesync/user.cc @@ -37,9 +37,9 @@ void UserStream::subtract( const UserStream *prefix ) } } -string UserStream::diff_from( const UserStream &existing ) +string UserStream::diff_from( const UserStream &existing ) const { - deque::iterator my_it = actions.begin(); + deque::const_iterator my_it = actions.begin(); for ( deque::const_iterator i = existing.actions.begin(); i != existing.actions.end(); diff --git a/src/statesync/user.h b/src/statesync/user.h index 0f0c0fa..ddfcd65 100644 --- a/src/statesync/user.h +++ b/src/statesync/user.h @@ -73,7 +73,7 @@ namespace Network { /* interface for Network::Transport */ void subtract( const UserStream *prefix ); - string diff_from( const UserStream &existing ); + string diff_from( const UserStream &existing ) const; void apply_string( string diff ); bool operator==( const UserStream &x ) const { return actions == x.actions; } };