#include #include #include "transportsender.hpp" #include "transportfragment.hpp" using namespace Network; template TransportSender::TransportSender( Connection *s_connection, MyState &initial_state ) : connection( s_connection ), current_state( initial_state ), sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ), assumed_receiver_state( sent_states.begin() ), fragmenter(), next_ack_time( timestamp() ), next_send_time( timestamp() ), verbose( false ), shutdown_in_progress( false ), shutdown_tries( 0 ), ack_num( 0 ), pending_data_ack( false ), SEND_MINDELAY( 15 ) { } /* Try to send roughly two frames per RTT, bounded by limits on frame rate */ template unsigned int TransportSender::send_interval( void ) { int SEND_INTERVAL = lrint( ceil( connection->get_SRTT() / 2.0 ) ); if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) { SEND_INTERVAL = SEND_INTERVAL_MIN; } else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) { SEND_INTERVAL = SEND_INTERVAL_MAX; } return SEND_INTERVAL; } /* How many ms can the caller wait before we will have an event (empty ack or next frame)? */ template int TransportSender::wait_time( void ) { if ( pending_data_ack && (next_ack_time > timestamp() + ACK_DELAY) ) { next_ack_time = timestamp() + ACK_DELAY; } if ( !(current_state == sent_states.back().state) ) { /* pending data to send */ if ( next_send_time > timestamp() + SEND_MINDELAY ) { next_send_time = timestamp() + SEND_MINDELAY; } if ( next_send_time < sent_states.back().timestamp + send_interval() ) { next_send_time = sent_states.back().timestamp + send_interval(); } } /* speed up shutdown sequence */ if ( shutdown_in_progress || (ack_num == uint64_t(-1)) ) { next_ack_time = sent_states.back().timestamp + send_interval(); } uint64_t next_wakeup = next_ack_time; if ( next_send_time < next_wakeup ) { next_wakeup = next_send_time; } if ( !connection->get_attached() ) { return -1; } if ( next_wakeup > timestamp() ) { return next_wakeup - timestamp(); } else { return 0; } } /* Send data or an empty ack if necessary */ template void TransportSender::tick( void ) { wait_time(); if ( !connection->get_attached() ) { return; } if ( (timestamp() < next_ack_time) && (timestamp() < next_send_time) ) { return; } /* Determine if a new diff or empty ack needs to be sent */ /* Update assumed receiver state */ update_assumed_receiver_state(); /* Cut out common prefix of all states */ rationalize_states(); string diff = current_state.diff_from( assumed_receiver_state->state ); if ( diff.empty() && (timestamp() >= next_ack_time) ) { send_empty_ack(); return; } if ( !diff.empty() && ( (timestamp() >= next_send_time) || (timestamp() >= next_ack_time) ) ) { /* Send diffs or ack */ send_to_receiver( diff ); } } template void TransportSender::send_empty_ack( void ) { assert ( timestamp() >= next_ack_time ); uint64_t new_num = sent_states.back().num + 1; /* special case for shutdown sequence */ if ( shutdown_in_progress ) { new_num = uint64_t( -1 ); } // sent_states.push_back( TimestampedState( sent_states.back().timestamp, new_num, current_state ) ); add_sent_state( sent_states.back().timestamp, new_num, current_state ); send_in_fragments( "", new_num ); next_ack_time = timestamp() + ACK_INTERVAL; } template void TransportSender::add_sent_state( uint64_t the_timestamp, uint64_t num, MyState &state ) { sent_states.push_back( TimestampedState( the_timestamp, num, state ) ); if ( sent_states.size() > 32 ) { /* limit on state queue */ auto last = sent_states.end(); for ( int i = 0; i < 16; i++ ) { last--; } sent_states.erase( last ); /* erase state from middle of queue */ } } template void TransportSender::send_to_receiver( string diff ) { uint64_t new_num; if ( current_state == sent_states.back().state ) { /* previously sent */ new_num = sent_states.back().num; } else { /* new state */ new_num = sent_states.back().num + 1; } /* special case for shutdown sequence */ if ( shutdown_in_progress ) { new_num = uint64_t( -1 ); } int MTU_tries = 0; while ( 1 ) { MTU_tries++; if ( MTU_tries > 20 ) { fprintf( stderr, "Error, could not send fragments after 20 tries (MTU = %d).\n", connection->get_MTU() ); return; } if ( new_num == sent_states.back().num ) { sent_states.back().timestamp = timestamp(); } else { // sent_states.push_back( TimestampedState( timestamp(), new_num, current_state ) ); add_sent_state( timestamp(), new_num, current_state ); } try { send_in_fragments( diff, new_num ); // Can throw NetworkException break; } catch ( MTUException m ) { fprintf( stderr, "Caught Path MTU exception, MTU now = %d\n", connection->get_MTU() ); new_num++; } } /* successfully sent, probably */ /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */ assumed_receiver_state = sent_states.end(); assumed_receiver_state--; next_ack_time = timestamp() + ACK_INTERVAL; next_send_time = uint64_t(-1); } template void TransportSender::update_assumed_receiver_state( void ) { uint64_t now = timestamp(); /* start from what is known and give benefit of the doubt to unacknowledged states transmitted recently enough ago */ assumed_receiver_state = sent_states.begin(); typename list< TimestampedState >::iterator i = sent_states.begin(); i++; while ( i != sent_states.end() ) { assert( now >= i->timestamp ); if ( uint64_t(now - i->timestamp) < connection->timeout() + ACK_DELAY ) { assumed_receiver_state = i; } else { return; } i++; } } template void TransportSender::rationalize_states( void ) { const MyState * known_receiver_state = &sent_states.front().state; current_state.subtract( known_receiver_state ); for ( typename list< TimestampedState >::reverse_iterator i = sent_states.rbegin(); i != sent_states.rend(); i++ ) { i->state.subtract( known_receiver_state ); } } template void TransportSender::send_in_fragments( string diff, uint64_t new_num ) { Instruction inst; inst.set_old_num( assumed_receiver_state->num ); inst.set_new_num( new_num ); inst.set_ack_num( ack_num ); inst.set_throwaway_num( sent_states.front().num ); inst.set_diff( diff ); vector fragments = fragmenter.make_fragments( inst, connection->get_MTU() ); for ( auto i = fragments.begin(); i != fragments.end(); i++ ) { connection->send( i->tostring() ); if ( verbose ) { fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n", (unsigned int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num, (int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(), 1000.0 / (double)send_interval(), (int)connection->timeout() ); } } if ( new_num == uint64_t(-1) ) { shutdown_tries++; } pending_data_ack = false; } template void TransportSender::process_acknowledgment_through( uint64_t ack_num ) { /* Ignore ack if we have culled the state it's acknowledging */ if ( sent_states.end() != find_if( sent_states.begin(), sent_states.end(), [ack_num]( TimestampedState x ) { return x.num == ack_num; } ) ) { sent_states.remove_if( [ack_num]( TimestampedState x ) { return x.num < ack_num; } ); } assert( !sent_states.empty() ); } /* give up on getting acknowledgement for shutdown */ template bool TransportSender::shutdown_ack_timed_out( void ) { return shutdown_tries >= SHUTDOWN_RETRIES; }