diff --git a/src/Network/StatsD/Datadog.hs b/src/Network/StatsD/Datadog.hs index 46deeb6..9d78f43 100644 --- a/src/Network/StatsD/Datadog.hs +++ b/src/Network/StatsD/Datadog.hs @@ -53,12 +53,13 @@ module Network.StatsD.Datadog ( HasStatus(..), HasMessage(..), -- * Dummy client - StatsClient(Dummy) + StatsClient(Dummy), + MetricLargerThanBufferSizeException ) where import Control.Applicative ((<$>)) import Control.Exception (SomeException) import Control.Lens -import Control.Monad (void) +import Control.Monad (void, when) import Control.Reaper import Data.ByteString (ByteString) import qualified Data.ByteString.Lazy as L @@ -379,9 +380,14 @@ accumulateStats maxBufSize = go 0 [] go :: Int -> [ByteString] -> Seq.Seq ByteString -> (L.ByteString, Seq.Seq ByteString) go !accum chunks s = case Seq.viewl s of Seq.EmptyL -> (finalizeChunks chunks, Seq.empty) - (bs Seq.:< rest) -> let newSize = B.length bs + accum in if newSize > maxBufSize - then (finalizeChunks chunks, s) - else go newSize (bs : chunks) rest + (bs Seq.:< rest) -> + let newChunkSize = B.length bs + newTotalSize = newChunkSize + accum + in if newChunkSize > maxBufSize + then error "Oversized chunk made it into datadog accumulateStats. Please report this as a bug." + else if newTotalSize > maxBufSize + then (finalizeChunks chunks, s) + else go newTotalSize (bs : chunks) rest finalizeChunks :: [ByteString] -> L.ByteString finalizeChunks = L.fromChunks . reverse @@ -404,7 +410,7 @@ mkStatsClient s = liftIO $ do { reaperAction = \stats -> catch (builderAction h (dogStatsSettingsBufferSize s) stats) $ \e -> dogStatsSettingsOnException s e stats , reaperDelay = dogStatsSettingsMaxDelay s - , reaperCons = \item work -> work Seq.|> runUtf8Builder item + , reaperCons = \item work -> work Seq.|> item , reaperNull = Seq.null , reaperEmpty = Seq.empty } @@ -427,7 +433,7 @@ withDogStatsD s = bracket (mkStatsClient s) finalizeStatsClient -- | Note that Dummy is not the only constructor, just the only publicly available one. data StatsClient = StatsClient { statsClientHandle :: !Handle - , statsClientReaper :: Reaper (Seq.Seq ByteString) (Utf8Builder ()) + , statsClientReaper :: Reaper (Seq.Seq ByteString) ByteString , statsClientSettings :: DogStatsSettings } | Dummy -- ^ Just drops all stats. @@ -442,8 +448,18 @@ data StatsClient = StatsClient -- > send client $ metric "wombat.force_count" Gauge (9001 :: Int) -- > send client $ serviceCheck "Wombat Radar" ServiceOk send :: (MonadIO m, ToStatsD v) => StatsClient -> v -> m () -send StatsClient {statsClientReaper} v = - liftIO $ reaperAdd statsClientReaper (toStatsD v >> appendChar7 '\n') +send StatsClient {statsClientReaper, statsClientSettings} v = do + let bytes = runUtf8Builder (toStatsD v >> appendChar7 '\n') + bytesSize = B.length bytes + maxBufSize = dogStatsSettingsBufferSize statsClientSettings + + when (bytesSize > maxBufSize) $ throwIO $ + MetricLargerThanBufferSizeException + { metricSize = bytesSize + , maxBufferSize = maxBufSize + } + + liftIO $ reaperAdd statsClientReaper bytes send Dummy _ = return () {-# INLINEABLE send #-} @@ -454,3 +470,11 @@ finalizeStatsClient (StatsClient h r s) = liftIO $ do void $ builderAction h (dogStatsSettingsBufferSize s) remainingStats hClose h finalizeStatsClient Dummy = return () + +data MetricLargerThanBufferSizeException = + MetricLargerThanBufferSizeException + { metricSize :: Int + , maxBufferSize :: Int + } deriving (Show, Typeable) + +instance Exception MetricLargerThanBufferSizeException diff --git a/test/Test/Network/Datadog/StatsD.hs b/test/Test/Network/Datadog/StatsD.hs index fc4ebd5..923782e 100644 --- a/test/Test/Network/Datadog/StatsD.hs +++ b/test/Test/Network/Datadog/StatsD.hs @@ -1,6 +1,8 @@ module Test.Network.Datadog.StatsD (spec) where -import Control.Monad.Catch (bracket) +import Control.Monad.Catch (SomeException, bracket, try, displayException) +import Control.Concurrent (forkFinally, killThread, threadDelay) +import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import Network.Socket ( AddrInfoFlag (AI_PASSIVE) , Socket @@ -19,8 +21,9 @@ import Network.Socket import Control.Monad import System.Timeout (timeout) import Test.Hspec (Spec, describe, expectationFailure, it) +import qualified Data.Text as Text -import Network.StatsD.Datadog (defaultSettings, event, send, withDogStatsD) +import Network.StatsD.Datadog (MetricLargerThanBufferSizeException, defaultSettings, event, send, withDogStatsD, dogStatsSettingsMaxDelay) spec :: Spec spec = describe "StatsD spec" $ do @@ -52,3 +55,39 @@ spec = describe "StatsD spec" $ do case val of Just _ -> pure () Nothing -> expectationFailure "Did not receive DogStatsD event" + it "does not go into an infinite loop when trying sending a metric larger than a UPD packet" $ do + let longText = Text.replicate 65507 "x" + sendLargeMetric = + withDogStatsD defaultSettings $ \stats -> do + try $ send stats $ event "foo" longText + + threadFinishedVar <- newEmptyMVar + + -- run the `withDogStatsD` computation on another thread so we can successfully time out the + -- test. The infinite loop caused by large packets was not interruptible by asynchronous + -- exceptions. Most likely did not allocate any memory, so there was no safe spot to raise + -- the async exception. + threadId <- forkFinally sendLargeMetric (putMVar threadFinishedVar) + threadResult <- timeout 10000000 $ takeMVar threadFinishedVar + + -- timeout the killThread call because it will end up waiting indefinitely trying to deliver + -- the asynchronous exception if the infinite loop is triggered. + _ <- timeout 100000 $ killThread threadId + + case threadResult of + -- The thread finished and try caught the expected exception. + Just (Right (Left (_ :: MetricLargerThanBufferSizeException))) -> + pure () + + -- The thread finished with *no* exception, which is unexpected. + Just (Right (Right _)) -> + expectationFailure "Expected a MetricLargerThanBufferSizeException, but no exception was thrown." + + -- The thread finished with an exception that was not caught by the `try` above. + Just (Left err) -> + expectationFailure $ "Expected a MetricLargerThanBufferSizeException to be thrown by send, but got: " ++ show err + + -- The takeMVar above timed out, indicating the thread didn't finish. + Nothing -> + expectationFailure "Sending thread did not finish before timeout." +