Get the timing and delayed ACKs right(er)

This commit is contained in:
Keith Winstein
2011-08-19 04:46:12 -04:00
parent fdfd7b010b
commit 772d476022
9 changed files with 153 additions and 58 deletions
+2
View File
@@ -1,5 +1,7 @@
Simplify send() routine to call diff() when state has changed Simplify send() routine to call diff() when state has changed
Delayed ACK
Figure out packet double-sending issue [if delay > 1/50 sec?] Figure out packet double-sending issue [if delay > 1/50 sec?]
Graceful exit / server shutdown Graceful exit / server shutdown
+4
View File
@@ -33,7 +33,11 @@ string Complete::act( const Action *act )
/* interface for Network::Transport */ /* interface for Network::Transport */
string Complete::diff_from( const Complete &existing ) string Complete::diff_from( const Complete &existing )
{ {
if ( existing.get_fb() == get_fb() ) {
return "";
} else {
return Terminal::Display::new_frame( true, existing.get_fb(), terminal.get_fb() ); return Terminal::Display::new_frame( true, existing.get_fb(), terminal.get_fb() );
}
} }
void Complete::apply_string( string diff ) void Complete::apply_string( string diff )
+8
View File
@@ -290,3 +290,11 @@ uint64_t Network::timestamp( void )
return millis; return millis;
} }
uint64_t Connection::timeout( void )
{
uint64_t RTO = lrint( ceil( SRTT + 4 * RTTVAR ) );
if ( RTO < MIN_RTO ) {
RTO = MIN_RTO;
}
return RTO;
}
+3 -1
View File
@@ -55,6 +55,7 @@ namespace Network {
class Connection { class Connection {
private: private:
static const int RECEIVE_MTU = 2048; static const int RECEIVE_MTU = 2048;
static const uint64_t MIN_RTO = 50; /* ms */
int sock; int sock;
struct sockaddr_in remote_addr; struct sockaddr_in remote_addr;
@@ -95,7 +96,8 @@ namespace Network {
string get_key( void ) { return key.printable_key(); } string get_key( void ) { return key.printable_key(); }
bool get_attached( void ) { return attached; } bool get_attached( void ) { return attached; }
int timeout( void ) { return (int)lrint( ceil( SRTT + 4 * RTTVAR ) ); } uint64_t timeout( void );
double get_SRTT( void ) { return SRTT; }
bool pending_timestamp( void ) { return ( saved_timestamp != uint64_t(-1) ); } bool pending_timestamp( void ) { return ( saved_timestamp != uint64_t(-1) ); }
}; };
} }
+105 -41
View File
@@ -16,7 +16,9 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ), received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ),
last_receiver_state( initial_remote ), last_receiver_state( initial_remote ),
fragments(), fragments(),
verbose( false ) verbose( false ),
next_ack_time( timestamp() ),
next_send_time( timestamp() )
{ {
/* server */ /* server */
} }
@@ -32,7 +34,9 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ), received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ),
last_receiver_state( initial_remote ), last_receiver_state( initial_remote ),
fragments(), fragments(),
verbose( false ) verbose( false ),
next_ack_time( timestamp() ),
next_send_time( timestamp() )
{ {
/* client */ /* client */
} }
@@ -40,7 +44,7 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState
template <class MyState, class RemoteState> template <class MyState, class RemoteState>
unsigned int Transport<MyState, RemoteState>::send_interval( void ) unsigned int Transport<MyState, RemoteState>::send_interval( void )
{ {
unsigned int SEND_INTERVAL = connection.timeout() / 10; int SEND_INTERVAL = lrint( ceil( (connection.get_SRTT() - ACK_DELAY) / 2.0 ) );
if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) { if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) {
SEND_INTERVAL = SEND_INTERVAL_MIN; SEND_INTERVAL = SEND_INTERVAL_MIN;
} else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) { } else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) {
@@ -50,56 +54,105 @@ unsigned int Transport<MyState, RemoteState>::send_interval( void )
return SEND_INTERVAL; return SEND_INTERVAL;
} }
/* Returns the number of ms to wait until next (possible) event */
template <class MyState, class RemoteState> template <class MyState, class RemoteState>
int Transport<MyState, RemoteState>::tick( void ) int Transport<MyState, RemoteState>::wait_time( void )
{ {
if ( connection.pending_timestamp() && ( next_ack_time > timestamp() + ACK_DELAY ) ) {
next_ack_time = timestamp() + ACK_DELAY;
}
uint64_t next_wakeup = next_ack_time;
if ( !(current_state == sent_states.back().state) ) { /* pending data to send */
if ( next_send_time > timestamp() + SEND_MINDELAY ) {
next_send_time = timestamp() + SEND_MINDELAY;
}
if ( next_send_time < sent_states.back().timestamp + send_interval() ) {
next_send_time = sent_states.back().timestamp + send_interval();
}
if ( next_send_time < next_wakeup ) {
next_wakeup = next_send_time;
}
}
if ( !connection.get_attached() ) {
return -1;
}
if ( next_wakeup > timestamp() ) {
return next_wakeup - timestamp();
} else {
return 0;
}
}
/* Send data or an ack if necessary */
template <class MyState, class RemoteState>
void Transport<MyState, RemoteState>::tick( void )
{
wait_time();
if ( !connection.get_attached() ) {
return;
}
if ( (timestamp() < next_ack_time)
&& (timestamp() < next_send_time) ) {
return;
}
/* Determine if a new diff or empty ack needs to be sent */ /* Determine if a new diff or empty ack needs to be sent */
if ( timestamp() - sent_states.back().timestamp >= send_interval() ) {
/* Update assumed receiver state */ /* Update assumed receiver state */
update_assumed_receiver_state(); update_assumed_receiver_state();
/* Cut out common prefix of all states */ /* Cut out common prefix of all states */
rationalize_states(); rationalize_states();
string diff = current_state.diff_from( assumed_receiver_state->state );
if ( diff.empty() && (timestamp() >= next_ack_time) ) {
/*
if ( verbose )
fprintf( stderr, "Sending empty ack (ts=%d, next_send=%d, next_ack=%d)\n",
(int)timestamp() % 100000,
(int)next_send_time % 100000,
(int)next_ack_time % 100000 );
*/
send_empty_ack();
return;
}
if ( !diff.empty() && ( (timestamp() >= next_send_time)
|| (timestamp() >= next_ack_time) ) ) {
/* Send diffs or ack */ /* Send diffs or ack */
send_to_receiver(); /*
if ( verbose )
return send_interval(); fprintf( stderr, "Sending packet (ts=%d, next_send=%d, next_ack=%d)\n",
(int)timestamp() % 100000,
(int)next_send_time % 100000,
(int)next_ack_time % 100000 );
*/
send_to_receiver( diff );
} }
int64_t wait = sent_states.back().timestamp + send_interval() - timestamp();
if ( wait < 0 ) {
wait = 0;
}
return wait;
} }
template <class MyState, class RemoteState> template <class MyState, class RemoteState>
void Transport<MyState, RemoteState>::send_to_receiver( void ) void Transport<MyState, RemoteState>::send_empty_ack( void )
{ {
if ( !connection.get_attached() ) { assert ( timestamp() >= next_ack_time );
return;
}
string diff = current_state.diff_from( assumed_receiver_state->state );
if ( diff.empty() ) {
/* send empty ack */
if ( (!connection.pending_timestamp())
&& (timestamp() - sent_states.back().timestamp < int64_t( ACK_INTERVAL ))
&& (sent_states.back().num > 0) ) {
return;
}
uint64_t new_num = sent_states.back().num + 1; uint64_t new_num = sent_states.back().num + 1;
send_in_fragments( diff, new_num, false ); send_in_fragments( "", new_num, false );
sent_states.push_back( TimestampedState<MyState>( timestamp(), new_num, current_state ) ); sent_states.push_back( TimestampedState<MyState>( sent_states.back().timestamp, new_num, current_state ) );
next_ack_time = timestamp() + ACK_INTERVAL;
return; }
}
template <class MyState, class RemoteState>
void Transport<MyState, RemoteState>::send_to_receiver( string diff )
{
uint64_t new_num; uint64_t new_num;
if ( current_state == sent_states.back().state ) { /* previously sent */ if ( current_state == sent_states.back().state ) { /* previously sent */
new_num = sent_states.back().num; new_num = sent_states.back().num;
@@ -138,6 +191,8 @@ void Transport<MyState, RemoteState>::send_to_receiver( void )
/* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */ /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */
assumed_receiver_state = sent_states.end(); assumed_receiver_state = sent_states.end();
assumed_receiver_state--; assumed_receiver_state--;
next_ack_time = timestamp() + ACK_INTERVAL;
next_send_time = -1;
} }
template <class MyState, class RemoteState> template <class MyState, class RemoteState>
@@ -149,16 +204,19 @@ void Transport<MyState, RemoteState>::update_assumed_receiver_state( void )
transmitted recently enough ago */ transmitted recently enough ago */
assumed_receiver_state = sent_states.begin(); assumed_receiver_state = sent_states.begin();
for ( typename list< TimestampedState<MyState> >::iterator i = sent_states.begin(); typename list< TimestampedState<MyState> >::iterator i = sent_states.begin();
i != sent_states.end(); i++;
i++ ) {
while ( i != sent_states.end() ) {
assert( now >= i->timestamp ); assert( now >= i->timestamp );
if ( int(now - i->timestamp) < connection.timeout() ) { if ( int(now - i->timestamp) < connection.timeout() + ACK_DELAY ) {
assumed_receiver_state = i; assumed_receiver_state = i;
} else { } else {
return; return;
} }
i++;
} }
} }
@@ -229,6 +287,11 @@ void Transport<MyState, RemoteState>::recv( void )
return; return;
} }
} }
/*
if ( verbose )
fprintf( stderr, "[%d] Received state %d [ack %d]\n",
(int)timestamp() % 100000, (int)new_state.num, (int)inst.ack_num );
*/
received_states.push_back( new_state ); received_states.push_back( new_state );
} }
} }
@@ -318,10 +381,11 @@ void Transport<MyState, RemoteState>::send_in_fragments( string diff, uint64_t n
connection.send( s, send_timestamp ); connection.send( s, send_timestamp );
if ( verbose ) { if ( verbose ) {
fprintf( stderr, "Sent [%d=>%d] frag %d, ack=%d, throwaway=%d, len=%d, frame rate=%.2f\n", fprintf( stderr, "[%d] Sent [%d=>%d] frag %d, ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n",
(int)inst.old_num, (int)inst.new_num, (int)inst.fragment_num, (int)(timestamp() % 100000), (int)inst.old_num, (int)inst.new_num, (int)inst.fragment_num,
(int)inst.ack_num, (int)inst.throwaway_num, (int)inst.diff.size(), (int)inst.ack_num, (int)inst.throwaway_num, (int)inst.diff.size(),
1000.0 / (double)send_interval() ); 1000.0 / (double)send_interval(),
(int)connection.timeout() );
} }
} while ( !diff.empty() ); } while ( !diff.empty() );
} }
+13 -6
View File
@@ -76,16 +76,19 @@ namespace Network {
class Transport class Transport
{ {
private: private:
static const unsigned int SEND_INTERVAL_MIN = 20; /* ms between frames */ static const int SEND_INTERVAL_MIN = 20; /* ms between frames */
static const unsigned int SEND_INTERVAL_MAX = 250; /* ms between frames */ static const int SEND_INTERVAL_MAX = 250; /* ms between frames */
static const int ACK_INTERVAL = 1000; /* ms between empty acks */ static const int ACK_INTERVAL = 1000; /* ms between empty acks */
static const int ACK_DELAY = 10; /* ms before delayed ack */
static const int SEND_MINDELAY = 20; /* ms to collect all input */
static const int HEADER_LEN = 120; static const int HEADER_LEN = 120;
/* helper methods for tick() */ /* helper methods for tick() */
unsigned int send_interval( void ); unsigned int send_interval( void );
void update_assumed_receiver_state( void ); void update_assumed_receiver_state( void );
void rationalize_states( void ); void rationalize_states( void );
void send_to_receiver( void ); void send_to_receiver( string diff );
void send_empty_ack( void );
void send_in_fragments( string diff, uint64_t new_num, bool send_timestamp = true ); void send_in_fragments( string diff, uint64_t new_num, bool send_timestamp = true );
/* helper methods for recv() */ /* helper methods for recv() */
@@ -112,15 +115,19 @@ namespace Network {
FragmentAssembly fragments; FragmentAssembly fragments;
bool verbose; bool verbose;
uint64_t next_ack_time;
uint64_t next_send_time;
public: public:
Transport( MyState &initial_state, RemoteState &initial_remote ); Transport( MyState &initial_state, RemoteState &initial_remote );
Transport( MyState &initial_state, RemoteState &initial_remote, Transport( MyState &initial_state, RemoteState &initial_remote,
const char *key_str, const char *ip, int port ); const char *key_str, const char *ip, int port );
/* Send data or an ack if necessary. /* Send data or an ack if necessary. */
Returns the number of ms to wait until next event. */ void tick( void );
int tick( void );
/* Returns the number of ms to wait until next event. */
int wait_time( void );
/* Blocks waiting for a packet. */ /* Blocks waiting for a packet. */
void recv( void ); void recv( void );
+6 -2
View File
@@ -45,11 +45,13 @@ int main( int argc, char *argv[] )
uint64_t last_num = n->get_remote_state_num(); uint64_t last_num = n->get_remote_state_num();
while ( true ) { while ( true ) {
try { try {
if ( poll( &my_pollfd, 1, n->tick() ) < 0 ) { if ( poll( &my_pollfd, 1, n->wait_time() ) < 0 ) {
perror( "poll" ); perror( "poll" );
exit( 1 ); exit( 1 );
} }
n->tick();
if ( my_pollfd.revents & POLLIN ) { if ( my_pollfd.revents & POLLIN ) {
n->recv(); n->recv();
@@ -89,10 +91,12 @@ int main( int argc, char *argv[] )
while( true ) { while( true ) {
try { try {
if ( poll( fds, 2, n->tick() ) < 0 ) { if ( poll( fds, 2, n->wait_time() ) < 0 ) {
perror( "poll" ); perror( "poll" );
} }
n->tick();
if ( fds[ 0 ].revents & POLLIN ) { if ( fds[ 0 ].revents & POLLIN ) {
char x; char x;
assert( read( STDIN_FILENO, &x, 1 ) == 1 ); assert( read( STDIN_FILENO, &x, 1 ) == 1 );
+3 -1
View File
@@ -136,7 +136,7 @@ void serve( int host_fd )
while ( 1 ) { while ( 1 ) {
try { try {
int active_fds = poll( pollfds, 2, network.tick() ); int active_fds = poll( pollfds, 2, network.wait_time() );
if ( active_fds < 0 ) { if ( active_fds < 0 ) {
perror( "poll" ); perror( "poll" );
break; break;
@@ -206,6 +206,8 @@ void serve( int host_fd )
& (POLLERR | POLLHUP | POLLNVAL) ) { & (POLLERR | POLLHUP | POLLNVAL) ) {
break; break;
} }
network.tick();
} catch ( Network::NetworkException e ) { } catch ( Network::NetworkException e ) {
fprintf( stderr, "%s: %s\r\n", e.function.c_str(), strerror( e.the_errno ) ); fprintf( stderr, "%s: %s\r\n", e.function.c_str(), strerror( e.the_errno ) );
sleep( 1 ); sleep( 1 );
+3 -1
View File
@@ -143,7 +143,7 @@ void client( const char *ip, int port, const char *key )
while ( 1 ) { while ( 1 ) {
try { try {
int active_fds = poll( pollfds, 3, network.tick() ); int active_fds = poll( pollfds, 3, network.wait_time() );
if ( active_fds < 0 ) { if ( active_fds < 0 ) {
perror( "poll" ); perror( "poll" );
break; break;
@@ -208,6 +208,8 @@ void client( const char *ip, int port, const char *key )
& (POLLERR | POLLHUP | POLLNVAL) ) { & (POLLERR | POLLHUP | POLLNVAL) ) {
break; break;
} }
network.tick();
} catch ( Network::NetworkException e ) { } catch ( Network::NetworkException e ) {
fprintf( stderr, "%s: %s\r\n", e.function.c_str(), strerror( e.the_errno ) ); fprintf( stderr, "%s: %s\r\n", e.function.c_str(), strerror( e.the_errno ) );
sleep( 1 ); sleep( 1 );