diff --git a/Makefile b/Makefile index e6f5f06..24ce190 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ -proto = userinput.proto -source = parse.cpp parserstate.cpp parser.cpp templates.cpp terminal.cpp termemu.cpp parseraction.cpp terminalfunctions.cpp swrite.cpp terminalframebuffer.cpp terminaldispatcher.cpp terminaluserinput.cpp terminaldisplay.cpp network.cpp ntester.cpp ocb.cpp base64.cpp encrypt.cpp decrypt.cpp crypto.cpp networktransport.cpp networkinstruction.cpp user.cpp userinput.pb.cc completeterminal.cpp stm-server.cpp stm.cpp -objects = parserstate.o parser.o templates.o terminal.o parseraction.o terminalfunctions.o swrite.o terminalframebuffer.o terminaldispatcher.o terminaluserinput.o terminaldisplay.o network.o ocb.o base64.o crypto.o networktransport.o networkinstruction.o user.o userinput.pb.o completeterminal.o +proto = userinput.proto transportinstruction.proto +source = parse.cpp parserstate.cpp parser.cpp templates.cpp terminal.cpp termemu.cpp parseraction.cpp terminalfunctions.cpp swrite.cpp terminalframebuffer.cpp terminaldispatcher.cpp terminaluserinput.cpp terminaldisplay.cpp network.cpp ntester.cpp ocb.cpp base64.cpp encrypt.cpp decrypt.cpp crypto.cpp networktransport.cpp networkfragment.cpp user.cpp userinput.pb.cc completeterminal.cpp stm-server.cpp stm.cpp transportinstruction.pb.cc +objects = parserstate.o parser.o templates.o terminal.o parseraction.o terminalfunctions.o swrite.o terminalframebuffer.o terminaldispatcher.o terminaluserinput.o terminaldisplay.o network.o ocb.o base64.o crypto.o networktransport.o networkfragment.o user.o userinput.pb.o completeterminal.o transportinstruction.pb.o repos = templates.rpo executables = parse termemu ntester encrypt decrypt stm-server stm diff --git a/networkfragment.cpp b/networkfragment.cpp new file mode 100644 index 0000000..3eaed00 --- /dev/null +++ b/networkfragment.cpp @@ -0,0 +1,112 @@ +#include +#include + +#include "networktransport.hpp" +#include "transportinstruction.pb.h" + +using namespace Network; +using namespace TransportBuffers; + +static string network_order_string( uint16_t host_order ) +{ + uint16_t net_int = htobe16( host_order ); + return string( (char *)&net_int, sizeof( net_int ) ); +} + +string Fragment::tostring( void ) +{ + assert( initialized ); + + string ret; + + ret += network_order_string( id ); + + assert( !( fragment_num & 0x8000 ) ); /* effective limit on size of a terminal screen change or buffered user input */ + uint16_t combined_fragment_num = ( final << 15 ) | fragment_num; + ret += network_order_string( combined_fragment_num ); + + assert( ret.size() == frag_header_len ); + + ret += contents; + + return ret; +} + +Fragment::Fragment( string &x ) + : id( -1 ), fragment_num( -1 ), final( false ), initialized( true ), + contents( x.begin() + frag_header_len, x.end() ) +{ + assert( x.size() >= frag_header_len ); + + uint16_t *data16 = (uint16_t *)x.data(); + id = be16toh( data16[ 0 ] ); + fragment_num = be16toh( data16[ 1 ] ); + final = ( fragment_num & 0x8000 ) >> 15; + fragment_num &= 0x7FFF; +} + +bool FragmentAssembly::add_fragment( Fragment &frag ) +{ + /* see if this is a totally new packet */ + if ( current_id != frag.id ) { + fragments.clear(); + fragments.resize( frag.fragment_num + 1 ); + fragments.at( frag.fragment_num ) = frag; + fragments_arrived = 1; + fragments_total = -1; /* unknown */ + current_id = frag.id; + } else { /* not a new packet */ + /* see if we already have this fragment */ + if ( (fragments.size() > frag.fragment_num) + && (fragments.at( frag.fragment_num ).initialized) ) { + /* make sure new version is same as what we already have */ + assert( fragments.at( frag.fragment_num ) == frag ); + } else { + if ( (int)fragments.size() < frag.fragment_num + 1 ) { + fragments.resize( frag.fragment_num + 1 ); + } + fragments.at( frag.fragment_num ) = frag; + fragments_arrived++; + } + } + + if ( frag.final ) { + fragments_total = frag.fragment_num + 1; + assert( (int)fragments.size() <= fragments_total ); + fragments.resize( fragments_total ); + } + + if ( fragments_total != -1 ) { + assert( fragments_arrived <= fragments_total ); + } + + /* see if we're done */ + return ( fragments_arrived == fragments_total ); +} + +Instruction FragmentAssembly::get_assembly( void ) +{ + assert( fragments_arrived == fragments_total ); + + string encoded; + + for ( int i = 0; i < fragments_total; i++ ) { + assert( fragments.at( i ).initialized ); + encoded += fragments.at( i ).contents; + } + + Instruction ret; + assert( ret.ParseFromString( encoded ) ); + + fragments.clear(); + fragments_arrived = 0; + fragments_total = -1; + + return ret; +} + +bool Fragment::operator==( const Fragment &x ) +{ + return ( id == x.id ) && ( fragment_num == x.fragment_num ) && ( final == x.final ) + && ( initialized == x.initialized ) && ( contents == x.contents ); +} diff --git a/networkinstruction.cpp b/networkinstruction.cpp deleted file mode 100644 index 454169f..0000000 --- a/networkinstruction.cpp +++ /dev/null @@ -1,125 +0,0 @@ -#include -#include - -#include "networktransport.hpp" - -using namespace Network; - -static string network_order_string( uint64_t host_order ) -{ - uint64_t net_int = htobe64( host_order ); - return string( (char *)&net_int, sizeof( net_int ) ); -} - -static string network_order_string( uint16_t host_order ) -{ - uint16_t net_int = htobe16( host_order ); - return string( (char *)&net_int, sizeof( net_int ) ); -} - -string Instruction::tostring( void ) -{ - string ret; - - ret += network_order_string( old_num ); - ret += network_order_string( new_num ); - ret += network_order_string( ack_num ); - ret += network_order_string( throwaway_num ); - - assert( !( fragment_num & 0x8000 ) ); - - uint16_t combined_fragment_num = ( final << 15 ) | fragment_num; - - ret += network_order_string( combined_fragment_num ); - - assert( ret.size() == inst_header_len ); - - ret += diff; - - return ret; -} - -Instruction::Instruction( string &x ) - : old_num( -1 ), new_num( -1 ), ack_num( -1 ), throwaway_num( -1 ), fragment_num( -1 ), final( false ), diff() -{ - assert( x.size() >= inst_header_len ); - uint64_t *data = (uint64_t *)x.data(); - uint16_t *data16 = (uint16_t *)x.data(); - old_num = be64toh( data[ 0 ] ); - new_num = be64toh( data[ 1 ] ); - ack_num = be64toh( data[ 2 ] ); - throwaway_num = be64toh( data[ 3 ] ); - fragment_num = be16toh( data16[ 16 ] ); - final = ( fragment_num & 0x8000 ) >> 15; - fragment_num &= 0x7FFF; - - diff = string( x.begin() + inst_header_len, x.end() ); -} - -bool FragmentAssembly::same_template( Instruction &a, Instruction &b ) -{ - return ( a.old_num == b.old_num ) && ( a.new_num == b.new_num ) && ( a.ack_num == b.ack_num ) - && ( a.throwaway_num == b.throwaway_num ); -} - -bool FragmentAssembly::add_fragment( Instruction &inst ) -{ - /* see if this is a totally new packet */ - if ( !same_template( inst, current_template ) ) { - fragments.clear(); - current_template = inst; - fragments.resize( inst.fragment_num + 1 ); - fragments.at( inst.fragment_num ) = inst; - fragments_arrived = 1; - fragments_total = -1; - } else { /* not a new packet */ - /* see if we already have this fragment */ - if ( (fragments.size() > inst.fragment_num) - && (fragments.at( inst.fragment_num ).old_num != uint64_t(-1)) ) { - assert( fragments.at( inst.fragment_num ) == inst ); - } else { - if ( (int)fragments.size() < inst.fragment_num + 1 ) { - fragments.resize( inst.fragment_num + 1 ); - } - fragments.at( inst.fragment_num ) = inst; - fragments_arrived++; - } - } - - if ( inst.final ) { - fragments_total = inst.fragment_num + 1; - fragments.resize( fragments_total ); - } - - if ( fragments_total != -1 ) { - assert( fragments_arrived <= fragments_total ); - } - - /* see if we're done */ - return ( fragments_arrived == fragments_total ); -} - -Instruction FragmentAssembly::get_assembly( void ) -{ - assert( fragments_arrived == fragments_total ); - - Instruction ret( current_template ); - ret.diff = ""; - - for ( int i = 0; i < fragments_total; i++ ) { - ret.diff += fragments.at( i ).diff; - } - - fragments.clear(); - fragments_arrived = 0; - fragments_total = -1; - - return ret; -} - -bool Instruction::operator==( const Instruction &x ) -{ - return ( old_num == x.old_num ) && ( new_num == x.new_num ) - && ( ack_num == x.ack_num ) && ( throwaway_num == x.throwaway_num ) - && ( fragment_num == x.fragment_num ) && ( diff == x.diff ); -} diff --git a/networktransport.cpp b/networktransport.cpp index a240494..f5e5c03 100644 --- a/networktransport.cpp +++ b/networktransport.cpp @@ -13,6 +13,8 @@ Transport::Transport( MyState &initial_state, RemoteState current_state( initial_state ), sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ), assumed_receiver_state( sent_states.begin() ), + next_instruction_id( -1 ), + last_instruction_sent(), received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), fragments(), @@ -31,6 +33,8 @@ Transport::Transport( MyState &initial_state, RemoteState current_state( initial_state ), sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ), assumed_receiver_state( sent_states.begin() ), + next_instruction_id( -1 ), + last_instruction_sent(), received_states( 1, TimestampedState( timestamp(), 0, initial_remote ) ), last_receiver_state( initial_remote ), fragments(), @@ -240,18 +244,18 @@ template void Transport::recv( void ) { string s( connection.recv() ); - Instruction frag( s ); + Fragment frag( s ); if ( fragments.add_fragment( frag ) ) { /* complete packet */ Instruction inst = fragments.get_assembly(); - process_acknowledgment_through( inst.ack_num ); + process_acknowledgment_through( inst.ack_num() ); /* first, make sure we don't already have the new state */ for ( typename list< TimestampedState >::iterator i = received_states.begin(); i != received_states.end(); i++ ) { - if ( inst.new_num == i->num ) { + if ( inst.new_num() == i->num ) { return; } } @@ -260,7 +264,7 @@ void Transport::recv( void ) bool found = 0; typename list< TimestampedState >::iterator reference_state = received_states.begin(); while ( reference_state != received_states.end() ) { - if ( inst.old_num == reference_state->num ) { + if ( inst.old_num() == reference_state->num ) { found = true; break; } @@ -275,10 +279,10 @@ void Transport::recv( void ) /* apply diff to reference state */ TimestampedState new_state = *reference_state; new_state.timestamp = timestamp(); - new_state.num = inst.new_num; - new_state.state.apply_string( inst.diff ); + new_state.num = inst.new_num(); + new_state.state.apply_string( inst.diff() ); - process_throwaway_until( inst.throwaway_num ); + process_throwaway_until( inst.throwaway_num() ); /* Insert new state in sorted place */ for ( typename list< TimestampedState >::iterator i = received_states.begin(); @@ -355,6 +359,25 @@ string Transport::get_remote_diff( void ) template void Transport::send_in_fragments( string diff, uint64_t new_num, bool send_timestamp ) { + Instruction inst; + inst.set_old_num( assumed_receiver_state->num ); + inst.set_new_num( new_num ); + inst.set_ack_num( received_states.back().num ); + inst.set_throwaway_num( sent_states.front().num ); + inst.set_diff( diff ); + + if ( (inst.old_num() != last_instruction_sent.old_num()) + || (inst.new_num() != last_instruction_sent.new_num()) + || (inst.ack_num() != last_instruction_sent.ack_num()) + || (inst.throwaway_num() != last_instruction_sent.throwaway_num()) ) { + next_instruction_id++; /* make sure this happens before the first send in case of exception */ + last_instruction_sent = inst; + } else { + assert( inst.diff() == last_instruction_sent.diff() ); + } + + string payload = inst.SerializeAsString(); + uint16_t fragment_num = 0; do { @@ -364,31 +387,28 @@ void Transport::send_in_fragments( string diff, uint64_t n bool final = false; - if ( int( diff.size() + HEADER_LEN ) > connection.get_MTU() ) { - this_fragment = string( diff.begin(), diff.begin() + connection.get_MTU() - HEADER_LEN ); - diff = string( diff.begin() + connection.get_MTU() - HEADER_LEN, diff.end() ); + int MTU = connection.get_MTU(); + + if ( int( payload.size() + HEADER_LEN ) > MTU ) { + this_fragment = string( payload.begin(), payload.begin() + MTU - HEADER_LEN ); + payload = string( payload.begin() + MTU - HEADER_LEN, payload.end() ); } else { - this_fragment = diff; - diff.clear(); + this_fragment = payload; + payload.clear(); final = true; } - Instruction inst( assumed_receiver_state->num, - new_num, - received_states.back().num, - sent_states.front().num, - fragment_num++, final, - this_fragment ); - string s = inst.tostring(); + Fragment frag( next_instruction_id, fragment_num++, final, this_fragment ); + string s = frag.tostring(); connection.send( s, send_timestamp ); if ( verbose ) { - fprintf( stderr, "[%d] Sent [%d=>%d] frag %d, ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n", - (int)(timestamp() % 100000), (int)inst.old_num, (int)inst.new_num, (int)inst.fragment_num, - (int)inst.ack_num, (int)inst.throwaway_num, (int)inst.diff.size(), + fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n", + (int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)frag.id, (int)frag.fragment_num, + (int)inst.ack_num(), (int)inst.throwaway_num(), (int)frag.contents.size(), 1000.0 / (double)send_interval(), (int)connection.timeout() ); } - } while ( !diff.empty() ); + } while ( !payload.empty() ); } diff --git a/networktransport.hpp b/networktransport.hpp index f91e778..b972b66 100644 --- a/networktransport.hpp +++ b/networktransport.hpp @@ -8,54 +8,52 @@ #include #include "network.hpp" +#include "transportinstruction.pb.h" using namespace std; +using namespace TransportBuffers; namespace Network { - class Instruction + class Fragment { private: - static const size_t inst_header_len = 4 * sizeof( uint64_t ) + 1 * sizeof( uint16_t ); + static const size_t frag_header_len = 2 * sizeof( uint16_t ); public: - uint64_t old_num, new_num; - uint64_t ack_num; - uint64_t throwaway_num; + uint16_t id; uint16_t fragment_num; bool final; - string diff; + bool initialized; - Instruction() : old_num( -1 ), new_num( -1 ), ack_num( -1 ), throwaway_num( -1 ), fragment_num( -1 ), final( false ), - diff( "" ) + string contents; + + Fragment() + : id( -1 ), fragment_num( -1 ), final( false ), initialized( false ), contents() {} - Instruction( uint64_t s_old_num, uint64_t s_new_num, - uint64_t s_ack_num, uint64_t s_throwaway_num, uint16_t s_fragment_num, bool s_final, - string s_diff ) - : old_num( s_old_num ), new_num( s_new_num ), - ack_num( s_ack_num ), throwaway_num( s_throwaway_num ), fragment_num( s_fragment_num ), final( s_final ), - diff( s_diff ) + Fragment( uint16_t s_id, uint16_t s_fragment_num, bool s_final, string s_contents ) + : id( s_id ), fragment_num( s_fragment_num ), final( s_final ), initialized( true ), + contents( s_contents ) {} - Instruction( string &x ); + Fragment( string &x ); string tostring( void ); - bool operator==( const Instruction &x ); + bool operator==( const Fragment &x ); }; class FragmentAssembly { private: - vector fragments; - Instruction current_template; + vector fragments; + uint16_t current_id; int fragments_arrived, fragments_total; public: - FragmentAssembly() : fragments(), current_template(), fragments_arrived( 0 ), fragments_total( -1 ) {} - static bool same_template( Instruction &a, Instruction &b ); - bool add_fragment( Instruction &inst ); + FragmentAssembly() : fragments(), current_id( -1 ), fragments_arrived( 0 ), fragments_total( -1 ) {} + bool add_fragment( Fragment &inst ); Instruction get_assembly( void ); }; @@ -108,6 +106,9 @@ namespace Network { typename list< TimestampedState >::iterator assumed_receiver_state; + uint16_t next_instruction_id; + Instruction last_instruction_sent; + /* simple receiver */ list< TimestampedState > received_states; RemoteState last_receiver_state; /* the state we were in when user last queried state */ diff --git a/templates.cpp b/templates.cpp index 84f35ee..0dd069f 100644 --- a/templates.cpp +++ b/templates.cpp @@ -28,7 +28,7 @@ template class vector; template class map; template class vector; -template class vector; +template class vector; template class Transport; template class Transport; template class Transport; diff --git a/transportinstruction.proto b/transportinstruction.proto new file mode 100644 index 0000000..9c53c0e --- /dev/null +++ b/transportinstruction.proto @@ -0,0 +1,12 @@ +option optimize_for = LITE_RUNTIME; + +package TransportBuffers; + +message Instruction { + optional uint64 old_num = 1; + optional uint64 new_num = 2; + optional uint64 ack_num = 3; + optional uint64 throwaway_num = 4; + + optional bytes diff = 5; +}