diff --git a/Makefile b/Makefile index b81105d..3a4b829 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ proto = userinput.proto transportinstruction.proto -source = parse.cpp parserstate.cpp parser.cpp templates.cpp terminal.cpp termemu.cpp parseraction.cpp terminalfunctions.cpp swrite.cpp terminalframebuffer.cpp terminaldispatcher.cpp terminaluserinput.cpp terminaldisplay.cpp network.cpp ntester.cpp ocb.cpp base64.cpp encrypt.cpp decrypt.cpp crypto.cpp networktransport.cpp networkfragment.cpp user.cpp userinput.pb.cc completeterminal.cpp stm-server.cpp stm.cpp transportinstruction.pb.cc -objects = parserstate.o parser.o templates.o terminal.o parseraction.o terminalfunctions.o swrite.o terminalframebuffer.o terminaldispatcher.o terminaluserinput.o terminaldisplay.o network.o ocb.o base64.o crypto.o networktransport.o networkfragment.o user.o userinput.pb.o completeterminal.o transportinstruction.pb.o +source = parse.cpp parserstate.cpp parser.cpp templates.cpp terminal.cpp termemu.cpp parseraction.cpp terminalfunctions.cpp swrite.cpp terminalframebuffer.cpp terminaldispatcher.cpp terminaluserinput.cpp terminaldisplay.cpp network.cpp ntester.cpp ocb.cpp base64.cpp encrypt.cpp decrypt.cpp crypto.cpp networktransport.cpp transportfragment.cpp user.cpp userinput.pb.cc completeterminal.cpp stm-server.cpp stm.cpp transportinstruction.pb.cc transportsender.cpp +objects = parserstate.o parser.o templates.o terminal.o parseraction.o terminalfunctions.o swrite.o terminalframebuffer.o terminaldispatcher.o terminaluserinput.o terminaldisplay.o network.o ocb.o base64.o crypto.o networktransport.o transportfragment.o user.o userinput.pb.o completeterminal.o transportinstruction.pb.o transportsender.o repos = templates.rpo executables = parse termemu ntester encrypt decrypt stm-server stm diff --git a/networktransport.cpp b/networktransport.cpp index f5e5c03..72d7407 100644 --- a/networktransport.cpp +++ b/networktransport.cpp @@ -9,18 +9,10 @@ using namespace std; template Transport::Transport( MyState &initial_state, RemoteState &initial_remote ) : connection(), - server( true ), - current_state( initial_state ), - sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ), - assumed_receiver_state( sent_states.begin() ), - next_instruction_id( -1 ), - last_instruction_sent(), + sender( &connection, initial_state ), received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), - fragments(), - verbose( false ), - next_ack_time( timestamp() ), - next_send_time( timestamp() ) + fragments() { /* server */ } @@ -29,217 +21,14 @@ template Transport::Transport( MyState &initial_state, RemoteState &initial_remote, const char *key_str, const char *ip, int port ) : connection( key_str, ip, port ), - server( false ), - current_state( initial_state ), - sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ), - assumed_receiver_state( sent_states.begin() ), - next_instruction_id( -1 ), - last_instruction_sent(), + sender( &connection, initial_state ), received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), - fragments(), - verbose( false ), - next_ack_time( timestamp() ), - next_send_time( timestamp() ) + fragments() { /* client */ } -/* Try to send roughly two frames per RTT, bounded by limits on frame rate */ -template -unsigned int Transport::send_interval( void ) -{ - int SEND_INTERVAL = lrint( ceil( (connection.get_SRTT() - ACK_DELAY) / 2.0 ) ); - if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) { - SEND_INTERVAL = SEND_INTERVAL_MIN; - } else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) { - SEND_INTERVAL = SEND_INTERVAL_MAX; - } - - return SEND_INTERVAL; -} - -/* How many ms can the caller wait before we will have an event (empty ack or next frame)? */ -template -int Transport::wait_time( void ) -{ - if ( connection.pending_timestamp() && ( next_ack_time > timestamp() + ACK_DELAY ) ) { - next_ack_time = timestamp() + ACK_DELAY; - } - - uint64_t next_wakeup = next_ack_time; - - if ( !(current_state == sent_states.back().state) ) { /* pending data to send */ - if ( next_send_time > timestamp() + SEND_MINDELAY ) { - next_send_time = timestamp() + SEND_MINDELAY; - } - - if ( next_send_time < sent_states.back().timestamp + send_interval() ) { - next_send_time = sent_states.back().timestamp + send_interval(); - } - - if ( next_send_time < next_wakeup ) { - next_wakeup = next_send_time; - } - } - - if ( !connection.get_attached() ) { - return -1; - } - - if ( next_wakeup > timestamp() ) { - return next_wakeup - timestamp(); - } else { - return 0; - } -} - -/* Send data or an empty ack if necessary */ -template -void Transport::tick( void ) -{ - wait_time(); - - if ( !connection.get_attached() ) { - return; - } - - if ( (timestamp() < next_ack_time) - && (timestamp() < next_send_time) ) { - return; - } - - /* Determine if a new diff or empty ack needs to be sent */ - /* Update assumed receiver state */ - update_assumed_receiver_state(); - - /* Cut out common prefix of all states */ - rationalize_states(); - - string diff = current_state.diff_from( assumed_receiver_state->state ); - - if ( diff.empty() && (timestamp() >= next_ack_time) ) { - /* - if ( verbose ) - fprintf( stderr, "Sending empty ack (ts=%d, next_send=%d, next_ack=%d)\n", - (int)timestamp() % 100000, - (int)next_send_time % 100000, - (int)next_ack_time % 100000 ); - */ - send_empty_ack(); - return; - } - - if ( !diff.empty() && ( (timestamp() >= next_send_time) - || (timestamp() >= next_ack_time) ) ) { - /* Send diffs or ack */ - /* - if ( verbose ) - fprintf( stderr, "Sending packet (ts=%d, next_send=%d, next_ack=%d)\n", - (int)timestamp() % 100000, - (int)next_send_time % 100000, - (int)next_ack_time % 100000 ); - */ - send_to_receiver( diff ); - } -} - -template -void Transport::send_empty_ack( void ) -{ - assert ( timestamp() >= next_ack_time ); - - uint64_t new_num = sent_states.back().num + 1; - - send_in_fragments( "", new_num, false ); - sent_states.push_back( TimestampedState( sent_states.back().timestamp, new_num, current_state ) ); - next_ack_time = timestamp() + ACK_INTERVAL; -} - -template -void Transport::send_to_receiver( string diff ) -{ - uint64_t new_num; - if ( current_state == sent_states.back().state ) { /* previously sent */ - new_num = sent_states.back().num; - } else { /* new state */ - new_num = sent_states.back().num + 1; - } - - bool done = false; - int MTU_tries = 0; - while ( !done ) { - MTU_tries++; - - if ( MTU_tries > 20 ) { - fprintf( stderr, "Error, could not send fragments after 20 tries (MTU = %d).\n", - connection.get_MTU() ); - } - - try { - send_in_fragments( diff, new_num ); - done = true; - } catch ( MTUException m ) { - fprintf( stderr, "Caught Path MTU exception, MTU now = %d\n", connection.get_MTU() ); - done = false; - } - - if ( new_num == sent_states.back().num ) { - sent_states.back().timestamp = timestamp(); - } else { - sent_states.push_back( TimestampedState( timestamp(), new_num, current_state ) ); - } - - new_num++; - } - - /* successfully sent, probably */ - /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */ - assumed_receiver_state = sent_states.end(); - assumed_receiver_state--; - next_ack_time = timestamp() + ACK_INTERVAL; - next_send_time = -1; -} - -template -void Transport::update_assumed_receiver_state( void ) -{ - uint64_t now = timestamp(); - - /* start from what is known and give benefit of the doubt to unacknowledged states - transmitted recently enough ago */ - assumed_receiver_state = sent_states.begin(); - - typename list< TimestampedState >::iterator i = sent_states.begin(); - i++; - - while ( i != sent_states.end() ) { - assert( now >= i->timestamp ); - - if ( int(now - i->timestamp) < connection.timeout() + ACK_DELAY ) { - assumed_receiver_state = i; - } else { - return; - } - - i++; - } -} - -template -void Transport::rationalize_states( void ) -{ - const MyState * known_receiver_state = &sent_states.front().state; - - current_state.subtract( known_receiver_state ); - - for ( typename list< TimestampedState >::reverse_iterator i = sent_states.rbegin(); - i != sent_states.rend(); - i++ ) { - i->state.subtract( known_receiver_state ); - } -} - template void Transport::recv( void ) { @@ -249,7 +38,7 @@ void Transport::recv( void ) if ( fragments.add_fragment( frag ) ) { /* complete packet */ Instruction inst = fragments.get_assembly(); - process_acknowledgment_through( inst.ack_num() ); + sender.process_acknowledgment_through( inst.ack_num() ); /* first, make sure we don't already have the new state */ for ( typename list< TimestampedState >::iterator i = received_states.begin(); @@ -299,26 +88,10 @@ void Transport::recv( void ) (int)timestamp() % 100000, (int)new_state.num, (int)inst.ack_num ); */ received_states.push_back( new_state ); + sender.set_ack_num( received_states.back().num ); } } -template -void Transport::process_acknowledgment_through( uint64_t ack_num ) -{ - typename list< TimestampedState >::iterator i = sent_states.begin(); - while ( i != sent_states.end() ) { - typename list< TimestampedState >::iterator inext = i; - inext++; - if ( i->num < ack_num ) { - sent_states.erase( i ); - } - i = inext; - } - - assert( sent_states.size() > 0 ); - // assert( sent_states.front().num == ack_num ); -} - /* The sender uses throwaway_num to tell us the earliest received state that we need to keep around */ template void Transport::process_throwaway_until( uint64_t throwaway_num ) @@ -356,59 +129,3 @@ string Transport::get_remote_diff( void ) return ret; } -template -void Transport::send_in_fragments( string diff, uint64_t new_num, bool send_timestamp ) -{ - Instruction inst; - inst.set_old_num( assumed_receiver_state->num ); - inst.set_new_num( new_num ); - inst.set_ack_num( received_states.back().num ); - inst.set_throwaway_num( sent_states.front().num ); - inst.set_diff( diff ); - - if ( (inst.old_num() != last_instruction_sent.old_num()) - || (inst.new_num() != last_instruction_sent.new_num()) - || (inst.ack_num() != last_instruction_sent.ack_num()) - || (inst.throwaway_num() != last_instruction_sent.throwaway_num()) ) { - next_instruction_id++; /* make sure this happens before the first send in case of exception */ - last_instruction_sent = inst; - } else { - assert( inst.diff() == last_instruction_sent.diff() ); - } - - string payload = inst.SerializeAsString(); - - uint16_t fragment_num = 0; - - do { - string this_fragment; - - assert( fragment_num <= 32767 ); - - bool final = false; - - int MTU = connection.get_MTU(); - - if ( int( payload.size() + HEADER_LEN ) > MTU ) { - this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN ); - payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() ); - } else { - this_fragment = payload; - payload.clear(); - final = true; - } - - Fragment frag( next_instruction_id, fragment_num++, final, this_fragment ); - string s = frag.tostring(); - - connection.send( s, send_timestamp ); - - if ( verbose ) { - fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n", - (int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)frag.id, (int)frag.fragment_num, - (int)inst.ack_num(), (int)inst.throwaway_num(), (int)frag.contents.size(), - 1000.0 / (double)send_interval(), - (int)connection.timeout() ); - } - } while ( !payload.empty() ); -} diff --git a/networktransport.hpp b/networktransport.hpp index b972b66..be3699f 100644 --- a/networktransport.hpp +++ b/networktransport.hpp @@ -8,127 +8,41 @@ #include #include "network.hpp" -#include "transportinstruction.pb.h" +#include "transportsender.hpp" +#include "transportfragment.hpp" using namespace std; -using namespace TransportBuffers; namespace Network { - class Fragment - { - private: - static const size_t frag_header_len = 2 * sizeof( uint16_t ); - - public: - uint16_t id; - uint16_t fragment_num; - bool final; - - bool initialized; - - string contents; - - Fragment() - : id( -1 ), fragment_num( -1 ), final( false ), initialized( false ), contents() - {} - - Fragment( uint16_t s_id, uint16_t s_fragment_num, bool s_final, string s_contents ) - : id( s_id ), fragment_num( s_fragment_num ), final( s_final ), initialized( true ), - contents( s_contents ) - {} - - Fragment( string &x ); - - string tostring( void ); - - bool operator==( const Fragment &x ); - }; - - class FragmentAssembly - { - private: - vector fragments; - uint16_t current_id; - int fragments_arrived, fragments_total; - - public: - FragmentAssembly() : fragments(), current_id( -1 ), fragments_arrived( 0 ), fragments_total( -1 ) {} - bool add_fragment( Fragment &inst ); - Instruction get_assembly( void ); - }; - - template - class TimestampedState - { - public: - uint64_t timestamp; - uint64_t num; - State state; - - TimestampedState( uint64_t s_timestamp, uint64_t s_num, State &s_state ) - : timestamp( s_timestamp ), num( s_num ), state( s_state ) - {} - }; - template class Transport { private: - static const int SEND_INTERVAL_MIN = 20; /* ms between frames */ - static const int SEND_INTERVAL_MAX = 250; /* ms between frames */ - static const int ACK_INTERVAL = 1000; /* ms between empty acks */ - static const int ACK_DELAY = 10; /* ms before delayed ack */ - static const int SEND_MINDELAY = 20; /* ms to collect all input */ - static const int HEADER_LEN = 120; + /* the underlying, encrypted network connection */ + Connection connection; - /* helper methods for tick() */ - unsigned int send_interval( void ); - void update_assumed_receiver_state( void ); - void rationalize_states( void ); - void send_to_receiver( string diff ); - void send_empty_ack( void ); - void send_in_fragments( string diff, uint64_t new_num, bool send_timestamp = true ); + /* sender side */ + TransportSender sender; /* helper methods for recv() */ - void process_acknowledgment_through( uint64_t ack_num ); void process_throwaway_until( uint64_t throwaway_num ); - Connection connection; - bool server; - - /* sender */ - MyState current_state; - - list< TimestampedState > sent_states; - /* first element: known, acknowledged receiver state */ - /* last element: last sent state */ - /* somewhere in the middle: the assumed state of the receiver */ - - typename list< TimestampedState >::iterator assumed_receiver_state; - - uint16_t next_instruction_id; - Instruction last_instruction_sent; - /* simple receiver */ list< TimestampedState > received_states; RemoteState last_receiver_state; /* the state we were in when user last queried state */ FragmentAssembly fragments; - bool verbose; - uint64_t next_ack_time; - uint64_t next_send_time; - public: Transport( MyState &initial_state, RemoteState &initial_remote ); Transport( MyState &initial_state, RemoteState &initial_remote, const char *key_str, const char *ip, int port ); /* Send data or an ack if necessary. */ - void tick( void ); + void tick( void ) { sender.tick(); } - /* Returns the number of ms to wait until next event. */ - int wait_time( void ); + /* Returns the number of ms to wait until next possible event. */ + int wait_time( void ) { return sender.wait_time(); } /* Blocks waiting for a packet. */ void recv( void ); @@ -136,8 +50,8 @@ namespace Network { int port( void ) { return connection.port(); } string get_key( void ) { return connection.get_key(); } - MyState &get_current_state( void ) { return current_state; } - void set_current_state( const MyState &x ) { current_state = x; } + MyState &get_current_state( void ) { return sender.get_current_state(); } + void set_current_state( const MyState &x ) { sender.set_current_state( x ); } string get_remote_diff( void ); @@ -148,7 +62,7 @@ namespace Network { int fd( void ) { return connection.fd(); } - void set_verbose( void ) { verbose = true; } + void set_verbose( void ) { sender.set_verbose(); } }; } diff --git a/templates.cpp b/templates.cpp index 0dd069f..69aa83c 100644 --- a/templates.cpp +++ b/templates.cpp @@ -9,6 +9,7 @@ #include "user.hpp" #include "networktransport.cpp" +#include "transportsender.cpp" #include "userinput.pb.h" namespace Parser { @@ -33,4 +34,7 @@ template class Transport; template class Transport; template class Transport; +template class TransportSender; +template class TransportSender; + template class deque; diff --git a/networkfragment.cpp b/transportfragment.cpp similarity index 98% rename from networkfragment.cpp rename to transportfragment.cpp index 3eaed00..af82e92 100644 --- a/networkfragment.cpp +++ b/transportfragment.cpp @@ -1,7 +1,7 @@ #include #include -#include "networktransport.hpp" +#include "transportfragment.hpp" #include "transportinstruction.pb.h" using namespace Network; diff --git a/transportfragment.hpp b/transportfragment.hpp new file mode 100644 index 0000000..44a5fad --- /dev/null +++ b/transportfragment.hpp @@ -0,0 +1,58 @@ +#ifndef TRANSPORT_FRAGMENT_HPP +#define TRANSPORT_FRAGMENT_HPP + +#include +#include +#include + +#include "transportinstruction.pb.h" + +using namespace std; +using namespace TransportBuffers; + +namespace Network { + class Fragment + { + private: + static const size_t frag_header_len = 2 * sizeof( uint16_t ); + + public: + uint16_t id; + uint16_t fragment_num; + bool final; + + bool initialized; + + string contents; + + Fragment() + : id( -1 ), fragment_num( -1 ), final( false ), initialized( false ), contents() + {} + + Fragment( uint16_t s_id, uint16_t s_fragment_num, bool s_final, string s_contents ) + : id( s_id ), fragment_num( s_fragment_num ), final( s_final ), initialized( true ), + contents( s_contents ) + {} + + Fragment( string &x ); + + string tostring( void ); + + bool operator==( const Fragment &x ); + }; + + class FragmentAssembly + { + private: + vector fragments; + uint16_t current_id; + int fragments_arrived, fragments_total; + + public: + FragmentAssembly() : fragments(), current_id( -1 ), fragments_arrived( 0 ), fragments_total( -1 ) {} + bool add_fragment( Fragment &inst ); + Instruction get_assembly( void ); + }; +} + +#endif diff --git a/transportsender.cpp b/transportsender.cpp new file mode 100644 index 0000000..22b999f --- /dev/null +++ b/transportsender.cpp @@ -0,0 +1,274 @@ +#include "transportsender.hpp" +#include "transportfragment.hpp" + +using namespace Network; + +template +TransportSender::TransportSender( Connection *s_connection, MyState &initial_state ) + : connection( s_connection ), + current_state( initial_state ), + sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ), + assumed_receiver_state( sent_states.begin() ), + next_instruction_id( -1 ), + last_instruction_sent(), + next_ack_time( timestamp() ), + next_send_time( timestamp() ), + verbose( false ), + ack_num( 0 ) +{ +} + +/* Try to send roughly two frames per RTT, bounded by limits on frame rate */ +template +unsigned int TransportSender::send_interval( void ) +{ + int SEND_INTERVAL = lrint( ceil( (connection->get_SRTT() - ACK_DELAY) / 2.0 ) ); + if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) { + SEND_INTERVAL = SEND_INTERVAL_MIN; + } else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) { + SEND_INTERVAL = SEND_INTERVAL_MAX; + } + + return SEND_INTERVAL; +} + +/* How many ms can the caller wait before we will have an event (empty ack or next frame)? */ +template +int TransportSender::wait_time( void ) +{ + if ( connection->pending_timestamp() && ( next_ack_time > timestamp() + ACK_DELAY ) ) { + next_ack_time = timestamp() + ACK_DELAY; + } + + uint64_t next_wakeup = next_ack_time; + + if ( !(current_state == sent_states.back().state) ) { /* pending data to send */ + if ( next_send_time > timestamp() + SEND_MINDELAY ) { + next_send_time = timestamp() + SEND_MINDELAY; + } + + if ( next_send_time < sent_states.back().timestamp + send_interval() ) { + next_send_time = sent_states.back().timestamp + send_interval(); + } + + if ( next_send_time < next_wakeup ) { + next_wakeup = next_send_time; + } + } + + if ( !connection->get_attached() ) { + return -1; + } + + if ( next_wakeup > timestamp() ) { + return next_wakeup - timestamp(); + } else { + return 0; + } +} + +/* Send data or an empty ack if necessary */ +template +void TransportSender::tick( void ) +{ + wait_time(); + + if ( !connection->get_attached() ) { + return; + } + + if ( (timestamp() < next_ack_time) + && (timestamp() < next_send_time) ) { + return; + } + + /* Determine if a new diff or empty ack needs to be sent */ + /* Update assumed receiver state */ + update_assumed_receiver_state(); + + /* Cut out common prefix of all states */ + rationalize_states(); + + string diff = current_state.diff_from( assumed_receiver_state->state ); + + if ( diff.empty() && (timestamp() >= next_ack_time) ) { + send_empty_ack(); + return; + } + + if ( !diff.empty() && ( (timestamp() >= next_send_time) + || (timestamp() >= next_ack_time) ) ) { + /* Send diffs or ack */ + send_to_receiver( diff ); + } +} + +template +void TransportSender::send_empty_ack( void ) +{ + assert ( timestamp() >= next_ack_time ); + + uint64_t new_num = sent_states.back().num + 1; + + send_in_fragments( "", new_num, false ); + sent_states.push_back( TimestampedState( sent_states.back().timestamp, new_num, current_state ) ); + next_ack_time = timestamp() + ACK_INTERVAL; +} + +template +void TransportSender::send_to_receiver( string diff ) +{ + uint64_t new_num; + if ( current_state == sent_states.back().state ) { /* previously sent */ + new_num = sent_states.back().num; + } else { /* new state */ + new_num = sent_states.back().num + 1; + } + + bool done = false; + int MTU_tries = 0; + while ( !done ) { + MTU_tries++; + + if ( MTU_tries > 20 ) { + fprintf( stderr, "Error, could not send fragments after 20 tries (MTU = %d).\n", + connection->get_MTU() ); + } + + try { + send_in_fragments( diff, new_num ); + done = true; + } catch ( MTUException m ) { + fprintf( stderr, "Caught Path MTU exception, MTU now = %d\n", connection->get_MTU() ); + done = false; + } + + if ( new_num == sent_states.back().num ) { + sent_states.back().timestamp = timestamp(); + } else { + sent_states.push_back( TimestampedState( timestamp(), new_num, current_state ) ); + } + + new_num++; + } + + /* successfully sent, probably */ + /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */ + assumed_receiver_state = sent_states.end(); + assumed_receiver_state--; + next_ack_time = timestamp() + ACK_INTERVAL; + next_send_time = -1; +} + +template +void TransportSender::update_assumed_receiver_state( void ) +{ + uint64_t now = timestamp(); + + /* start from what is known and give benefit of the doubt to unacknowledged states + transmitted recently enough ago */ + assumed_receiver_state = sent_states.begin(); + + typename list< TimestampedState >::iterator i = sent_states.begin(); + i++; + + while ( i != sent_states.end() ) { + assert( now >= i->timestamp ); + + if ( int(now - i->timestamp) < connection->timeout() + ACK_DELAY ) { + assumed_receiver_state = i; + } else { + return; + } + + i++; + } +} + +template +void TransportSender::rationalize_states( void ) +{ + const MyState * known_receiver_state = &sent_states.front().state; + + current_state.subtract( known_receiver_state ); + + for ( typename list< TimestampedState >::reverse_iterator i = sent_states.rbegin(); + i != sent_states.rend(); + i++ ) { + i->state.subtract( known_receiver_state ); + } +} + +template +void TransportSender::send_in_fragments( string diff, uint64_t new_num, bool send_timestamp ) +{ + Instruction inst; + inst.set_old_num( assumed_receiver_state->num ); + inst.set_new_num( new_num ); + inst.set_ack_num( ack_num ); + inst.set_throwaway_num( sent_states.front().num ); + inst.set_diff( diff ); + + if ( (inst.old_num() != last_instruction_sent.old_num()) + || (inst.new_num() != last_instruction_sent.new_num()) + || (inst.ack_num() != last_instruction_sent.ack_num()) + || (inst.throwaway_num() != last_instruction_sent.throwaway_num()) ) { + next_instruction_id++; /* make sure this happens before the first send in case of exception */ + last_instruction_sent = inst; + } else { + assert( inst.diff() == last_instruction_sent.diff() ); + } + + string payload = inst.SerializeAsString(); + + uint16_t fragment_num = 0; + + do { + string this_fragment; + + assert( fragment_num <= 32767 ); + + bool final = false; + + int MTU = connection->get_MTU(); + + if ( int( payload.size() + HEADER_LEN ) > MTU ) { + this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN ); + payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() ); + } else { + this_fragment = payload; + payload.clear(); + final = true; + } + + Fragment frag( next_instruction_id, fragment_num++, final, this_fragment ); + string s = frag.tostring(); + + connection->send( s, send_timestamp ); + + if ( verbose ) { + fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n", + (int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)frag.id, (int)frag.fragment_num, + (int)inst.ack_num(), (int)inst.throwaway_num(), (int)frag.contents.size(), + 1000.0 / (double)send_interval(), + (int)connection->timeout() ); + } + } while ( !payload.empty() ); +} + +template +void TransportSender::process_acknowledgment_through( uint64_t ack_num ) +{ + typename list< TimestampedState >::iterator i = sent_states.begin(); + while ( i != sent_states.end() ) { + typename list< TimestampedState >::iterator inext = i; + inext++; + if ( i->num < ack_num ) { + sent_states.erase( i ); + } + i = inext; + } + + assert( sent_states.size() > 0 ); +} + diff --git a/transportsender.hpp b/transportsender.hpp new file mode 100644 index 0000000..a36eddc --- /dev/null +++ b/transportsender.hpp @@ -0,0 +1,85 @@ +#ifndef TRANSPORT_SENDER_HPP +#define TRANSPORT_SENDER_HPP + +#include +#include + +#include "network.hpp" +#include "transportinstruction.pb.h" +#include "transportstate.hpp" + +using namespace std; +using namespace TransportBuffers; + +namespace Network { + template + class TransportSender + { + private: + /* timing parameters */ + static const int SEND_INTERVAL_MIN = 20; /* ms between frames */ + static const int SEND_INTERVAL_MAX = 250; /* ms between frames */ + static const int ACK_INTERVAL = 1000; /* ms between empty acks */ + static const int ACK_DELAY = 10; /* ms before delayed ack */ + static const int SEND_MINDELAY = 20; /* ms to collect all input */ + static const int HEADER_LEN = 120; + + /* helper methods for tick() */ + unsigned int send_interval( void ); + void update_assumed_receiver_state( void ); + void rationalize_states( void ); + void send_to_receiver( string diff ); + void send_empty_ack( void ); + void send_in_fragments( string diff, uint64_t new_num, bool send_timestamp = true ); + + /* state of sender */ + Connection *connection; + + MyState current_state; + + list< TimestampedState > sent_states; + /* first element: known, acknowledged receiver state */ + /* last element: last sent state */ + + /* somewhere in the middle: the assumed state of the receiver */ + typename list< TimestampedState >::iterator assumed_receiver_state; + + /* for fragment creation */ + uint16_t next_instruction_id; + Instruction last_instruction_sent; + + /* timing state */ + uint64_t next_ack_time; + uint64_t next_send_time; + + bool verbose; + + /* information about receiver state */ + uint64_t ack_num; + + public: + /* constructor */ + TransportSender( Connection *s_connection, MyState &initial_state ); + + /* Send data or an ack if necessary */ + void tick( void ); + + /* Returns the number of ms to wait until next possible event. */ + int wait_time( void ); + + /* executed upon receipt of ack */ + void process_acknowledgment_through( uint64_t ack_num ); + + /* getters and setters */ + MyState &get_current_state( void ) { return current_state; } + void set_current_state( const MyState &x ) { current_state = x; } + void set_verbose( void ) { verbose = true; } + void set_ack_num( uint64_t s_ack_num ) { ack_num = s_ack_num; } + + /* nonexistent methods to satisfy -Weffc++ */ + TransportSender( const TransportSender &x ); + TransportSender & operator=( const TransportSender &x ); + }; +} + +#endif diff --git a/transportstate.hpp b/transportstate.hpp new file mode 100644 index 0000000..b9e4c5b --- /dev/null +++ b/transportstate.hpp @@ -0,0 +1,19 @@ +#ifndef TRANSPORT_STATE_HPP +#define TRANSPORT_STATE_HPP + +namespace Network { + template + class TimestampedState + { + public: + uint64_t timestamp; + uint64_t num; + State state; + + TimestampedState( uint64_t s_timestamp, uint64_t s_num, State &s_state ) + : timestamp( s_timestamp ), num( s_num ), state( s_state ) + {} + }; +} + +#endif