Cleanup fragment reassembly
This commit is contained in:
+21
-24
@@ -25,7 +25,12 @@ string Instruction::tostring( void )
|
|||||||
ret += network_order_string( new_num );
|
ret += network_order_string( new_num );
|
||||||
ret += network_order_string( ack_num );
|
ret += network_order_string( ack_num );
|
||||||
ret += network_order_string( throwaway_num );
|
ret += network_order_string( throwaway_num );
|
||||||
ret += network_order_string( fragment_num );
|
|
||||||
|
assert( !( fragment_num & 0x8000 ) );
|
||||||
|
|
||||||
|
uint16_t combined_fragment_num = ( final << 15 ) | fragment_num;
|
||||||
|
|
||||||
|
ret += network_order_string( combined_fragment_num );
|
||||||
|
|
||||||
assert( ret.size() == inst_header_len );
|
assert( ret.size() == inst_header_len );
|
||||||
|
|
||||||
@@ -35,7 +40,7 @@ string Instruction::tostring( void )
|
|||||||
}
|
}
|
||||||
|
|
||||||
Instruction::Instruction( string &x )
|
Instruction::Instruction( string &x )
|
||||||
: old_num( -1 ), new_num( -1 ), ack_num( -1 ), throwaway_num( -1 ), fragment_num( -1 ), diff()
|
: old_num( -1 ), new_num( -1 ), ack_num( -1 ), throwaway_num( -1 ), fragment_num( -1 ), final( false ), diff()
|
||||||
{
|
{
|
||||||
assert( x.size() >= inst_header_len );
|
assert( x.size() >= inst_header_len );
|
||||||
uint64_t *data = (uint64_t *)x.data();
|
uint64_t *data = (uint64_t *)x.data();
|
||||||
@@ -45,6 +50,8 @@ Instruction::Instruction( string &x )
|
|||||||
ack_num = be64toh( data[ 2 ] );
|
ack_num = be64toh( data[ 2 ] );
|
||||||
throwaway_num = be64toh( data[ 3 ] );
|
throwaway_num = be64toh( data[ 3 ] );
|
||||||
fragment_num = be16toh( data16[ 16 ] );
|
fragment_num = be16toh( data16[ 16 ] );
|
||||||
|
final = ( fragment_num & 0x8000 ) >> 15;
|
||||||
|
fragment_num &= 0x7FFF;
|
||||||
|
|
||||||
diff = string( x.begin() + inst_header_len, x.end() );
|
diff = string( x.begin() + inst_header_len, x.end() );
|
||||||
}
|
}
|
||||||
@@ -57,45 +64,35 @@ bool FragmentAssembly::same_template( Instruction &a, Instruction &b )
|
|||||||
|
|
||||||
bool FragmentAssembly::add_fragment( Instruction &inst )
|
bool FragmentAssembly::add_fragment( Instruction &inst )
|
||||||
{
|
{
|
||||||
/* decode fragment num */
|
|
||||||
bool last_fragment = inst.fragment_num > 32767;
|
|
||||||
uint16_t real_fragment_num = inst.fragment_num;
|
|
||||||
if ( last_fragment ) {
|
|
||||||
real_fragment_num -= 32768;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* see if this is a totally new packet */
|
/* see if this is a totally new packet */
|
||||||
if ( !same_template( inst, current_template ) ) {
|
if ( !same_template( inst, current_template ) ) {
|
||||||
fragments.clear();
|
fragments.clear();
|
||||||
current_template = inst;
|
current_template = inst;
|
||||||
fragments.resize( real_fragment_num + 1 );
|
fragments.resize( inst.fragment_num + 1 );
|
||||||
fragments.at( real_fragment_num ) = inst;
|
fragments.at( inst.fragment_num ) = inst;
|
||||||
fragments_arrived = 1;
|
fragments_arrived = 1;
|
||||||
fragments_total = -1;
|
fragments_total = -1;
|
||||||
} else { /* not a new packet */
|
} else { /* not a new packet */
|
||||||
/* see if we already have this fragment */
|
/* see if we already have this fragment */
|
||||||
if ( (fragments.size() > real_fragment_num)
|
if ( (fragments.size() > inst.fragment_num)
|
||||||
&& (fragments.at( real_fragment_num ).old_num != uint64_t(-1)) ) {
|
&& (fragments.at( inst.fragment_num ).old_num != uint64_t(-1)) ) {
|
||||||
assert( fragments.at( real_fragment_num ) == inst );
|
assert( fragments.at( inst.fragment_num ) == inst );
|
||||||
} else {
|
} else {
|
||||||
if ( (int)fragments.size() < real_fragment_num + 1 ) {
|
if ( (int)fragments.size() < inst.fragment_num + 1 ) {
|
||||||
fragments.resize( real_fragment_num + 1 );
|
fragments.resize( inst.fragment_num + 1 );
|
||||||
}
|
}
|
||||||
fragments.at( real_fragment_num ) = inst;
|
fragments.at( inst.fragment_num ) = inst;
|
||||||
fragments_arrived++;
|
fragments_arrived++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( last_fragment ) {
|
if ( inst.final ) {
|
||||||
fragments_total = real_fragment_num + 1;
|
fragments_total = inst.fragment_num + 1;
|
||||||
fragments.resize( fragments_total );
|
fragments.resize( fragments_total );
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( fragments_arrived == fragments_total ) {
|
if ( fragments_total != -1 ) {
|
||||||
assert( (int)fragments.size() == fragments_total );
|
assert( fragments_arrived <= fragments_total );
|
||||||
for ( unsigned int i = 0; i < fragments.size(); i++ ) {
|
|
||||||
assert( fragments.at( i ).old_num != uint64_t(-1) );
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* see if we're done */
|
/* see if we're done */
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ void Transport<MyState, RemoteState>::send_to_receiver( void )
|
|||||||
assumed_receiver_state->num,
|
assumed_receiver_state->num,
|
||||||
received_states.back().num,
|
received_states.back().num,
|
||||||
sent_states.front().num,
|
sent_states.front().num,
|
||||||
32768,
|
0, true,
|
||||||
"" );
|
"" );
|
||||||
string s = inst.tostring();
|
string s = inst.tostring();
|
||||||
connection.send( s, false );
|
connection.send( s, false );
|
||||||
@@ -287,26 +287,25 @@ void Transport<MyState, RemoteState>::send_in_fragments( string diff, uint64_t n
|
|||||||
|
|
||||||
assert( fragment_num <= 32767 );
|
assert( fragment_num <= 32767 );
|
||||||
|
|
||||||
|
bool final = false;
|
||||||
|
|
||||||
if ( int( diff.size() + HEADER_LEN ) > connection.get_MTU() ) {
|
if ( int( diff.size() + HEADER_LEN ) > connection.get_MTU() ) {
|
||||||
this_fragment = string( diff.begin(), diff.begin() + connection.get_MTU() - HEADER_LEN );
|
this_fragment = string( diff.begin(), diff.begin() + connection.get_MTU() - HEADER_LEN );
|
||||||
diff = string( diff.begin() + connection.get_MTU() - HEADER_LEN, diff.end() );
|
diff = string( diff.begin() + connection.get_MTU() - HEADER_LEN, diff.end() );
|
||||||
} else {
|
} else {
|
||||||
this_fragment = diff;
|
this_fragment = diff;
|
||||||
diff.clear();
|
diff.clear();
|
||||||
fragment_num += 32768; /* last fragment */
|
final = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
Instruction inst( assumed_receiver_state->num,
|
Instruction inst( assumed_receiver_state->num,
|
||||||
new_num,
|
new_num,
|
||||||
received_states.back().num,
|
received_states.back().num,
|
||||||
sent_states.front().num,
|
sent_states.front().num,
|
||||||
fragment_num++,
|
fragment_num++, final,
|
||||||
this_fragment );
|
this_fragment );
|
||||||
string s = inst.tostring();
|
string s = inst.tostring();
|
||||||
|
|
||||||
fprintf( stderr, "Sending [%d=>%d frag %d], len=%u\r\n",
|
|
||||||
(int)inst.old_num, (int)inst.new_num, inst.fragment_num, (unsigned int)inst.diff.size() );
|
|
||||||
|
|
||||||
connection.send( s );
|
connection.send( s );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,18 +22,19 @@ namespace Network {
|
|||||||
uint64_t ack_num;
|
uint64_t ack_num;
|
||||||
uint64_t throwaway_num;
|
uint64_t throwaway_num;
|
||||||
uint16_t fragment_num;
|
uint16_t fragment_num;
|
||||||
|
bool final;
|
||||||
|
|
||||||
string diff;
|
string diff;
|
||||||
|
|
||||||
Instruction() : old_num( -1 ), new_num( -1 ), ack_num( -1 ), throwaway_num( -1 ), fragment_num( -1 ),
|
Instruction() : old_num( -1 ), new_num( -1 ), ack_num( -1 ), throwaway_num( -1 ), fragment_num( -1 ), final( false ),
|
||||||
diff( "" )
|
diff( "" )
|
||||||
{}
|
{}
|
||||||
|
|
||||||
Instruction( uint64_t s_old_num, uint64_t s_new_num,
|
Instruction( uint64_t s_old_num, uint64_t s_new_num,
|
||||||
uint64_t s_ack_num, uint64_t s_throwaway_num, uint16_t s_fragment_num,
|
uint64_t s_ack_num, uint64_t s_throwaway_num, uint16_t s_fragment_num, bool s_final,
|
||||||
string s_diff )
|
string s_diff )
|
||||||
: old_num( s_old_num ), new_num( s_new_num ),
|
: old_num( s_old_num ), new_num( s_new_num ),
|
||||||
ack_num( s_ack_num ), throwaway_num( s_throwaway_num ), fragment_num( s_fragment_num ),
|
ack_num( s_ack_num ), throwaway_num( s_throwaway_num ), fragment_num( s_fragment_num ), final( s_final ),
|
||||||
diff( s_diff )
|
diff( s_diff )
|
||||||
{}
|
{}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user