diff --git a/network.cpp b/network.cpp index 28d2721..45ac224 100644 --- a/network.cpp +++ b/network.cpp @@ -169,7 +169,7 @@ void Connection::update_MTU( void ) } } -void Connection::send( string &s, bool send_timestamp ) +void Connection::send( string s, bool send_timestamp ) { assert( attached ); diff --git a/network.hpp b/network.hpp index 666eae0..5ca7d57 100644 --- a/network.hpp +++ b/network.hpp @@ -89,7 +89,7 @@ namespace Network { Connection(); Connection( const char *key_str, const char *ip, int port ); - void send( string &s, bool send_timestamp = true ); + void send( string s, bool send_timestamp = true ); string recv( void ); int fd( void ) { return sock; } int get_MTU( void ) { return MTU; } diff --git a/transportfragment.cpp b/transportfragment.cpp index af82e92..1415de7 100644 --- a/transportfragment.cpp +++ b/transportfragment.cpp @@ -13,6 +13,12 @@ static string network_order_string( uint16_t host_order ) return string( (char *)&net_int, sizeof( net_int ) ); } +static string network_order_string( uint64_t host_order ) +{ + uint64_t net_int = htobe64( host_order ); + return string( (char *)&net_int, sizeof( net_int ) ); +} + string Fragment::tostring( void ) { assert( initialized ); @@ -38,9 +44,10 @@ Fragment::Fragment( string &x ) { assert( x.size() >= frag_header_len ); + uint64_t *data64 = (uint64_t *)x.data(); uint16_t *data16 = (uint16_t *)x.data(); - id = be16toh( data16[ 0 ] ); - fragment_num = be16toh( data16[ 1 ] ); + id = be64toh( data64[ 0 ] ); + fragment_num = be16toh( data16[ 4 ] ); final = ( fragment_num & 0x8000 ) >> 15; fragment_num &= 0x7FFF; } @@ -110,3 +117,44 @@ bool Fragment::operator==( const Fragment &x ) return ( id == x.id ) && ( fragment_num == x.fragment_num ) && ( final == x.final ) && ( initialized == x.initialized ) && ( contents == x.contents ); } + +vector Fragmenter::make_fragments( Instruction &inst, int MTU ) +{ + if ( (inst.old_num() != last_instruction.old_num()) + || (inst.new_num() != last_instruction.new_num()) + || (inst.ack_num() != last_instruction.ack_num()) + || (inst.throwaway_num() != last_instruction.throwaway_num()) + || (last_MTU != MTU) ) { + next_instruction_id++; + } + + if ( (inst.old_num() == last_instruction.old_num()) + && (inst.new_num() == last_instruction.new_num()) ) { + assert( inst.diff() == last_instruction.diff() ); + } + + last_instruction = inst; + last_MTU = MTU; + + string payload = inst.SerializeAsString(); + uint16_t fragment_num = 0; + vector ret; + + while ( !payload.empty() ) { + string this_fragment; + bool final = false; + + if ( int( payload.size() + HEADER_LEN ) > MTU ) { + this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN ); + payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() ); + } else { + this_fragment = payload; + payload.clear(); + final = true; + } + + ret.push_back( Fragment( next_instruction_id, fragment_num++, final, this_fragment ) ); + } + + return ret; +} diff --git a/transportfragment.hpp b/transportfragment.hpp index 44a5fad..8304360 100644 --- a/transportfragment.hpp +++ b/transportfragment.hpp @@ -11,13 +11,15 @@ using namespace std; using namespace TransportBuffers; namespace Network { + static const int HEADER_LEN = 66; + class Fragment { private: - static const size_t frag_header_len = 2 * sizeof( uint16_t ); + static const size_t frag_header_len = sizeof( uint64_t ) + sizeof( uint16_t ); public: - uint16_t id; + uint64_t id; uint16_t fragment_num; bool final; @@ -29,7 +31,7 @@ namespace Network { : id( -1 ), fragment_num( -1 ), final( false ), initialized( false ), contents() {} - Fragment( uint16_t s_id, uint16_t s_fragment_num, bool s_final, string s_contents ) + Fragment( uint64_t s_id, uint16_t s_fragment_num, bool s_final, string s_contents ) : id( s_id ), fragment_num( s_fragment_num ), final( s_final ), initialized( true ), contents( s_contents ) {} @@ -45,7 +47,7 @@ namespace Network { { private: vector fragments; - uint16_t current_id; + uint64_t current_id; int fragments_arrived, fragments_total; public: @@ -53,6 +55,20 @@ namespace Network { bool add_fragment( Fragment &inst ); Instruction get_assembly( void ); }; + + class Fragmenter + { + private: + uint64_t next_instruction_id; + Instruction last_instruction; + int last_MTU; + + public: + Fragmenter() : next_instruction_id( 0 ), last_instruction(), last_MTU( -1 ) {} + vector make_fragments( Instruction &inst, int MTU ); + uint64_t last_ack_sent( void ) { return last_instruction.ack_num(); } + }; + } #endif diff --git a/transportsender.cpp b/transportsender.cpp index 432673b..1579ff8 100644 --- a/transportsender.cpp +++ b/transportsender.cpp @@ -9,8 +9,7 @@ TransportSender::TransportSender( Connection *s_connection, MyState &in current_state( initial_state ), sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ), assumed_receiver_state( sent_states.begin() ), - next_instruction_id( -1 ), - last_instruction_sent(), + fragmenter(), next_ack_time( timestamp() ), next_send_time( timestamp() ), verbose( false ), @@ -225,51 +224,20 @@ void TransportSender::send_in_fragments( string diff, uint64_t new_num, inst.set_throwaway_num( sent_states.front().num ); inst.set_diff( diff ); - if ( (inst.old_num() != last_instruction_sent.old_num()) - || (inst.new_num() != last_instruction_sent.new_num()) - || (inst.ack_num() != last_instruction_sent.ack_num()) - || (inst.throwaway_num() != last_instruction_sent.throwaway_num()) ) { - next_instruction_id++; /* make sure this happens before the first send in case of exception */ - last_instruction_sent = inst; - } else { - assert( inst.diff() == last_instruction_sent.diff() ); - } + vector fragments = fragmenter.make_fragments( inst, connection->get_MTU() ); - string payload = inst.SerializeAsString(); - - uint16_t fragment_num = 0; - - do { - string this_fragment; - - assert( fragment_num <= 32767 ); - - bool final = false; - - int MTU = connection->get_MTU(); - - if ( int( payload.size() + HEADER_LEN ) > MTU ) { - this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN ); - payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() ); - } else { - this_fragment = payload; - payload.clear(); - final = true; - } - - Fragment frag( next_instruction_id, fragment_num++, final, this_fragment ); - string s = frag.tostring(); - - connection->send( s, send_timestamp ); + for ( auto i = fragments.begin(); i != fragments.end(); i++ ) { + connection->send( i->tostring(), send_timestamp ); if ( verbose ) { fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n", - (int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)frag.id, (int)frag.fragment_num, - (int)inst.ack_num(), (int)inst.throwaway_num(), (int)frag.contents.size(), + (int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num, + (int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(), 1000.0 / (double)send_interval(), (int)connection->timeout() ); } - } while ( !payload.empty() ); + + } } template diff --git a/transportsender.hpp b/transportsender.hpp index 5306806..e4c3b9f 100644 --- a/transportsender.hpp +++ b/transportsender.hpp @@ -8,6 +8,7 @@ #include "network.hpp" #include "transportinstruction.pb.h" #include "transportstate.hpp" +#include "transportfragment.hpp" using namespace std; using namespace TransportBuffers; @@ -23,7 +24,6 @@ namespace Network { static const int ACK_INTERVAL = 1000; /* ms between empty acks */ static const int ACK_DELAY = 10; /* ms before delayed ack */ static const int SEND_MINDELAY = 20; /* ms to collect all input */ - static const int HEADER_LEN = 60; /* helper methods for tick() */ unsigned int send_interval( void ); @@ -46,8 +46,7 @@ namespace Network { typename list< TimestampedState >::iterator assumed_receiver_state; /* for fragment creation */ - uint16_t next_instruction_id; - Instruction last_instruction_sent; + Fragmenter fragmenter; /* timing state */ uint64_t next_ack_time; @@ -86,7 +85,7 @@ namespace Network { bool get_shutdown_in_progress( void ) { return shutdown_in_progress; } bool get_shutdown_acknowledged( void ) { return sent_states.front().num == uint64_t(-1); } - bool get_counterparty_shutdown_acknowledged( void ) { return last_instruction_sent.ack_num() == uint64_t(-1); } + bool get_counterparty_shutdown_acknowledged( void ) { return fragmenter.last_ack_sent() == uint64_t(-1); } /* nonexistent methods to satisfy -Weffc++ */ TransportSender( const TransportSender &x );