Explicit echo ack protobuf with reliable semantics

This commit is contained in:
Keith Winstein
2012-02-25 14:34:39 -05:00
parent e9ce05bc7b
commit 3a92cd1393
13 changed files with 72 additions and 51 deletions
+9 -1
View File
@@ -251,12 +251,16 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
break;
}
uint64_t now = Network::timestamp();
if ( pollfds[ 0 ].revents & POLLIN ) {
/* packet received from the network */
network.recv();
/* is new user input available for the terminal? */
if ( network.get_remote_state_num() != last_remote_num ) {
last_remote_num = network.get_remote_state_num();
string terminal_to_host;
Network::UserStream us;
@@ -277,6 +281,9 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
}
}
/* register input frame number for future echo ack */
terminal.register_input_frame( last_remote_num, now );
/* update client with new state of terminal */
if ( !network.shutdown_in_progress() ) {
network.set_current_state( terminal );
@@ -381,7 +388,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
/* update utmp if has been more than 10 seconds since heard from client */
if ( connected_utmp ) {
if ( network.get_latest_remote_state().timestamp < Network::timestamp() - 10000 ) {
if ( network.get_latest_remote_state().timestamp < now - 10000 ) {
utempter_remove_added_record();
char tmp[ 64 ];
@@ -392,6 +399,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
}
}
terminal.set_echo_ack( now );
network.tick();
} catch ( Network::NetworkException e ) {
fprintf( stderr, "%s: %s\n", e.function.c_str(), strerror( e.the_errno ) );
+1 -1
View File
@@ -182,7 +182,7 @@ bool STMClient::process_network_input( void )
overlays.get_prediction_engine().set_local_frame_acked( network->get_sent_state_acked() );
overlays.get_prediction_engine().set_send_interval( network->send_interval() );
overlays.get_prediction_engine().set_local_frame_late_acked( network->get_sent_state_late_acked() );
overlays.get_prediction_engine().set_local_frame_late_acked( network->get_latest_remote_state().state.get_echo_ack() );
return true;
}
-5
View File
@@ -33,7 +33,6 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState
sender( &connection, initial_state ),
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ),
last_receiver_state( initial_remote ),
sent_state_late_acked( 0 ),
fragments(),
verbose( false )
{
@@ -47,7 +46,6 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState
sender( &connection, initial_state ),
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ),
last_receiver_state( initial_remote ),
sent_state_late_acked( 0 ),
fragments(),
verbose( false )
{
@@ -68,9 +66,6 @@ void Transport<MyState, RemoteState>::recv( void )
}
sender.process_acknowledgment_through( inst.ack_num() );
if ( inst.late_ack_num() > sent_state_late_acked ) {
sent_state_late_acked = inst.late_ack_num();
}
/* first, make sure we don't already have the new state */
for ( typename list< TimestampedState<RemoteState> >::iterator i = received_states.begin();
-2
View File
@@ -47,7 +47,6 @@ namespace Network {
/* simple receiver */
list< TimestampedState<RemoteState> > received_states;
RemoteState last_receiver_state; /* the state we were in when user last queried state */
uint64_t sent_state_late_acked;
FragmentAssembly fragments;
bool verbose;
@@ -97,7 +96,6 @@ namespace Network {
uint64_t get_sent_state_acked( void ) const { return sender.get_sent_state_acked(); }
uint64_t get_sent_state_last( void ) const { return sender.get_sent_state_last(); }
uint64_t get_sent_state_late_acked( void ) const { return sent_state_late_acked; }
unsigned int send_interval( void ) const { return sender.send_interval(); }
-1
View File
@@ -142,7 +142,6 @@ vector<Fragment> Fragmenter::make_fragments( Instruction &inst, int MTU )
|| (inst.new_num() != last_instruction.new_num())
|| (inst.ack_num() != last_instruction.ack_num())
|| (inst.throwaway_num() != last_instruction.throwaway_num())
|| (inst.late_ack_num() != last_instruction.late_ack_num())
|| (inst.protocol_version() != last_instruction.protocol_version())
|| (last_MTU != MTU) ) {
next_instruction_id++;
+3 -28
View File
@@ -43,8 +43,6 @@ TransportSender<MyState>::TransportSender( Connection *s_connection, MyState &in
shutdown_tries( 0 ),
ack_num( 0 ),
pending_data_ack( false ),
ack_timestamp( 0 ),
ack_history(),
SEND_MINDELAY( 15 ),
last_heard( 0 )
{
@@ -260,14 +258,11 @@ void TransportSender<MyState>::send_in_fragments( string diff, uint64_t new_num
{
Instruction inst;
uint64_t now = timestamp();
inst.set_protocol_version( MOSH_PROTOCOL_VERSION );
inst.set_old_num( assumed_receiver_state->num );
inst.set_new_num( new_num );
inst.set_ack_num( ack_num );
inst.set_throwaway_num( sent_states.front().num );
inst.set_late_ack_num( get_late_ack( now ) );
inst.set_diff( diff );
if ( new_num == uint64_t(-1) ) {
@@ -280,12 +275,11 @@ void TransportSender<MyState>::send_in_fragments( string diff, uint64_t new_num
connection->send( i->tostring() );
if ( verbose ) {
fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, late_ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d, srtt=%.1f age=%llu\n",
fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d, srtt=%.1f\n",
(unsigned int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num,
(int)inst.ack_num(), (int)inst.late_ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(),
(int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(),
1000.0 / (double)send_interval(),
(int)connection->timeout(), connection->get_SRTT(),
(long long)(now - ack_timestamp) );
(int)connection->timeout(), connection->get_SRTT() );
}
}
@@ -318,23 +312,4 @@ template <class MyState>
void TransportSender<MyState>::set_ack_num( uint64_t s_ack_num )
{
ack_num = s_ack_num;
ack_timestamp = timestamp();
ack_history.push_back( make_pair( ack_num, ack_timestamp ) );
}
/* The "late" ack is for the input state that has had enough time on the host to have been echoed */
template <class MyState>
uint64_t TransportSender<MyState>::get_late_ack( uint64_t now )
{
uint64_t newest_echo_ack = 0;
for ( BOOST_AUTO( i, ack_history.begin() ); i != ack_history.end(); i++ ) {
if ( i->second < now - ECHO_TIMEOUT ) {
newest_echo_ack = i->first;
}
}
ack_history.remove_if( (&_1)->*&pair<uint64_t, uint64_t>::first < newest_echo_ack );
return newest_echo_ack;
}
-5
View File
@@ -43,7 +43,6 @@ namespace Network {
static const int ACK_INTERVAL = 3000; /* ms between empty acks */
static const int ACK_DELAY = 100; /* ms before delayed ack */
static const int SHUTDOWN_RETRIES = 3; /* number of shutdown packets to send before giving up */
static const int ECHO_TIMEOUT = 50; /* for late ack */
static const int ACTIVE_RETRY_TIMEOUT = 10000; /* attempt to resend at frame rate */
/* helper methods for tick() */
@@ -82,10 +81,6 @@ namespace Network {
/* information about receiver state */
uint64_t ack_num;
bool pending_data_ack;
uint64_t ack_timestamp;
list< pair<uint64_t, uint64_t> > ack_history;
uint64_t get_late_ack( uint64_t now ); /* calculate delayed "echo" acknowledgment */
unsigned int SEND_MINDELAY; /* ms to collect all input */
+5
View File
@@ -19,7 +19,12 @@ message ResizeMessage {
optional int32 height = 6;
}
message EchoAck {
optional uint64 echo_ack_num = 8;
}
extend Instruction {
optional HostBytes hostbytes = 2;
optional ResizeMessage resize = 3;
optional EchoAck echoack = 7;
}
-1
View File
@@ -9,7 +9,6 @@ message Instruction {
optional uint64 new_num = 3;
optional uint64 ack_num = 4;
optional uint64 throwaway_num = 5;
optional uint64 late_ack_num = 7;
optional bytes diff = 6;
}
+36 -2
View File
@@ -16,6 +16,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/typeof/typeof.hpp>
#include <boost/lambda/lambda.hpp>
#include "completeterminal.h"
#include "hostinput.pb.h"
@@ -24,6 +27,7 @@ using namespace std;
using namespace Parser;
using namespace Terminal;
using namespace HostBuffers;
using namespace boost::lambda;
string Complete::act( const string &str )
{
@@ -52,10 +56,16 @@ string Complete::act( const Action *act )
}
/* interface for Network::Transport */
string Complete::diff_from( const Complete &existing )
string Complete::diff_from( const Complete &existing ) const
{
HostBuffers::HostMessage output;
if ( existing.get_echo_ack() != get_echo_ack() ) {
assert( get_echo_ack() >= existing.get_echo_ack() );
Instruction *new_echo = output.add_instruction();
new_echo->MutableExtension( echoack )->set_echo_ack_num( get_echo_ack() );
}
if ( !(existing.get_fb() == get_fb()) ) {
if ( (existing.get_fb().ds.get_width() != terminal.get_fb().ds.get_width())
|| (existing.get_fb().ds.get_height() != terminal.get_fb().ds.get_height()) ) {
@@ -82,6 +92,10 @@ void Complete::apply_string( string diff )
} else if ( input.instruction( i ).HasExtension( resize ) ) {
act( new Resize( input.instruction( i ).GetExtension( resize ).width(),
input.instruction( i ).GetExtension( resize ).height() ) );
} else if ( input.instruction( i ).HasExtension( echoack ) ) {
uint64_t inst_echo_ack_num = input.instruction( i ).GetExtension( echoack ).echo_ack_num();
assert( inst_echo_ack_num >= echo_ack );
echo_ack = inst_echo_ack_num;
}
}
}
@@ -89,5 +103,25 @@ void Complete::apply_string( string diff )
bool Complete::operator==( Complete const &x ) const
{
// assert( parser == x.parser ); /* parser state is irrelevant for us */
return terminal == x.terminal;
return (terminal == x.terminal) && (echo_ack == x.echo_ack);
}
void Complete::set_echo_ack( uint64_t now )
{
uint64_t newest_echo_ack = 0;
for ( BOOST_AUTO( i, input_history.begin() ); i != input_history.end(); i++ ) {
if ( i->second < now - ECHO_TIMEOUT ) {
newest_echo_ack = i->first;
}
}
input_history.remove_if( (&_1)->*&pair<uint64_t, uint64_t>::first < newest_echo_ack );
echo_ack = newest_echo_ack;
}
void Complete::register_input_frame( uint64_t n, uint64_t now )
{
input_history.push_back( make_pair( n, now ) );
}
+15 -2
View File
@@ -19,6 +19,9 @@
#ifndef COMPLETE_TERMINAL_HPP
#define COMPLETE_TERMINAL_HPP
#include <list>
#include <stdint.h>
#include "parser.h"
#include "terminal.h"
@@ -30,8 +33,14 @@ namespace Terminal {
Parser::UTF8Parser parser;
Terminal::Emulator terminal;
std::list< std::pair<uint64_t, uint64_t> > input_history;
uint64_t echo_ack;
static const int ECHO_TIMEOUT = 50; /* for late ack */
public:
Complete( size_t width, size_t height ) : parser(), terminal( width, height ) {}
Complete( size_t width, size_t height ) : parser(), terminal( width, height ),
input_history(), echo_ack( 0 ) {}
std::string act( const std::string &str );
std::string act( const Parser::Action *act );
@@ -39,9 +48,13 @@ namespace Terminal {
const Framebuffer & get_fb( void ) const { return terminal.get_fb(); }
bool parser_grounded( void ) const { return parser.is_grounded(); }
uint64_t get_echo_ack( void ) const { return echo_ack; }
void set_echo_ack( uint64_t now );
void register_input_frame( uint64_t n, uint64_t now );
/* interface for Network::Transport */
void subtract( const Complete * ) {}
std::string diff_from( const Complete &existing );
std::string diff_from( const Complete &existing ) const;
void apply_string( std::string diff );
bool operator==( const Complete &x ) const;
};
+2 -2
View File
@@ -37,9 +37,9 @@ void UserStream::subtract( const UserStream *prefix )
}
}
string UserStream::diff_from( const UserStream &existing )
string UserStream::diff_from( const UserStream &existing ) const
{
deque<UserEvent>::iterator my_it = actions.begin();
deque<UserEvent>::const_iterator my_it = actions.begin();
for ( deque<UserEvent>::const_iterator i = existing.actions.begin();
i != existing.actions.end();
+1 -1
View File
@@ -73,7 +73,7 @@ namespace Network {
/* interface for Network::Transport */
void subtract( const UserStream *prefix );
string diff_from( const UserStream &existing );
string diff_from( const UserStream &existing ) const;
void apply_string( string diff );
bool operator==( const UserStream &x ) const { return actions == x.actions; }
};