diff --git a/clientlib/fsprotocol.c b/clientlib/fsprotocol.c index e7c71759..8ba59a26 100644 --- a/clientlib/fsprotocol.c +++ b/clientlib/fsprotocol.c @@ -90,6 +90,19 @@ FSTATIC void _fsprotocol_flush_pending_connshut(FsProtoElem* fspe); #define AUDITFSPE(fspe) { if (fspe) _fsprotocol_auditfspe(fspe, __FUNCTION__, __LINE__); } #define AUDITIREADY(self) {_fsprotocol_auditiready(__FUNCTION__, __LINE__, self);} +#define outq_len(fspe) (fspe)->outq->_q->length +#define is_outq_empty(fspe) (outq_len(fspe)== 0) +#define set_pending(fspe) { \ + (fspe)->parent->unacked = g_list_prepend((fspe)->parent->unacked, fspe); \ +} +#define reset_pending(fspe) { \ + (fspe)->parent->unacked = g_list_remove((fspe)->parent->unacked, fspe); \ +} +#define is_pending(fspe) (g_list_find((fspe)->parent->unacked, fspe) != NULL) +#define update_nextxmit_time(fspe) { \ + (fspe)->nextrexmit = g_get_monotonic_time() + (fspe)->parent->rexmit_interval; \ +} + DEBUGDECLARATIONS /// @defgroup FsProtocol FsProtocol class @@ -479,7 +492,7 @@ _fsprotocol_fsa(FsProtoElem* fspe, ///< The FSPE we're processing fspe->finalizetimer = g_timeout_add_seconds(1+parent->acktimeout/1000000, _fsprotocol_finalizetimer, fspe); } // Check for possible errors in our FSA tables... - if (FSPR_NONE == nextstate && curstate != nextstate && fspe->outq->_q->length != 0) { + if (FSPR_NONE == nextstate && curstate != nextstate && !is_outq_empty(fspe)) { char * deststr = fspe->endpoint->baseclass.toString(&fspe->endpoint->baseclass); g_critical("%s.%d: Inappropriate transition for %s to state NONE" ": (%s, %s) => %s. Actions=[%s], outq length=%d" @@ -488,7 +501,7 @@ _fsprotocol_fsa(FsProtoElem* fspe, ///< The FSPE we're processing , _fsprotocol_fsa_inputs(input) , _fsprotocol_fsa_states(nextstate) , _fsprotocol_fsa_actions(action) - , fspe->outq->_q->length); + , outq_len(fspe)); FREE(deststr); deststr = NULL; _fsprotocol_fsa_log_history(fspe, curstate, nextstate, input, action); } @@ -526,25 +539,22 @@ _fsprotocol_flush_pending_connshut(FsProtoElem* fspe) FSTATIC void _fsprotocol_auditfspe(const FsProtoElem* self, const char * function, int lineno) { - guint outqlen = self->outq->_q->length; - FsProtocol* parent = self->parent; - gboolean in_unackedlist = (g_list_find(parent->unacked, self) != NULL); guint64 now = g_get_monotonic_time(); - if (outqlen != 0 && !in_unackedlist) { - g_critical("%s:%d: outqlen is %d but not in unacked list" - , function, lineno, outqlen); + if (!is_outq_empty(self) && !is_pending(self)) { + g_critical("%s:%d: outqlen is %d but not in the pending list" + , function, lineno, outq_len(self)); DUMP("WARN: previous unacked warning was for this address", &self->endpoint->baseclass, NULL); } - if (outqlen == 0 && in_unackedlist) { - g_critical("%s:%d: outqlen is zero but it IS in the unacked list" + if (is_outq_empty(self) && is_pending(self)) { + g_critical("%s:%d: outqlen is zero but it IS in the pending list" , function, lineno); DUMP("WARN: previous unacked warning was for this address", &self->endpoint->baseclass, NULL); } // If something is hung, it should start complaining soon... - if (in_unackedlist && now > (self->nextrexmit + self->parent->rexmit_interval)) { + if (!is_pending(self) && now > (self->nextrexmit + self->parent->rexmit_interval)) { g_critical("%s:%d: Overdue retransmissions in FSPE %p", function, lineno, self); DUMP("WARN: previous overdue warning was for this IP addr", &self->endpoint->baseclass, NULL); @@ -795,8 +805,9 @@ _fsprotocol_fspe_reinit(FsProtoElem* self) if (!g_queue_is_empty(self->outq->_q)) { DUMP3("REINIT OF OUTQ", &self->outq->baseclass, __FUNCTION__); self->outq->flush(self->outq); + // No longer waiting on any ACKs - takes us off the unACKed list... - self->parent->unacked = g_list_remove(self->parent->unacked, self); + reset_pending(self) self->outq->isready = FALSE; } // See the code in _fsqueue_enq and also in seqnoframe_new_init for how all these pieces @@ -1160,9 +1171,9 @@ _fsprotocol_receive(FsProtocol* self ///< Self pointer // and got a duplicate ACK DUMP3("Received bad ACK from", &fspe->endpoint->baseclass, NULL); DUMP3(__FUNCTION__, &fs->baseclass, " was ACK received."); - }else if (fspe->outq->_q->length == 0) { + }else if (is_outq_empty(fspe)) { // Remove this connection from the list of connections with unacknowledged packets - fspe->parent->unacked = g_list_remove(fspe->parent->unacked, fspe); + reset_pending(fspe); fspe->nextrexmit = 0; TRYXMIT(fspe); fspe->acktimeout = 0; @@ -1284,13 +1295,14 @@ _fsprotocol_send1(FsProtocol* self ///< Our object DEBUGMSG3("%s.%d: calling fsprotocol_fsa(FSPROTO_REQSEND)", __FUNCTION__, __LINE__); _fsprotocol_fsa(fspe, FSPROTO_REQSEND, NULL); - if (fspe->outq->_q->length == 0) { - ///@todo: This might be slow if we send a lot of packets to an endpoint - ///< before getting a response, but that's not very likely. + if (is_outq_empty(fspe)) { guint64 now = g_get_monotonic_time(); - // Add this channel to the list of channels that need ACKs - fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe); - fspe->nextrexmit = now + self->rexmit_interval; + ///@todo: This might be slow if we send a lot of packets to an endpoint + /// before getting a response, but that's not very likely. + // Add this channel to the list of channels that need ACKs + set_pending(fspe); + update_nextxmit_time(fspe); + fspe->acktimeout = now + self->acktimeout; } DEBUGMSG4("%s.%d: calling fspe->outq->enq()", __FUNCTION__, __LINE__); @@ -1416,28 +1428,27 @@ FSTATIC void _fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to operate on { GList* qelem; - FsQueue* outq; FsProtocol* parent; SeqnoFrame* lastseq; NetIO* io; - guint orig_outstanding; + guint orig_pending; gint64 now; g_return_if_fail(fspe != NULL); - outq = fspe->outq; parent = fspe->parent; lastseq = fspe->lastseqsent; io = parent->io; - orig_outstanding = fspe->outq->_q->length; + orig_pending = outq_len(fspe); + now = g_get_monotonic_time(); AUDITFSPE(fspe); // Look for any new packets that might have showed up to send // Check to see if we've exceeded our window size... - if (fspe->outq->_q->length < parent->window_size) { + if (outq_len(fspe) < parent->window_size) { // Nope. Look for packets that we haven't yet sent. // This code is sub-optimal when congestion occurs and we have a larger // window size (i.e. when we have a number of un-ACKed packets) - for (qelem=outq->_q->head; NULL != qelem; qelem=qelem->next) { + for (qelem=fspe->outq->_q->head; NULL != qelem; qelem=qelem->next) { FrameSet* fs = CASTTOCLASS(FrameSet, qelem->data); SeqnoFrame* seq = fs->getseqno(fs); if (NULL != lastseq && NULL != seq && seq->compare(seq, lastseq) <= 0) { @@ -1458,25 +1469,24 @@ _fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to } lastseq = fspe->lastseqsent = seq; REF2(lastseq); - if (fspe->outq->_q->length >= parent->window_size) { + if (outq_len(fspe) >= parent->window_size) { // Can't send any more on this channel until we get some ACKs. break; } } + update_nextxmit_time(fspe); } AUDITFSPE(fspe); - now = g_get_monotonic_time(); - if (fspe->nextrexmit == 0 && fspe->outq->_q->length > 0) { + if (fspe->nextrexmit == 0 && !is_outq_empty(fspe)) { // Next retransmission time not yet set... - fspe->nextrexmit = now + parent->rexmit_interval; + update_nextxmit_time(fspe); AUDITFSPE(fspe); } else if (fspe->nextrexmit != 0 && now > fspe->nextrexmit) { + FrameSet* fs = fspe->outq->qhead(fspe->outq); // It's time to retransmit something. Hurray! - FrameSet* fs = outq->qhead(outq); if (NULL != fs) { - // Update next retransmission time... - fspe->nextrexmit = now + parent->rexmit_interval; + update_nextxmit_time(fspe); DUMP3(__FUNCTION__, &fspe->endpoint->baseclass, " Retransmission target"); DUMP3(__FUNCTION__, &fs->baseclass, " is frameset being REsent"); io->sendaframeset(io, fspe->endpoint, fs); @@ -1494,9 +1504,9 @@ _fsprotocol_xmitifwecan(FsProtoElem* fspe) ///< The FrameSet protocol element to } // Make sure we remember to check this periodicially for retransmits... - if (orig_outstanding == 0 && fspe->outq->_q->length > 0) { - // Put this connection on the list of connections with unacked packets - fspe->parent->unacked = g_list_prepend(fspe->parent->unacked, fspe); + if (orig_pending == 0 && !is_outq_empty(fspe)) { + // Put 'fspe' on the list of fspe's with unacked packets + set_pending(fspe); // See comment in the _send function regarding eventual efficiency concerns } AUDITFSPE(fspe); diff --git a/clientlib/fsqueue.c b/clientlib/fsqueue.c index 4001c0d3..c03b5645 100644 --- a/clientlib/fsqueue.c +++ b/clientlib/fsqueue.c @@ -83,6 +83,12 @@ _fsqueue_enq(FsQueue* self ///< us - the FsQueue we're operating on frameset_prepend_frame(fs, &seqno->baseclass); // And put this FrameSet at the end of the queue g_queue_push_tail(self->_q, fs); + if (DEBUG >= 3) { + char *destaddr = self->_destaddr->baseclass.toString(&self->_destaddr->baseclass); + DEBUGMSG("%s.%d: queued frameset fstype=%d seqno="FMT_64BIT"d (dest=%s)", __FUNCTION__, __LINE__, + fs->fstype, self->_nextseqno-1, destaddr); + g_free(destaddr); destaddr = NULL; + } // Now do all the paperwork :-D // We need for the FrameSet to be kept around for potentially a long time... diff --git a/clientlib/packetdecoder.c b/clientlib/packetdecoder.c index a7eb362c..538cc16f 100644 --- a/clientlib/packetdecoder.c +++ b/clientlib/packetdecoder.c @@ -137,6 +137,8 @@ _decode_packet_framedata_to_frameobject(PacketDecoder* self, ///<[in/out] Packet ret = unknownframe_tlvconstructor(*pktstart, *pktend, newpacket, &newpacketend); } if (NULL == ret) { + g_warning("%s.%d: tlv contructor for frametype %d failed" + , __FUNCTION__, __LINE__, frametype); return NULL; } g_return_val_if_fail(ret != NULL, NULL); @@ -247,6 +249,8 @@ _pktdata_to_framesetlist(PacketDecoder*self, ///<[in] PacketDecoder object newframestart = newpacket; } if (NULL == newframe) { + g_warning("%s.%d: conversion from framedata to frameobject in frameset %d failed" + , __FUNCTION__, __LINE__, fs->fstype); UNREF(fs); goto errout; } @@ -275,6 +279,8 @@ _pktdata_to_framesetlist(PacketDecoder*self, ///<[in] PacketDecoder object } if (fs) { ret = g_slist_append(ret, fs); fs = NULL; + } else { + g_warning("%s.%d: no frameset found", __FUNCTION__, __LINE__); } curframeset = nextframeset; }