diff --git a/src/FwdState.cc b/src/FwdState.cc index bbca57e56f6..83bd07b31d4 100644 --- a/src/FwdState.cc +++ b/src/FwdState.cc @@ -580,7 +580,12 @@ FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure) if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) return; - storedWholeReply_ = whyWeAreSure; + if (!storedWholeReply_) { + storedWholeReply_ = whyWeAreSure; + + if (!reforward()) + entry->completeSuccessfully(whyWeAreSure); + } } void @@ -1314,7 +1319,11 @@ FwdState::reforward() return 0; } - assert(e->store_status == STORE_PENDING); + if (e->store_status == STORE_OK) { + debugs(17, 3, "No, the entry is STORE_OK already"); + return 0; + } + assert(e->mem_obj); #if URL_CHECKSUM_DEBUG diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc index 15a20b4b10c..7a48812a003 100644 --- a/src/clients/FtpClient.cc +++ b/src/clients/FtpClient.cc @@ -179,6 +179,14 @@ Ftp::DataChannel::addr(const Ip::Address &import) port = import.port(); } +void +Ftp::DataChannel::appended(const size_t size) +{ + readBuf->appended(size); + payloadSeen += size; + debugs(9, 5, size << " bytes to readBuf, payloadSeen: " << payloadSeen); +} + /* Ftp::Client */ Ftp::Client::Client(FwdState *fwdState): @@ -979,8 +987,7 @@ Ftp::Client::dataRead(const CommIoCbParams &io) } if (io.flag == Comm::OK && io.size > 0) { - debugs(9, 5, "appended " << io.size << " bytes to readBuf"); - data.readBuf->appended(io.size); + data.appended(io.size); #if USE_DELAY_POOLS DelayId delayId = entry->mem_obj->mostBytesAllowed(); delayId.bytesIn(io.size); diff --git a/src/clients/FtpClient.h b/src/clients/FtpClient.h index a42e0e199a6..9e1d4614f33 100644 --- a/src/clients/FtpClient.h +++ b/src/clients/FtpClient.h @@ -99,11 +99,17 @@ class DataChannel: public Ftp::Channel void addr(const Ip::Address &addr); ///< import host and port + /// updates counters after this number of bytes have been added to readBuf + void appended(size_t size); + public: MemBuf *readBuf; char *host; unsigned short port; bool read_pending; + + /// amount of message payload/body received so far + uint64_t payloadSeen = 0; }; /// FTP client functionality shared among FTP Gateway and Relay clients. diff --git a/src/clients/FtpGateway.cc b/src/clients/FtpGateway.cc index 675c08dfec2..c2bc3a0bbca 100644 --- a/src/clients/FtpGateway.cc +++ b/src/clients/FtpGateway.cc @@ -970,8 +970,10 @@ Ftp::Gateway::processReplyBody() return; } + const auto availableDataSize = data.readBuf->contentSize(); + /* Directory listings are special. They write ther own headers via the error objects */ - if (!flags.http_header_sent && data.readBuf->contentSize() >= 0 && !flags.isdir) + if (!flags.http_header_sent && !flags.isdir) appendSuccessHeader(); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { @@ -1000,14 +1002,17 @@ Ftp::Gateway::processReplyBody() parseListing(); maybeReadVirginBody(); return; - } else if (const auto csize = data.readBuf->contentSize()) { - writeReplyBody(data.readBuf->content(), csize); - debugs(9, 5, "consuming " << csize << " bytes of readBuf"); - data.readBuf->consume(csize); + } else if (availableDataSize) { + writeReplyBody(data.readBuf->content(), availableDataSize); + debugs(9, 5, "consuming " << availableDataSize << " bytes of readBuf"); + data.readBuf->consume(availableDataSize); } entry->flush(); + if (availableDataSize && theSize >= 0 && data.payloadSeen >= static_cast(theSize)) + markParsedVirginReplyAsWhole("whole virgin body"); + maybeReadVirginBody(); } @@ -1151,6 +1156,7 @@ Ftp::Gateway::start() SBuf realm(ftpRealm()); // local copy so SBuf will not disappear too early const auto reply = ftpAuthRequired(request.getRaw(), realm, fwd->al); entry->replaceHttpReply(reply); + fwd->markStoredReplyAsWhole("checkAuth failed"); serverComplete(); return; } @@ -1257,6 +1263,7 @@ Ftp::Gateway::loginFailed() // add it to the store entry for response.... entry->replaceHttpReply(newrep); + fwd->markStoredReplyAsWhole("loginFailed"); serverComplete(); } @@ -2225,6 +2232,7 @@ Ftp::Gateway::completedListing() ctrl.message = nullptr; entry->replaceHttpReply(ferr.BuildHttpReply()); entry->flush(); + fwd->markStoredReplyAsWhole("completedListing"); entry->unlock("Ftp::Gateway"); } @@ -2239,8 +2247,9 @@ ftpReadTransferDone(Ftp::Gateway * ftpState) if (ftpState->flags.listing) { ftpState->completedListing(); /* QUIT operation handles sending the reply to client */ + } else { + ftpState->markParsedVirginReplyAsWhole("ftpReadTransferDone code 226 or 250"); } - ftpState->markParsedVirginReplyAsWhole("ftpReadTransferDone code 226 or 250"); ftpSendQuit(ftpState); } else { /* != 226 */ debugs(9, DBG_IMPORTANT, "Got code " << code << " after reading data"); @@ -2272,7 +2281,6 @@ ftpWriteTransferDone(Ftp::Gateway * ftpState) } ftpState->entry->timestampsSet(); /* XXX Is this needed? */ - ftpState->markParsedVirginReplyAsWhole("ftpWriteTransferDone code 226 or 250"); ftpSendReply(ftpState); } @@ -2401,6 +2409,7 @@ ftpFail(Ftp::Gateway *ftpState) delete ftperr; ftpState->entry->replaceHttpReply(newrep); + ftpState->fwd->markStoredReplyAsWhole("ftpFail"); ftpSendQuit(ftpState); } @@ -2480,6 +2489,8 @@ ftpSendReply(Ftp::Gateway * ftpState) ftpState->entry->replaceHttpReply(err.BuildHttpReply()); + ftpState->fwd->markStoredReplyAsWhole("ftpSendReply"); + ftpSendQuit(ftpState); }