Better timestamping -- now send opportunistically
This commit is contained in:
+18
-8
@@ -49,9 +49,16 @@ string Packet::tostring( Session *session )
|
|||||||
|
|
||||||
Packet Connection::new_packet( string &s_payload )
|
Packet Connection::new_packet( string &s_payload )
|
||||||
{
|
{
|
||||||
Packet p( next_seq++, direction, timestamp16(), saved_timestamp, s_payload );
|
uint16_t outgoing_timestamp_reply = -1;
|
||||||
|
|
||||||
saved_timestamp = -1;
|
if ( timestamp() - saved_timestamp_received_at < 25 ) {
|
||||||
|
/* we have a recent received timestamp */
|
||||||
|
outgoing_timestamp_reply = saved_timestamp;
|
||||||
|
saved_timestamp = -1;
|
||||||
|
saved_timestamp_received_at = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
Packet p( next_seq++, direction, timestamp16(), outgoing_timestamp_reply, s_payload );
|
||||||
|
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
@@ -83,6 +90,7 @@ Connection::Connection() /* server */
|
|||||||
direction( TO_CLIENT ),
|
direction( TO_CLIENT ),
|
||||||
next_seq( 0 ),
|
next_seq( 0 ),
|
||||||
saved_timestamp( -1 ),
|
saved_timestamp( -1 ),
|
||||||
|
saved_timestamp_received_at( 0 ),
|
||||||
expected_receiver_seq( 0 ),
|
expected_receiver_seq( 0 ),
|
||||||
RTT_hit( false ),
|
RTT_hit( false ),
|
||||||
SRTT( 1000 ),
|
SRTT( 1000 ),
|
||||||
@@ -112,6 +120,7 @@ Connection::Connection( const char *key_str, const char *ip, int port ) /* clien
|
|||||||
direction( TO_SERVER ),
|
direction( TO_SERVER ),
|
||||||
next_seq( 0 ),
|
next_seq( 0 ),
|
||||||
saved_timestamp( -1 ),
|
saved_timestamp( -1 ),
|
||||||
|
saved_timestamp_received_at( 0 ),
|
||||||
expected_receiver_seq( 0 ),
|
expected_receiver_seq( 0 ),
|
||||||
RTT_hit( false ),
|
RTT_hit( false ),
|
||||||
SRTT( 1000 ),
|
SRTT( 1000 ),
|
||||||
@@ -169,16 +178,12 @@ void Connection::update_MTU( void )
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::send( string s, bool send_timestamp )
|
void Connection::send( string s )
|
||||||
{
|
{
|
||||||
assert( attached );
|
assert( attached );
|
||||||
|
|
||||||
Packet px = new_packet( s );
|
Packet px = new_packet( s );
|
||||||
|
|
||||||
if ( !send_timestamp ) {
|
|
||||||
px.timestamp = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
string p = px.tostring( &session );
|
string p = px.tostring( &session );
|
||||||
|
|
||||||
ssize_t bytes_sent = sendto( sock, p.data(), p.size(), 0,
|
ssize_t bytes_sent = sendto( sock, p.data(), p.size(), 0,
|
||||||
@@ -225,6 +230,7 @@ string Connection::recv( void )
|
|||||||
|
|
||||||
if ( p.timestamp != uint16_t(-1) ) {
|
if ( p.timestamp != uint16_t(-1) ) {
|
||||||
saved_timestamp = p.timestamp;
|
saved_timestamp = p.timestamp;
|
||||||
|
saved_timestamp_received_at = timestamp();
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( p.timestamp_reply != uint16_t(-1) ) {
|
if ( p.timestamp_reply != uint16_t(-1) ) {
|
||||||
@@ -291,7 +297,11 @@ uint64_t Network::timestamp( void )
|
|||||||
|
|
||||||
uint16_t Network::timestamp16( void )
|
uint16_t Network::timestamp16( void )
|
||||||
{
|
{
|
||||||
return timestamp() % 65536;
|
uint16_t ts = timestamp() % 65536;
|
||||||
|
if ( ts == uint16_t(-1) ) {
|
||||||
|
ts++;
|
||||||
|
}
|
||||||
|
return ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint16_t Network::timestamp_diff( uint16_t tsnew, uint16_t tsold )
|
uint16_t Network::timestamp_diff( uint16_t tsnew, uint16_t tsold )
|
||||||
|
|||||||
+3
-2
@@ -78,6 +78,7 @@ namespace Network {
|
|||||||
Direction direction;
|
Direction direction;
|
||||||
uint64_t next_seq;
|
uint64_t next_seq;
|
||||||
uint16_t saved_timestamp;
|
uint16_t saved_timestamp;
|
||||||
|
uint64_t saved_timestamp_received_at;
|
||||||
uint64_t expected_receiver_seq;
|
uint64_t expected_receiver_seq;
|
||||||
|
|
||||||
bool RTT_hit;
|
bool RTT_hit;
|
||||||
@@ -90,7 +91,7 @@ namespace Network {
|
|||||||
Connection();
|
Connection();
|
||||||
Connection( const char *key_str, const char *ip, int port );
|
Connection( const char *key_str, const char *ip, int port );
|
||||||
|
|
||||||
void send( string s, bool send_timestamp = true );
|
void send( string s );
|
||||||
string recv( void );
|
string recv( void );
|
||||||
int fd( void ) { return sock; }
|
int fd( void ) { return sock; }
|
||||||
int get_MTU( void ) { return MTU; }
|
int get_MTU( void ) { return MTU; }
|
||||||
@@ -101,7 +102,7 @@ namespace Network {
|
|||||||
|
|
||||||
uint64_t timeout( void );
|
uint64_t timeout( void );
|
||||||
double get_SRTT( void ) { return SRTT; }
|
double get_SRTT( void ) { return SRTT; }
|
||||||
bool pending_timestamp( void ) { return ( saved_timestamp != uint16_t(-1) ); }
|
// bool pending_timestamp( void ) { return ( saved_timestamp != uint16_t(-1) ); }
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+14
-2
@@ -71,7 +71,10 @@ void Transport<MyState, RemoteState>::recv( void )
|
|||||||
TimestampedState<RemoteState> new_state = *reference_state;
|
TimestampedState<RemoteState> new_state = *reference_state;
|
||||||
new_state.timestamp = timestamp();
|
new_state.timestamp = timestamp();
|
||||||
new_state.num = inst.new_num();
|
new_state.num = inst.new_num();
|
||||||
new_state.state.apply_string( inst.diff() );
|
|
||||||
|
if ( !inst.diff().empty() ) {
|
||||||
|
new_state.state.apply_string( inst.diff() );
|
||||||
|
}
|
||||||
|
|
||||||
process_throwaway_until( inst.throwaway_num() );
|
process_throwaway_until( inst.throwaway_num() );
|
||||||
|
|
||||||
@@ -81,14 +84,23 @@ void Transport<MyState, RemoteState>::recv( void )
|
|||||||
i++ ) {
|
i++ ) {
|
||||||
if ( i->num > new_state.num ) {
|
if ( i->num > new_state.num ) {
|
||||||
received_states.insert( i, new_state );
|
received_states.insert( i, new_state );
|
||||||
|
if ( verbose ) {
|
||||||
|
fprintf( stderr, "[%d] Received OUT-OF-ORDER state %d [ack %d]\n",
|
||||||
|
(int)timestamp() % 100000, (int)new_state.num, (int)inst.ack_num() );
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ( verbose )
|
if ( verbose ) {
|
||||||
fprintf( stderr, "[%d] Received state %d [ack %d]\n",
|
fprintf( stderr, "[%d] Received state %d [ack %d]\n",
|
||||||
(int)timestamp() % 100000, (int)new_state.num, (int)inst.ack_num() );
|
(int)timestamp() % 100000, (int)new_state.num, (int)inst.ack_num() );
|
||||||
|
}
|
||||||
received_states.push_back( new_state );
|
received_states.push_back( new_state );
|
||||||
sender.set_ack_num( received_states.back().num );
|
sender.set_ack_num( received_states.back().num );
|
||||||
|
|
||||||
|
if ( !inst.diff().empty() ) {
|
||||||
|
sender.set_data_ack();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+8
-5
@@ -14,7 +14,8 @@ TransportSender<MyState>::TransportSender( Connection *s_connection, MyState &in
|
|||||||
next_send_time( timestamp() ),
|
next_send_time( timestamp() ),
|
||||||
verbose( false ),
|
verbose( false ),
|
||||||
shutdown_in_progress( false ),
|
shutdown_in_progress( false ),
|
||||||
ack_num( 0 )
|
ack_num( 0 ),
|
||||||
|
pending_data_ack( false )
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,7 +37,7 @@ unsigned int TransportSender<MyState>::send_interval( void )
|
|||||||
template <class MyState>
|
template <class MyState>
|
||||||
int TransportSender<MyState>::wait_time( void )
|
int TransportSender<MyState>::wait_time( void )
|
||||||
{
|
{
|
||||||
if ( connection->pending_timestamp() && ( next_ack_time > timestamp() + ACK_DELAY ) ) {
|
if ( pending_data_ack && (next_ack_time > timestamp() + ACK_DELAY) ) {
|
||||||
next_ack_time = timestamp() + ACK_DELAY;
|
next_ack_time = timestamp() + ACK_DELAY;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,7 +121,7 @@ void TransportSender<MyState>::send_empty_ack( void )
|
|||||||
new_num = uint64_t( -1 );
|
new_num = uint64_t( -1 );
|
||||||
}
|
}
|
||||||
|
|
||||||
send_in_fragments( "", new_num, false );
|
send_in_fragments( "", new_num );
|
||||||
sent_states.push_back( TimestampedState<MyState>( sent_states.back().timestamp, new_num, current_state ) );
|
sent_states.push_back( TimestampedState<MyState>( sent_states.back().timestamp, new_num, current_state ) );
|
||||||
next_ack_time = timestamp() + ACK_INTERVAL;
|
next_ack_time = timestamp() + ACK_INTERVAL;
|
||||||
}
|
}
|
||||||
@@ -215,7 +216,7 @@ void TransportSender<MyState>::rationalize_states( void )
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class MyState>
|
template <class MyState>
|
||||||
void TransportSender<MyState>::send_in_fragments( string diff, uint64_t new_num, bool send_timestamp )
|
void TransportSender<MyState>::send_in_fragments( string diff, uint64_t new_num )
|
||||||
{
|
{
|
||||||
Instruction inst;
|
Instruction inst;
|
||||||
inst.set_old_num( assumed_receiver_state->num );
|
inst.set_old_num( assumed_receiver_state->num );
|
||||||
@@ -227,7 +228,7 @@ void TransportSender<MyState>::send_in_fragments( string diff, uint64_t new_num,
|
|||||||
vector<Fragment> fragments = fragmenter.make_fragments( inst, connection->get_MTU() );
|
vector<Fragment> fragments = fragmenter.make_fragments( inst, connection->get_MTU() );
|
||||||
|
|
||||||
for ( auto i = fragments.begin(); i != fragments.end(); i++ ) {
|
for ( auto i = fragments.begin(); i != fragments.end(); i++ ) {
|
||||||
connection->send( i->tostring(), send_timestamp );
|
connection->send( i->tostring() );
|
||||||
|
|
||||||
if ( verbose ) {
|
if ( verbose ) {
|
||||||
fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n",
|
fprintf( stderr, "[%d] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d\n",
|
||||||
@@ -238,6 +239,8 @@ void TransportSender<MyState>::send_in_fragments( string diff, uint64_t new_num,
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pending_data_ack = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class MyState>
|
template <class MyState>
|
||||||
|
|||||||
+5
-1
@@ -31,7 +31,7 @@ namespace Network {
|
|||||||
void rationalize_states( void );
|
void rationalize_states( void );
|
||||||
void send_to_receiver( string diff );
|
void send_to_receiver( string diff );
|
||||||
void send_empty_ack( void );
|
void send_empty_ack( void );
|
||||||
void send_in_fragments( string diff, uint64_t new_num, bool send_timestamp = true );
|
void send_in_fragments( string diff, uint64_t new_num );
|
||||||
|
|
||||||
/* state of sender */
|
/* state of sender */
|
||||||
Connection *connection;
|
Connection *connection;
|
||||||
@@ -57,6 +57,7 @@ namespace Network {
|
|||||||
|
|
||||||
/* information about receiver state */
|
/* information about receiver state */
|
||||||
uint64_t ack_num;
|
uint64_t ack_num;
|
||||||
|
bool pending_data_ack;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/* constructor */
|
/* constructor */
|
||||||
@@ -74,6 +75,9 @@ namespace Network {
|
|||||||
/* Executed upon entry to new receiver state */
|
/* Executed upon entry to new receiver state */
|
||||||
void set_ack_num( uint64_t s_ack_num ) { ack_num = s_ack_num; }
|
void set_ack_num( uint64_t s_ack_num ) { ack_num = s_ack_num; }
|
||||||
|
|
||||||
|
/* Accelerate reply ack */
|
||||||
|
void set_data_ack( void ) { pending_data_ack = true; }
|
||||||
|
|
||||||
/* Starts shutdown sequence */
|
/* Starts shutdown sequence */
|
||||||
void start_shutdown( void ) { shutdown_in_progress = true; }
|
void start_shutdown( void ) { shutdown_in_progress = true; }
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user