From 772d476022eab0d525731e30b0e621d8a2eceb64 Mon Sep 17 00:00:00 2001 From: Keith Winstein Date: Fri, 19 Aug 2011 04:46:12 -0400 Subject: [PATCH] Get the timing and delayed ACKs right(er) --- TODO | 2 + completeterminal.cpp | 6 +- network.cpp | 8 +++ network.hpp | 4 +- networktransport.cpp | 156 ++++++++++++++++++++++++++++++------------- networktransport.hpp | 19 ++++-- ntester.cpp | 8 ++- stm-server.cpp | 4 +- stm.cpp | 4 +- 9 files changed, 153 insertions(+), 58 deletions(-) diff --git a/TODO b/TODO index 9423c78..0a1dc7a 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,7 @@ Simplify send() routine to call diff() when state has changed +Delayed ACK + Figure out packet double-sending issue [if delay > 1/50 sec?] Graceful exit / server shutdown diff --git a/completeterminal.cpp b/completeterminal.cpp index aeac281..52e0c61 100644 --- a/completeterminal.cpp +++ b/completeterminal.cpp @@ -33,7 +33,11 @@ string Complete::act( const Action *act ) /* interface for Network::Transport */ string Complete::diff_from( const Complete &existing ) { - return Terminal::Display::new_frame( true, existing.get_fb(), terminal.get_fb() ); + if ( existing.get_fb() == get_fb() ) { + return ""; + } else { + return Terminal::Display::new_frame( true, existing.get_fb(), terminal.get_fb() ); + } } void Complete::apply_string( string diff ) diff --git a/network.cpp b/network.cpp index cd6f01a..28c4467 100644 --- a/network.cpp +++ b/network.cpp @@ -290,3 +290,11 @@ uint64_t Network::timestamp( void ) return millis; } +uint64_t Connection::timeout( void ) +{ + uint64_t RTO = lrint( ceil( SRTT + 4 * RTTVAR ) ); + if ( RTO < MIN_RTO ) { + RTO = MIN_RTO; + } + return RTO; +} diff --git a/network.hpp b/network.hpp index 0ab7505..3adfaf0 100644 --- a/network.hpp +++ b/network.hpp @@ -55,6 +55,7 @@ namespace Network { class Connection { private: static const int RECEIVE_MTU = 2048; + static const uint64_t MIN_RTO = 50; /* ms */ int sock; struct sockaddr_in remote_addr; @@ -95,7 +96,8 @@ namespace Network { string get_key( void ) { return key.printable_key(); } bool get_attached( void ) { return attached; } - int timeout( void ) { return (int)lrint( ceil( SRTT + 4 * RTTVAR ) ); } + uint64_t timeout( void ); + double get_SRTT( void ) { return SRTT; } bool pending_timestamp( void ) { return ( saved_timestamp != uint64_t(-1) ); } }; } diff --git a/networktransport.cpp b/networktransport.cpp index 7a35c4f..a24945b 100644 --- a/networktransport.cpp +++ b/networktransport.cpp @@ -16,7 +16,9 @@ Transport::Transport( MyState &initial_state, RemoteState received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), fragments(), - verbose( false ) + verbose( false ), + next_ack_time( timestamp() ), + next_send_time( timestamp() ) { /* server */ } @@ -32,7 +34,9 @@ Transport::Transport( MyState &initial_state, RemoteState received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), fragments(), - verbose( false ) + verbose( false ), + next_ack_time( timestamp() ), + next_send_time( timestamp() ) { /* client */ } @@ -40,7 +44,7 @@ Transport::Transport( MyState &initial_state, RemoteState template unsigned int Transport::send_interval( void ) { - unsigned int SEND_INTERVAL = connection.timeout() / 10; + int SEND_INTERVAL = lrint( ceil( (connection.get_SRTT() - ACK_DELAY) / 2.0 ) ); if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) { SEND_INTERVAL = SEND_INTERVAL_MIN; } else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) { @@ -50,56 +54,105 @@ unsigned int Transport::send_interval( void ) return SEND_INTERVAL; } -/* Returns the number of ms to wait until next (possible) event */ template -int Transport::tick( void ) +int Transport::wait_time( void ) { - /* Determine if a new diff or empty ack needs to be sent */ - if ( timestamp() - sent_states.back().timestamp >= send_interval() ) { - /* Update assumed receiver state */ - update_assumed_receiver_state(); - - /* Cut out common prefix of all states */ - rationalize_states(); - - /* Send diffs or ack */ - send_to_receiver(); - - return send_interval(); - } - - int64_t wait = sent_states.back().timestamp + send_interval() - timestamp(); - if ( wait < 0 ) { - wait = 0; + if ( connection.pending_timestamp() && ( next_ack_time > timestamp() + ACK_DELAY ) ) { + next_ack_time = timestamp() + ACK_DELAY; + } + + uint64_t next_wakeup = next_ack_time; + + if ( !(current_state == sent_states.back().state) ) { /* pending data to send */ + if ( next_send_time > timestamp() + SEND_MINDELAY ) { + next_send_time = timestamp() + SEND_MINDELAY; + } + + if ( next_send_time < sent_states.back().timestamp + send_interval() ) { + next_send_time = sent_states.back().timestamp + send_interval(); + } + + if ( next_send_time < next_wakeup ) { + next_wakeup = next_send_time; + } + } + + if ( !connection.get_attached() ) { + return -1; + } + + if ( next_wakeup > timestamp() ) { + return next_wakeup - timestamp(); + } else { + return 0; } - return wait; } +/* Send data or an ack if necessary */ template -void Transport::send_to_receiver( void ) +void Transport::tick( void ) { + wait_time(); + if ( !connection.get_attached() ) { return; } - string diff = current_state.diff_from( assumed_receiver_state->state ); - - if ( diff.empty() ) { - /* send empty ack */ - if ( (!connection.pending_timestamp()) - && (timestamp() - sent_states.back().timestamp < int64_t( ACK_INTERVAL )) - && (sent_states.back().num > 0) ) { - return; - } - - uint64_t new_num = sent_states.back().num + 1; - - send_in_fragments( diff, new_num, false ); - sent_states.push_back( TimestampedState( timestamp(), new_num, current_state ) ); - + if ( (timestamp() < next_ack_time) + && (timestamp() < next_send_time) ) { return; } + /* Determine if a new diff or empty ack needs to be sent */ + /* Update assumed receiver state */ + update_assumed_receiver_state(); + + /* Cut out common prefix of all states */ + rationalize_states(); + + string diff = current_state.diff_from( assumed_receiver_state->state ); + + if ( diff.empty() && (timestamp() >= next_ack_time) ) { + /* + if ( verbose ) + fprintf( stderr, "Sending empty ack (ts=%d, next_send=%d, next_ack=%d)\n", + (int)timestamp() % 100000, + (int)next_send_time % 100000, + (int)next_ack_time % 100000 ); + */ + send_empty_ack(); + return; + } + + if ( !diff.empty() && ( (timestamp() >= next_send_time) + || (timestamp() >= next_ack_time) ) ) { + /* Send diffs or ack */ + /* + if ( verbose ) + fprintf( stderr, "Sending packet (ts=%d, next_send=%d, next_ack=%d)\n", + (int)timestamp() % 100000, + (int)next_send_time % 100000, + (int)next_ack_time % 100000 ); + */ + send_to_receiver( diff ); + } +} + +template +void Transport::send_empty_ack( void ) +{ + assert ( timestamp() >= next_ack_time ); + + uint64_t new_num = sent_states.back().num + 1; + + send_in_fragments( "", new_num, false ); + sent_states.push_back( TimestampedState( sent_states.back().timestamp, new_num, current_state ) ); + next_ack_time = timestamp() + ACK_INTERVAL; +} + +template +void Transport::send_to_receiver( string diff ) +{ uint64_t new_num; if ( current_state == sent_states.back().state ) { /* previously sent */ new_num = sent_states.back().num; @@ -138,6 +191,8 @@ void Transport::send_to_receiver( void ) /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */ assumed_receiver_state = sent_states.end(); assumed_receiver_state--; + next_ack_time = timestamp() + ACK_INTERVAL; + next_send_time = -1; } template @@ -149,16 +204,19 @@ void Transport::update_assumed_receiver_state( void ) transmitted recently enough ago */ assumed_receiver_state = sent_states.begin(); - for ( typename list< TimestampedState >::iterator i = sent_states.begin(); - i != sent_states.end(); - i++ ) { + typename list< TimestampedState >::iterator i = sent_states.begin(); + i++; + + while ( i != sent_states.end() ) { assert( now >= i->timestamp ); - if ( int(now - i->timestamp) < connection.timeout() ) { + if ( int(now - i->timestamp) < connection.timeout() + ACK_DELAY ) { assumed_receiver_state = i; } else { return; } + + i++; } } @@ -229,6 +287,11 @@ void Transport::recv( void ) return; } } + /* + if ( verbose ) + fprintf( stderr, "[%d] Received state %d [ack %d]\n", + (int)timestamp() % 100000, (int)new_state.num, (int)inst.ack_num ); + */ received_states.push_back( new_state ); } } @@ -318,10 +381,11 @@ void Transport::send_in_fragments( string diff, uint64_t n connection.send( s, send_timestamp ); if ( verbose ) { - fprintf( stderr, "Sent [%d=>%d] frag %d, ack=%d, throwaway=%d, len=%d, frame rate=%.2f\n", - (int)inst.old_num, (int)inst.new_num, (int)inst.fragment_num, + fprintf( stderr, "[%d] Sent [%d=>%d] frag %d, ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n", + (int)(timestamp() % 100000), (int)inst.old_num, (int)inst.new_num, (int)inst.fragment_num, (int)inst.ack_num, (int)inst.throwaway_num, (int)inst.diff.size(), - 1000.0 / (double)send_interval() ); + 1000.0 / (double)send_interval(), + (int)connection.timeout() ); } } while ( !diff.empty() ); } diff --git a/networktransport.hpp b/networktransport.hpp index 585306c..f91e778 100644 --- a/networktransport.hpp +++ b/networktransport.hpp @@ -76,16 +76,19 @@ namespace Network { class Transport { private: - static const unsigned int SEND_INTERVAL_MIN = 20; /* ms between frames */ - static const unsigned int SEND_INTERVAL_MAX = 250; /* ms between frames */ + static const int SEND_INTERVAL_MIN = 20; /* ms between frames */ + static const int SEND_INTERVAL_MAX = 250; /* ms between frames */ static const int ACK_INTERVAL = 1000; /* ms between empty acks */ + static const int ACK_DELAY = 10; /* ms before delayed ack */ + static const int SEND_MINDELAY = 20; /* ms to collect all input */ static const int HEADER_LEN = 120; /* helper methods for tick() */ unsigned int send_interval( void ); void update_assumed_receiver_state( void ); void rationalize_states( void ); - void send_to_receiver( void ); + void send_to_receiver( string diff ); + void send_empty_ack( void ); void send_in_fragments( string diff, uint64_t new_num, bool send_timestamp = true ); /* helper methods for recv() */ @@ -112,15 +115,19 @@ namespace Network { FragmentAssembly fragments; bool verbose; + uint64_t next_ack_time; + uint64_t next_send_time; public: Transport( MyState &initial_state, RemoteState &initial_remote ); Transport( MyState &initial_state, RemoteState &initial_remote, const char *key_str, const char *ip, int port ); - /* Send data or an ack if necessary. - Returns the number of ms to wait until next event. */ - int tick( void ); + /* Send data or an ack if necessary. */ + void tick( void ); + + /* Returns the number of ms to wait until next event. */ + int wait_time( void ); /* Blocks waiting for a packet. */ void recv( void ); diff --git a/ntester.cpp b/ntester.cpp index ea41d4c..ac2e6b9 100644 --- a/ntester.cpp +++ b/ntester.cpp @@ -45,11 +45,13 @@ int main( int argc, char *argv[] ) uint64_t last_num = n->get_remote_state_num(); while ( true ) { try { - if ( poll( &my_pollfd, 1, n->tick() ) < 0 ) { + if ( poll( &my_pollfd, 1, n->wait_time() ) < 0 ) { perror( "poll" ); exit( 1 ); } + n->tick(); + if ( my_pollfd.revents & POLLIN ) { n->recv(); @@ -89,10 +91,12 @@ int main( int argc, char *argv[] ) while( true ) { try { - if ( poll( fds, 2, n->tick() ) < 0 ) { + if ( poll( fds, 2, n->wait_time() ) < 0 ) { perror( "poll" ); } + n->tick(); + if ( fds[ 0 ].revents & POLLIN ) { char x; assert( read( STDIN_FILENO, &x, 1 ) == 1 ); diff --git a/stm-server.cpp b/stm-server.cpp index 9d54bc6..696a068 100644 --- a/stm-server.cpp +++ b/stm-server.cpp @@ -136,7 +136,7 @@ void serve( int host_fd ) while ( 1 ) { try { - int active_fds = poll( pollfds, 2, network.tick() ); + int active_fds = poll( pollfds, 2, network.wait_time() ); if ( active_fds < 0 ) { perror( "poll" ); break; @@ -206,6 +206,8 @@ void serve( int host_fd ) & (POLLERR | POLLHUP | POLLNVAL) ) { break; } + + network.tick(); } catch ( Network::NetworkException e ) { fprintf( stderr, "%s: %s\r\n", e.function.c_str(), strerror( e.the_errno ) ); sleep( 1 ); diff --git a/stm.cpp b/stm.cpp index 0d233ec..52a11db 100644 --- a/stm.cpp +++ b/stm.cpp @@ -143,7 +143,7 @@ void client( const char *ip, int port, const char *key ) while ( 1 ) { try { - int active_fds = poll( pollfds, 3, network.tick() ); + int active_fds = poll( pollfds, 3, network.wait_time() ); if ( active_fds < 0 ) { perror( "poll" ); break; @@ -208,6 +208,8 @@ void client( const char *ip, int port, const char *key ) & (POLLERR | POLLHUP | POLLNVAL) ) { break; } + + network.tick(); } catch ( Network::NetworkException e ) { fprintf( stderr, "%s: %s\r\n", e.function.c_str(), strerror( e.the_errno ) ); sleep( 1 );