Switch to 64-bit fragment ID, fragmenting in separate function

This commit is contained in:
Keith Winstein
2011-09-13 00:55:18 -04:00
parent 62daa19f28
commit a24443638b
6 changed files with 83 additions and 52 deletions
+1 -1
View File
@@ -169,7 +169,7 @@ void Connection::update_MTU( void )
}
}
void Connection::send( string &s, bool send_timestamp )
void Connection::send( string s, bool send_timestamp )
{
assert( attached );
+1 -1
View File
@@ -89,7 +89,7 @@ namespace Network {
Connection();
Connection( const char *key_str, const char *ip, int port );
void send( string &s, bool send_timestamp = true );
void send( string s, bool send_timestamp = true );
string recv( void );
int fd( void ) { return sock; }
int get_MTU( void ) { return MTU; }
+50 -2
View File
@@ -13,6 +13,12 @@ static string network_order_string( uint16_t host_order )
return string( (char *)&net_int, sizeof( net_int ) );
}
static string network_order_string( uint64_t host_order )
{
uint64_t net_int = htobe64( host_order );
return string( (char *)&net_int, sizeof( net_int ) );
}
string Fragment::tostring( void )
{
assert( initialized );
@@ -38,9 +44,10 @@ Fragment::Fragment( string &x )
{
assert( x.size() >= frag_header_len );
uint64_t *data64 = (uint64_t *)x.data();
uint16_t *data16 = (uint16_t *)x.data();
id = be16toh( data16[ 0 ] );
fragment_num = be16toh( data16[ 1 ] );
id = be64toh( data64[ 0 ] );
fragment_num = be16toh( data16[ 4 ] );
final = ( fragment_num & 0x8000 ) >> 15;
fragment_num &= 0x7FFF;
}
@@ -110,3 +117,44 @@ bool Fragment::operator==( const Fragment &x )
return ( id == x.id ) && ( fragment_num == x.fragment_num ) && ( final == x.final )
&& ( initialized == x.initialized ) && ( contents == x.contents );
}
vector<Fragment> Fragmenter::make_fragments( Instruction &inst, int MTU )
{
if ( (inst.old_num() != last_instruction.old_num())
|| (inst.new_num() != last_instruction.new_num())
|| (inst.ack_num() != last_instruction.ack_num())
|| (inst.throwaway_num() != last_instruction.throwaway_num())
|| (last_MTU != MTU) ) {
next_instruction_id++;
}
if ( (inst.old_num() == last_instruction.old_num())
&& (inst.new_num() == last_instruction.new_num()) ) {
assert( inst.diff() == last_instruction.diff() );
}
last_instruction = inst;
last_MTU = MTU;
string payload = inst.SerializeAsString();
uint16_t fragment_num = 0;
vector<Fragment> ret;
while ( !payload.empty() ) {
string this_fragment;
bool final = false;
if ( int( payload.size() + HEADER_LEN ) > MTU ) {
this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN );
payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() );
} else {
this_fragment = payload;
payload.clear();
final = true;
}
ret.push_back( Fragment( next_instruction_id, fragment_num++, final, this_fragment ) );
}
return ret;
}
+20 -4
View File
@@ -11,13 +11,15 @@ using namespace std;
using namespace TransportBuffers;
namespace Network {
static const int HEADER_LEN = 66;
class Fragment
{
private:
static const size_t frag_header_len = 2 * sizeof( uint16_t );
static const size_t frag_header_len = sizeof( uint64_t ) + sizeof( uint16_t );
public:
uint16_t id;
uint64_t id;
uint16_t fragment_num;
bool final;
@@ -29,7 +31,7 @@ namespace Network {
: id( -1 ), fragment_num( -1 ), final( false ), initialized( false ), contents()
{}
Fragment( uint16_t s_id, uint16_t s_fragment_num, bool s_final, string s_contents )
Fragment( uint64_t s_id, uint16_t s_fragment_num, bool s_final, string s_contents )
: id( s_id ), fragment_num( s_fragment_num ), final( s_final ), initialized( true ),
contents( s_contents )
{}
@@ -45,7 +47,7 @@ namespace Network {
{
private:
vector<Fragment> fragments;
uint16_t current_id;
uint64_t current_id;
int fragments_arrived, fragments_total;
public:
@@ -53,6 +55,20 @@ namespace Network {
bool add_fragment( Fragment &inst );
Instruction get_assembly( void );
};
class Fragmenter
{
private:
uint64_t next_instruction_id;
Instruction last_instruction;
int last_MTU;
public:
Fragmenter() : next_instruction_id( 0 ), last_instruction(), last_MTU( -1 ) {}
vector<Fragment> make_fragments( Instruction &inst, int MTU );
uint64_t last_ack_sent( void ) { return last_instruction.ack_num(); }
};
}
#endif
+8 -40
View File
@@ -9,8 +9,7 @@ TransportSender<MyState>::TransportSender( Connection *s_connection, MyState &in
current_state( initial_state ),
sent_states( 1, TimestampedState<MyState>( timestamp(), 0, initial_state ) ),
assumed_receiver_state( sent_states.begin() ),
next_instruction_id( -1 ),
last_instruction_sent(),
fragmenter(),
next_ack_time( timestamp() ),
next_send_time( timestamp() ),
verbose( false ),
@@ -225,51 +224,20 @@ void TransportSender<MyState>::send_in_fragments( string diff, uint64_t new_num,
inst.set_throwaway_num( sent_states.front().num );
inst.set_diff( diff );
if ( (inst.old_num() != last_instruction_sent.old_num())
|| (inst.new_num() != last_instruction_sent.new_num())
|| (inst.ack_num() != last_instruction_sent.ack_num())
|| (inst.throwaway_num() != last_instruction_sent.throwaway_num()) ) {
next_instruction_id++; /* make sure this happens before the first send in case of exception */
last_instruction_sent = inst;
} else {
assert( inst.diff() == last_instruction_sent.diff() );
}
vector<Fragment> fragments = fragmenter.make_fragments( inst, connection->get_MTU() );
string payload = inst.SerializeAsString();
uint16_t fragment_num = 0;
do {
string this_fragment;
assert( fragment_num <= 32767 );
bool final = false;
int MTU = connection->get_MTU();
if ( int( payload.size() + HEADER_LEN ) > MTU ) {
this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN );
payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() );
} else {
this_fragment = payload;
payload.clear();
final = true;
}
Fragment frag( next_instruction_id, fragment_num++, final, this_fragment );
string s = frag.tostring();
connection->send( s, send_timestamp );
for ( auto i = fragments.begin(); i != fragments.end(); i++ ) {
connection->send( i->tostring(), send_timestamp );
if ( verbose ) {
fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n",
(int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)frag.id, (int)frag.fragment_num,
(int)inst.ack_num(), (int)inst.throwaway_num(), (int)frag.contents.size(),
(int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num,
(int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(),
1000.0 / (double)send_interval(),
(int)connection->timeout() );
}
} while ( !payload.empty() );
}
}
template <class MyState>
+3 -4
View File
@@ -8,6 +8,7 @@
#include "network.hpp"
#include "transportinstruction.pb.h"
#include "transportstate.hpp"
#include "transportfragment.hpp"
using namespace std;
using namespace TransportBuffers;
@@ -23,7 +24,6 @@ namespace Network {
static const int ACK_INTERVAL = 1000; /* ms between empty acks */
static const int ACK_DELAY = 10; /* ms before delayed ack */
static const int SEND_MINDELAY = 20; /* ms to collect all input */
static const int HEADER_LEN = 60;
/* helper methods for tick() */
unsigned int send_interval( void );
@@ -46,8 +46,7 @@ namespace Network {
typename list< TimestampedState<MyState> >::iterator assumed_receiver_state;
/* for fragment creation */
uint16_t next_instruction_id;
Instruction last_instruction_sent;
Fragmenter fragmenter;
/* timing state */
uint64_t next_ack_time;
@@ -86,7 +85,7 @@ namespace Network {
bool get_shutdown_in_progress( void ) { return shutdown_in_progress; }
bool get_shutdown_acknowledged( void ) { return sent_states.front().num == uint64_t(-1); }
bool get_counterparty_shutdown_acknowledged( void ) { return last_instruction_sent.ack_num() == uint64_t(-1); }
bool get_counterparty_shutdown_acknowledged( void ) { return fragmenter.last_ack_sent() == uint64_t(-1); }
/* nonexistent methods to satisfy -Weffc++ */
TransportSender( const TransportSender &x );