Successfully synchronizes state. Here testing with 50% bidi packet loss.

This commit is contained in:
Keith Winstein
2011-08-09 22:36:22 -04:00
parent 188c44f5be
commit 66a64f0b22
6 changed files with 193 additions and 37 deletions
+115 -21
View File
@@ -22,20 +22,20 @@ uint64_t Transport<MyState, RemoteState>::timestamp( void )
}
template <class MyState, class RemoteState>
Transport<MyState, RemoteState>::Transport( MyState &initial_state )
Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState &initial_remote )
: connection(),
server( true ),
current_state( initial_state ),
sent_states( 1, TimestampedState<MyState>( timestamp(), 0, initial_state ) ),
assumed_receiver_state( sent_states.begin() ),
timeout( INITIAL_TIMEOUT ),
highest_state_received( 0 )
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) )
{
/* server */
}
template <class MyState, class RemoteState>
Transport<MyState, RemoteState>::Transport( MyState &initial_state,
Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState &initial_remote,
const char *key_str, const char *ip, int port )
: connection( key_str, ip, port ),
server( false ),
@@ -43,7 +43,7 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state,
sent_states( 1, TimestampedState<MyState>( timestamp(), 0, initial_state ) ),
assumed_receiver_state( sent_states.begin() ),
timeout( INITIAL_TIMEOUT ),
highest_state_received( 0 )
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) )
{
/* client */
}
@@ -54,10 +54,6 @@ void Transport<MyState, RemoteState>::tick( void )
/* Update assumed receiver state */
update_assumed_receiver_state();
fprintf( stderr, "Assumed receiver state: %d/%d\r\n",
int(assumed_receiver_state->num),
int(sent_states.back().num) );
/* Cut out common prefix of all states */
rationalize_states();
@@ -88,14 +84,13 @@ void Transport<MyState, RemoteState>::send_to_receiver( void )
/* send empty ack */
Instruction inst( assumed_receiver_state->num,
assumed_receiver_state->num,
highest_state_received,
received_states.back().num,
sent_states.front().num,
"" );
string s = inst.tostring();
connection.send( s );
assumed_receiver_state->timestamp = timestamp();
fprintf( stderr, "Empty ack.\r\n" );
return;
}
@@ -106,7 +101,10 @@ void Transport<MyState, RemoteState>::send_to_receiver( void )
exit( 1 );
}
Instruction inst( assumed_receiver_state->num, -1, highest_state_received,
Instruction inst( assumed_receiver_state->num,
-1,
received_states.back().num,
sent_states.front().num,
current_state.diff_from( assumed_receiver_state->state,
connection.get_MTU() - HEADER_LEN ) );
MyState new_state = assumed_receiver_state->state;
@@ -121,6 +119,17 @@ void Transport<MyState, RemoteState>::send_to_receiver( void )
previously_sent++;
}
/* Reusing state numbers is only for intermediate states */
/* If this is the final diff in a sequence, make sure it does get the highest
state number (even if we've retread to previously-seen ground ) */
/* This will force the client to update to this state */
typename list< TimestampedState<MyState> >::iterator last = sent_states.end();
last--;
if ( (previously_sent != last)
&& (new_state == target_receiver_state) ) {
previously_sent = sent_states.end();
}
if ( previously_sent == sent_states.end() ) { /* not previously sent */
inst.new_num = sent_states.back().num + 1;
sent_states.push_back( TimestampedState<MyState>( timestamp(), inst.new_num, new_state ) );
@@ -136,11 +145,6 @@ void Transport<MyState, RemoteState>::send_to_receiver( void )
string s = inst.tostring();
try {
fprintf( stderr, "Sending: " );
for ( size_t i = 0; i < s.size(); i++ ) {
fprintf( stderr, "%c", s[ i ] );
}
fprintf( stderr, "\r\n" );
connection.send( s );
} catch ( MTUException m ) {
continue;
@@ -177,11 +181,101 @@ void Transport<MyState, RemoteState>::rationalize_states( void )
{
MyState * const known_receiver_state = &sent_states.front().state;
for ( typename list< TimestampedState<MyState> >::iterator i = sent_states.begin();
i != sent_states.end();
current_state.subtract( known_receiver_state );
for ( typename list< TimestampedState<MyState> >::reverse_iterator i = sent_states.rbegin();
i != sent_states.rend();
i++ ) {
i->state.subtract( known_receiver_state );
}
current_state.subtract( known_receiver_state );
}
template <class MyState, class RemoteState>
void Transport<MyState, RemoteState>::recv( void )
{
string s( connection.recv() );
Instruction inst( s );
process_acknowledgment_through( inst.ack_num );
// process_throwaway_until( inst.throwaway.num );
/* first, make sure we don't already have the new state */
for ( typename list< TimestampedState<RemoteState> >::iterator i = received_states.begin();
i != received_states.end();
i++ ) {
if ( inst.new_num == i->num ) {
i->timestamp = timestamp();
return;
}
}
/* now, make sure we do have the old state */
bool found = 0;
typename list< TimestampedState<RemoteState> >::iterator reference_state = received_states.begin();
while ( reference_state != received_states.end() ) {
if ( inst.old_num == reference_state->num ) {
found = true;
break;
}
reference_state++;
}
if ( !found ) {
fprintf( stderr, "Ignoring out-of-order packet. Reference state %d has been discarded.\n", int(inst.old_num) );
return;
}
/* apply diff to reference state */
TimestampedState<RemoteState> new_state = *reference_state;
new_state.timestamp = timestamp();
new_state.num = inst.new_num;
new_state.state.apply_string( inst.diff );
if ( new_state.num > received_states.back().num ) {
process_throwaway_until( inst.throwaway_num );
}
/* Insert new state in sorted place */
for ( typename list< TimestampedState<RemoteState> >::iterator i = received_states.begin();
i != received_states.end();
i++ ) {
if ( i->num > new_state.num ) {
received_states.insert( i, new_state );
return;
}
}
received_states.push_back( new_state );
}
template <class MyState, class RemoteState>
void Transport<MyState, RemoteState>::process_acknowledgment_through( uint64_t ack_num )
{
typename list< TimestampedState<MyState> >::iterator i = sent_states.begin();
while ( i != sent_states.end() ) {
typename list< TimestampedState<MyState> >::iterator inext = i;
inext++;
if ( i->num < ack_num ) {
sent_states.erase( i );
}
i = inext;
}
assert( sent_states.size() > 0 );
assert( sent_states.front().num == ack_num );
}
template <class MyState, class RemoteState>
void Transport<MyState, RemoteState>::process_throwaway_until( uint64_t throwaway_num )
{
typename list< TimestampedState<MyState> >::iterator i = received_states.begin();
while ( i != received_states.end() ) {
typename list< TimestampedState<MyState> >::iterator inext = i;
inext++;
if ( i->num < throwaway_num ) {
sent_states.erase( i );
}
i = inext;
}
assert( received_states.size() > 0 );
}