From 4383ff03de682344644605a0043d09e2fb6d4c1e Mon Sep 17 00:00:00 2001 From: David Vollbracht Date: Tue, 14 May 2019 14:33:03 -0400 Subject: [PATCH] Fixes infinite loop triggered by oversized metrics Adds a length check to `send` that checks whether the metric being sent is longer than the max buffer size. An exception is raised to the caller (*not* in the reaper thread) if the metric is too large. In order to check the length before adding the item to the reaper thread, the UTF8 building had to move to be done in the calling thread rather than the background. Prior to this, if a chunk larger than the max buffer size made it into `builderAction` it would cause an infinite loop. It would try to flush existing chunks to make room for the new one, but would then get stuck because the oversize chunk was still too big. The exact same thing would happen on the next `builderAction` iteration (and so forth). Fixes #32. --- src/Network/StatsD/Datadog.hs | 42 ++++++++++++++++++++++------ test/Test/Network/Datadog/StatsD.hs | 43 +++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/src/Network/StatsD/Datadog.hs b/src/Network/StatsD/Datadog.hs index 46deeb6..9d78f43 100644 --- a/src/Network/StatsD/Datadog.hs +++ b/src/Network/StatsD/Datadog.hs @@ -53,12 +53,13 @@ module Network.StatsD.Datadog ( HasStatus(..), HasMessage(..), -- * Dummy client - StatsClient(Dummy) + StatsClient(Dummy), + MetricLargerThanBufferSizeException ) where import Control.Applicative ((<$>)) import Control.Exception (SomeException) import Control.Lens -import Control.Monad (void) +import Control.Monad (void, when) import Control.Reaper import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy as L @@ -379,9 +380,14 @@ accumulateStats maxBufSize = go 0 [] go :: Int -> [ByteString] -> Seq.Seq ByteString -> (L.ByteString, Seq.Seq ByteString) go !accum chunks s = case Seq.viewl s of Seq.EmptyL -> (finalizeChunks chunks, Seq.empty) - (bs Seq.:< rest) -> let newSize = B.length bs + accum in if newSize > maxBufSize - then (finalizeChunks chunks, s) - else go newSize (bs : chunks) rest + (bs Seq.:< rest) -> + let newChunkSize = B.length bs + newTotalSize = newChunkSize + accum + in if newChunkSize > maxBufSize + then error "Oversized chunk made it into datadog accumulateStats. Please report this as a bug." + else if newTotalSize > maxBufSize + then (finalizeChunks chunks, s) + else go newTotalSize (bs : chunks) rest finalizeChunks :: [ByteString] -> L.ByteString finalizeChunks = L.fromChunks . reverse @@ -404,7 +410,7 @@ mkStatsClient s = liftIO $ do { reaperAction = \stats -> catch (builderAction h (dogStatsSettingsBufferSize s) stats) $ \e -> dogStatsSettingsOnException s e stats , reaperDelay = dogStatsSettingsMaxDelay s - , reaperCons = \item work -> work Seq.|> runUtf8Builder item + , reaperCons = \item work -> work Seq.|> item , reaperNull = Seq.null , reaperEmpty = Seq.empty } @@ -427,7 +433,7 @@ withDogStatsD s = bracket (mkStatsClient s) finalizeStatsClient -- | Note that Dummy is not the only constructor, just the only publicly available one. data StatsClient = StatsClient { statsClientHandle :: !Handle - , statsClientReaper :: Reaper (Seq.Seq ByteString) (Utf8Builder ()) + , statsClientReaper :: Reaper (Seq.Seq ByteString) ByteString , statsClientSettings :: DogStatsSettings } | Dummy -- ^ Just drops all stats. @@ -442,8 +448,18 @@ data StatsClient = StatsClient -- > send client $ metric "wombat.force_count" Gauge (9001 :: Int) -- > send client $ serviceCheck "Wombat Radar" ServiceOk send :: (MonadIO m, ToStatsD v) => StatsClient -> v -> m () -send StatsClient {statsClientReaper} v = - liftIO $ reaperAdd statsClientReaper (toStatsD v >> appendChar7 '\n') +send StatsClient {statsClientReaper, statsClientSettings} v = do + let bytes = runUtf8Builder (toStatsD v >> appendChar7 '\n') + bytesSize = B.length bytes + maxBufSize = dogStatsSettingsBufferSize statsClientSettings + + when (bytesSize > maxBufSize) $ throwIO $ + MetricLargerThanBufferSizeException + { metricSize = bytesSize + , maxBufferSize = maxBufSize + } + + liftIO $ reaperAdd statsClientReaper bytes send Dummy _ = return () {-# INLINEABLE send #-} @@ -454,3 +470,11 @@ finalizeStatsClient (StatsClient h r s) = liftIO $ do void $ builderAction h (dogStatsSettingsBufferSize s) remainingStats hClose h finalizeStatsClient Dummy = return () + +data MetricLargerThanBufferSizeException = + MetricLargerThanBufferSizeException + { metricSize :: Int + , maxBufferSize :: Int + } deriving (Show, Typeable) + +instance Exception MetricLargerThanBufferSizeException diff --git a/test/Test/Network/Datadog/StatsD.hs b/test/Test/Network/Datadog/StatsD.hs index fc4ebd5..923782e 100644 --- a/test/Test/Network/Datadog/StatsD.hs +++ b/test/Test/Network/Datadog/StatsD.hs @@ -1,6 +1,8 @@ module Test.Network.Datadog.StatsD (spec) where -import Control.Monad.Catch (bracket) +import Control.Monad.Catch (SomeException, bracket, try, displayException) +import Control.Concurrent (forkFinally, killThread, threadDelay) +import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import Network.Socket ( AddrInfoFlag (AI_PASSIVE) , Socket @@ -19,8 +21,9 @@ import Network.Socket import Control.Monad import System.Timeout (timeout) import Test.Hspec (Spec, describe, expectationFailure, it) +import qualified Data.Text as Text -import Network.StatsD.Datadog (defaultSettings, event, send, withDogStatsD) +import Network.StatsD.Datadog (MetricLargerThanBufferSizeException, defaultSettings, event, send, withDogStatsD, dogStatsSettingsMaxDelay) spec :: Spec spec = describe "StatsD spec" $ do @@ -52,3 +55,39 @@ spec = describe "StatsD spec" $ do case val of Just _ -> pure () Nothing -> expectationFailure "Did not receive DogStatsD event" + it "does not go into an infinite loop when trying sending a metric larger than a UPD packet" $ do + let longText = Text.replicate 65507 "x" + sendLargeMetric = + withDogStatsD defaultSettings $ \stats -> do + try $ send stats $ event "foo" longText + + threadFinishedVar <- newEmptyMVar + + -- run the `withDogStatsD` computation on another thread so we can successfully time out the + -- test. The infinite loop caused by large packets was not interruptible by asynchronous + -- exceptions. Most likely did not allocate any memory, so there was no safe spot to raise + -- the async exception. + threadId <- forkFinally sendLargeMetric (putMVar threadFinishedVar) + threadResult <- timeout 10000000 $ takeMVar threadFinishedVar + + -- timeout the killThread call because it will end up waiting indefinitely trying to deliver + -- the asynchronous exception if the infinite loop is triggered. + _ <- timeout 100000 $ killThread threadId + + case threadResult of + -- The thread finished and try caught the expected exception. + Just (Right (Left (_ :: MetricLargerThanBufferSizeException))) -> + pure () + + -- The thread finished with *no* exception, which is unexpected. + Just (Right (Right _)) -> + expectationFailure "Expected a MetricLargerThanBufferSizeException, but no exception was thrown." + + -- The thread finished with an exception that was not caught by the `try` above. + Just (Left err) -> + expectationFailure $ "Expected a MetricLargerThanBufferSizeException to be thrown by send, but got: " ++ show err + + -- The takeMVar above timed out, indicating the thread didn't finish. + Nothing -> + expectationFailure "Sending thread did not finish before timeout." +