Move to Google protobuf for Instruction
This commit is contained in:
+43
-23
@@ -13,6 +13,8 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState
|
||||
current_state( initial_state ),
|
||||
sent_states( 1, TimestampedState<MyState>( timestamp(), 0, initial_state ) ),
|
||||
assumed_receiver_state( sent_states.begin() ),
|
||||
next_instruction_id( -1 ),
|
||||
last_instruction_sent(),
|
||||
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ),
|
||||
last_receiver_state( initial_remote ),
|
||||
fragments(),
|
||||
@@ -31,6 +33,8 @@ Transport<MyState, RemoteState>::Transport( MyState &initial_state, RemoteState
|
||||
current_state( initial_state ),
|
||||
sent_states( 1, TimestampedState<MyState>( timestamp(), 0, initial_state ) ),
|
||||
assumed_receiver_state( sent_states.begin() ),
|
||||
next_instruction_id( -1 ),
|
||||
last_instruction_sent(),
|
||||
received_states( 1, TimestampedState<RemoteState>( timestamp(), 0, initial_remote ) ),
|
||||
last_receiver_state( initial_remote ),
|
||||
fragments(),
|
||||
@@ -240,18 +244,18 @@ template <class MyState, class RemoteState>
|
||||
void Transport<MyState, RemoteState>::recv( void )
|
||||
{
|
||||
string s( connection.recv() );
|
||||
Instruction frag( s );
|
||||
Fragment frag( s );
|
||||
|
||||
if ( fragments.add_fragment( frag ) ) { /* complete packet */
|
||||
Instruction inst = fragments.get_assembly();
|
||||
|
||||
process_acknowledgment_through( inst.ack_num );
|
||||
process_acknowledgment_through( inst.ack_num() );
|
||||
|
||||
/* first, make sure we don't already have the new state */
|
||||
for ( typename list< TimestampedState<RemoteState> >::iterator i = received_states.begin();
|
||||
i != received_states.end();
|
||||
i++ ) {
|
||||
if ( inst.new_num == i->num ) {
|
||||
if ( inst.new_num() == i->num ) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -260,7 +264,7 @@ void Transport<MyState, RemoteState>::recv( void )
|
||||
bool found = 0;
|
||||
typename list< TimestampedState<RemoteState> >::iterator reference_state = received_states.begin();
|
||||
while ( reference_state != received_states.end() ) {
|
||||
if ( inst.old_num == reference_state->num ) {
|
||||
if ( inst.old_num() == reference_state->num ) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
@@ -275,10 +279,10 @@ void Transport<MyState, RemoteState>::recv( void )
|
||||
/* apply diff to reference state */
|
||||
TimestampedState<RemoteState> new_state = *reference_state;
|
||||
new_state.timestamp = timestamp();
|
||||
new_state.num = inst.new_num;
|
||||
new_state.state.apply_string( inst.diff );
|
||||
new_state.num = inst.new_num();
|
||||
new_state.state.apply_string( inst.diff() );
|
||||
|
||||
process_throwaway_until( inst.throwaway_num );
|
||||
process_throwaway_until( inst.throwaway_num() );
|
||||
|
||||
/* Insert new state in sorted place */
|
||||
for ( typename list< TimestampedState<RemoteState> >::iterator i = received_states.begin();
|
||||
@@ -355,6 +359,25 @@ string Transport<MyState, RemoteState>::get_remote_diff( void )
|
||||
template <class MyState, class RemoteState>
|
||||
void Transport<MyState, RemoteState>::send_in_fragments( string diff, uint64_t new_num, bool send_timestamp )
|
||||
{
|
||||
Instruction inst;
|
||||
inst.set_old_num( assumed_receiver_state->num );
|
||||
inst.set_new_num( new_num );
|
||||
inst.set_ack_num( received_states.back().num );
|
||||
inst.set_throwaway_num( sent_states.front().num );
|
||||
inst.set_diff( diff );
|
||||
|
||||
if ( (inst.old_num() != last_instruction_sent.old_num())
|
||||
|| (inst.new_num() != last_instruction_sent.new_num())
|
||||
|| (inst.ack_num() != last_instruction_sent.ack_num())
|
||||
|| (inst.throwaway_num() != last_instruction_sent.throwaway_num()) ) {
|
||||
next_instruction_id++; /* make sure this happens before the first send in case of exception */
|
||||
last_instruction_sent = inst;
|
||||
} else {
|
||||
assert( inst.diff() == last_instruction_sent.diff() );
|
||||
}
|
||||
|
||||
string payload = inst.SerializeAsString();
|
||||
|
||||
uint16_t fragment_num = 0;
|
||||
|
||||
do {
|
||||
@@ -364,31 +387,28 @@ void Transport<MyState, RemoteState>::send_in_fragments( string diff, uint64_t n
|
||||
|
||||
bool final = false;
|
||||
|
||||
if ( int( diff.size() + HEADER_LEN ) > connection.get_MTU() ) {
|
||||
this_fragment = string( diff.begin(), diff.begin() + connection.get_MTU() - HEADER_LEN );
|
||||
diff = string( diff.begin() + connection.get_MTU() - HEADER_LEN, diff.end() );
|
||||
int MTU = connection.get_MTU();
|
||||
|
||||
if ( int( payload.size() + HEADER_LEN ) > MTU ) {
|
||||
this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN );
|
||||
payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() );
|
||||
} else {
|
||||
this_fragment = diff;
|
||||
diff.clear();
|
||||
this_fragment = payload;
|
||||
payload.clear();
|
||||
final = true;
|
||||
}
|
||||
|
||||
Instruction inst( assumed_receiver_state->num,
|
||||
new_num,
|
||||
received_states.back().num,
|
||||
sent_states.front().num,
|
||||
fragment_num++, final,
|
||||
this_fragment );
|
||||
string s = inst.tostring();
|
||||
Fragment frag( next_instruction_id, fragment_num++, final, this_fragment );
|
||||
string s = frag.tostring();
|
||||
|
||||
connection.send( s, send_timestamp );
|
||||
|
||||
if ( verbose ) {
|
||||
fprintf( stderr, "[%d] Sent [%d=>%d] frag %d, ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n",
|
||||
(int)(timestamp() % 100000), (int)inst.old_num, (int)inst.new_num, (int)inst.fragment_num,
|
||||
(int)inst.ack_num, (int)inst.throwaway_num, (int)inst.diff.size(),
|
||||
fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n",
|
||||
(int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)frag.id, (int)frag.fragment_num,
|
||||
(int)inst.ack_num(), (int)inst.throwaway_num(), (int)frag.contents.size(),
|
||||
1000.0 / (double)send_interval(),
|
||||
(int)connection.timeout() );
|
||||
}
|
||||
} while ( !diff.empty() );
|
||||
} while ( !payload.empty() );
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user