Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes infinite loop triggered by oversized metrics #33

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions src/Network/StatsD/Datadog.hs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ module Network.StatsD.Datadog (
HasStatus(..),
HasMessage(..),
-- * Dummy client
StatsClient(Dummy)
StatsClient(Dummy),
MetricLargerThanBufferSizeException
) where
import Control.Applicative ((<$>))
import Control.Exception (SomeException)
import Control.Lens
import Control.Monad (void)
import Control.Monad (void, when)
import Control.Reaper
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as L
Expand Down Expand Up @@ -379,9 +380,14 @@ accumulateStats maxBufSize = go 0 []
go :: Int -> [ByteString] -> Seq.Seq ByteString -> (L.ByteString, Seq.Seq ByteString)
go !accum chunks s = case Seq.viewl s of
Seq.EmptyL -> (finalizeChunks chunks, Seq.empty)
(bs Seq.:< rest) -> let newSize = B.length bs + accum in if newSize > maxBufSize
then (finalizeChunks chunks, s)
else go newSize (bs : chunks) rest
(bs Seq.:< rest) ->
let newChunkSize = B.length bs
newTotalSize = newChunkSize + accum
in if newChunkSize > maxBufSize
then error "Oversized chunk made it into datadog accumulateStats. Please report this as a bug."
else if newTotalSize > maxBufSize
then (finalizeChunks chunks, s)
else go newTotalSize (bs : chunks) rest

finalizeChunks :: [ByteString] -> L.ByteString
finalizeChunks = L.fromChunks . reverse
Expand All @@ -404,7 +410,7 @@ mkStatsClient s = liftIO $ do
{ reaperAction = \stats -> catch (builderAction h (dogStatsSettingsBufferSize s) stats) $ \e ->
dogStatsSettingsOnException s e stats
, reaperDelay = dogStatsSettingsMaxDelay s
, reaperCons = \item work -> work Seq.|> runUtf8Builder item
, reaperCons = \item work -> work Seq.|> item
, reaperNull = Seq.null
, reaperEmpty = Seq.empty
}
Expand All @@ -427,7 +433,7 @@ withDogStatsD s = bracket (mkStatsClient s) finalizeStatsClient
-- | Note that Dummy is not the only constructor, just the only publicly available one.
data StatsClient = StatsClient
{ statsClientHandle :: !Handle
, statsClientReaper :: Reaper (Seq.Seq ByteString) (Utf8Builder ())
, statsClientReaper :: Reaper (Seq.Seq ByteString) ByteString
, statsClientSettings :: DogStatsSettings
}
| Dummy -- ^ Just drops all stats.
Expand All @@ -442,8 +448,18 @@ data StatsClient = StatsClient
-- > send client $ metric "wombat.force_count" Gauge (9001 :: Int)
-- > send client $ serviceCheck "Wombat Radar" ServiceOk
send :: (MonadIO m, ToStatsD v) => StatsClient -> v -> m ()
send StatsClient {statsClientReaper} v =
liftIO $ reaperAdd statsClientReaper (toStatsD v >> appendChar7 '\n')
send StatsClient {statsClientReaper, statsClientSettings} v = do
let bytes = runUtf8Builder (toStatsD v >> appendChar7 '\n')
bytesSize = B.length bytes
maxBufSize = dogStatsSettingsBufferSize statsClientSettings

when (bytesSize > maxBufSize) $ throwIO $
MetricLargerThanBufferSizeException
{ metricSize = bytesSize
, maxBufferSize = maxBufSize
}

liftIO $ reaperAdd statsClientReaper bytes
send Dummy _ = return ()
{-# INLINEABLE send #-}

Expand All @@ -454,3 +470,11 @@ finalizeStatsClient (StatsClient h r s) = liftIO $ do
void $ builderAction h (dogStatsSettingsBufferSize s) remainingStats
hClose h
finalizeStatsClient Dummy = return ()

data MetricLargerThanBufferSizeException =
MetricLargerThanBufferSizeException
{ metricSize :: Int
, maxBufferSize :: Int
} deriving (Show, Typeable)

instance Exception MetricLargerThanBufferSizeException
43 changes: 41 additions & 2 deletions test/Test/Network/Datadog/StatsD.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module Test.Network.Datadog.StatsD (spec) where

import Control.Monad.Catch (bracket)
import Control.Monad.Catch (SomeException, bracket, try, displayException)
import Control.Concurrent (forkFinally, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Network.Socket
( AddrInfoFlag (AI_PASSIVE)
, Socket
Expand All @@ -19,8 +21,9 @@ import Network.Socket
import Control.Monad
import System.Timeout (timeout)
import Test.Hspec (Spec, describe, expectationFailure, it)
import qualified Data.Text as Text

import Network.StatsD.Datadog (defaultSettings, event, send, withDogStatsD)
import Network.StatsD.Datadog (MetricLargerThanBufferSizeException, defaultSettings, event, send, withDogStatsD, dogStatsSettingsMaxDelay)

spec :: Spec
spec = describe "StatsD spec" $ do
Expand Down Expand Up @@ -52,3 +55,39 @@ spec = describe "StatsD spec" $ do
case val of
Just _ -> pure ()
Nothing -> expectationFailure "Did not receive DogStatsD event"
it "does not go into an infinite loop when trying sending a metric larger than a UPD packet" $ do
let longText = Text.replicate 65507 "x"
sendLargeMetric =
withDogStatsD defaultSettings $ \stats -> do
try $ send stats $ event "foo" longText

threadFinishedVar <- newEmptyMVar

-- run the `withDogStatsD` computation on another thread so we can successfully time out the
-- test. The infinite loop caused by large packets was not interruptible by asynchronous
-- exceptions. Most likely did not allocate any memory, so there was no safe spot to raise
-- the async exception.
threadId <- forkFinally sendLargeMetric (putMVar threadFinishedVar)
threadResult <- timeout 10000000 $ takeMVar threadFinishedVar

-- timeout the killThread call because it will end up waiting indefinitely trying to deliver
-- the asynchronous exception if the infinite loop is triggered.
_ <- timeout 100000 $ killThread threadId

case threadResult of
-- The thread finished and try caught the expected exception.
Just (Right (Left (_ :: MetricLargerThanBufferSizeException))) ->
pure ()

-- The thread finished with *no* exception, which is unexpected.
Just (Right (Right _)) ->
expectationFailure "Expected a MetricLargerThanBufferSizeException, but no exception was thrown."

-- The thread finished with an exception that was not caught by the `try` above.
Just (Left err) ->
expectationFailure $ "Expected a MetricLargerThanBufferSizeException to be thrown by send, but got: " ++ show err

-- The takeMVar above timed out, indicating the thread didn't finish.
Nothing ->
expectationFailure "Sending thread did not finish before timeout."