diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 10a0271a0bf..45a6af16c2e 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -466,6 +466,7 @@ steps: - trigger-benchmarks artifact_paths: - ./benchmark-history*.tgz + - ./benchmark-history.csv command: | ./scripts/buildkite/main/benchmark-history.sh agents: diff --git a/.gitignore b/.gitignore index da35f86d3a9..88fe4770327 100644 --- a/.gitignore +++ b/.gitignore @@ -88,3 +88,6 @@ test/e2e/.direnv/flake-profile* node_modules package-lock.json package.json + +# buildkite artifacts testing +artifacts \ No newline at end of file diff --git a/lib/benchmarks/cardano-wallet-benchmarks.cabal b/lib/benchmarks/cardano-wallet-benchmarks.cabal index d531d31c8ea..89cf1c5cf80 100644 --- a/lib/benchmarks/cardano-wallet-benchmarks.cabal +++ b/lib/benchmarks/cardano-wallet-benchmarks.cabal @@ -39,6 +39,7 @@ library hs-source-dirs: src build-depends: , aeson + , attoparsec , base , bytestring , cardano-wallet @@ -83,6 +84,7 @@ library , time , unliftio , unliftio-core + , unordered-containers , vector , wai-middleware-logging , with-utf8 @@ -275,6 +277,7 @@ executable benchmark-history , cardano-wallet-buildkite , cassava , containers + , contra-tracer , directory , filepath , exceptions diff --git a/lib/benchmarks/exe/benchmark-history.hs b/lib/benchmarks/exe/benchmark-history.hs index ec6aca4ade6..c2e5e8c7c66 100644 --- a/lib/benchmarks/exe/benchmark-history.hs +++ b/lib/benchmarks/exe/benchmark-history.hs @@ -8,12 +8,13 @@ import Prelude import Buildkite.API - ( Artifact (filename, job_id) + ( Artifact (job_id, path) , Build (branch, jobs, number) - , GetArtifact , Job (finished_at) , Time (Time) - , WithAuthPipeline + ) +import Buildkite.Artifacts.CSV + ( fetchCSVArtifactContent ) import Buildkite.Client ( BuildAPI @@ -23,6 +24,17 @@ import Buildkite.Client , getArtifactsContent , getBuildsOfBranch ) +import Buildkite.Connection + ( Connector + , OrganizationName (..) + , PipelineName (..) + , bailout + , newConnector + , skip410 + ) +import Buildkite.LimitsLock + ( LimitsLockLog (..) + ) import Cardano.Wallet.Benchmarks.Charting ( renderHarmonizedHistoryChartSVG ) @@ -34,22 +46,28 @@ import Cardano.Wallet.Benchmarks.History ( History , IndexedSemantic (IndexedSemantic) , harmonizeHistory + , parseHistory , pastDays , renderHarmonizedHistoryCsv ) -import Control.Monad - ( when +import Control.Tracer + ( Tracer + , stdoutTracer + , traceWith ) import Data.Csv ( decodeByName , encodeByName ) -import Data.Data - ( Proxy (..) - ) import Data.Foldable ( Foldable (..) ) +import Data.Function + ( (&) + ) +import Data.Functor.Contravariant + ( (>$<) + ) import Data.Semigroup ( First (..) ) @@ -61,21 +79,6 @@ import Data.Time ( Day , UTCTime (utctDay) ) -import Network.HTTP.Client - ( Manager - , ManagerSettings (managerModifyRequest) - , Request (shouldStripHeaderOnRedirect) - , newManager - ) -import Network.HTTP.Client.TLS - ( tlsManagerSettings - ) -import Network.HTTP.Media - ( (//) - ) -import Network.HTTP.Types.Status - ( status410 - ) import Options.Applicative ( Parser , ParserInfo @@ -91,32 +94,11 @@ import Options.Applicative , progDesc , strOption ) -import Servant.API - ( Accept (..) - , MimeUnrender - ) -import Servant.API.ContentTypes - ( MimeRender (..) - , MimeUnrender (..) - ) -import Servant.Client - ( BaseUrl (..) - , ClientEnv - , ClientError (FailureResponse) - , ClientM - , ResponseF (..) - , Scheme (..) - , client - , mkClientEnv - , runClientM - ) import Streaming - ( Of (..) + ( MonadIO (..) + , Of (..) , Stream ) -import System.Environment - ( getEnv - ) import System.FilePath ( (<.>) , () @@ -129,68 +111,83 @@ import qualified Data.Text as T import qualified Streaming as S import qualified Streaming.Prelude as S -data CSV - -instance Accept CSV where - contentType _ = "text" // "csv" - -instance Show a => MimeRender CSV a where - mimeRender _ val = BL8.pack $ show val +cardanoFoundationName :: OrganizationName +cardanoFoundationName = OrganizationName "cardano-foundation" -instance MimeUnrender CSV BL8.ByteString where - mimeUnrender _ = Right +mainPipeline :: PipelineName +mainPipeline = PipelineName "cardano-wallet" -organizationSlug :: Text -organizationSlug = "cardano-foundation" +bind + :: Monad m + => (a -> Stream (Of b) m x) + -> Stream (Of a) m r + -> Stream (Of b) m r +bind = flip S.for -pipelineSlug :: Text -pipelineSlug = "cardano-wallet" +data Progress + = ProgressBuild Int Text + | FoundCheckpoint Artifact -buildkiteDomain :: String -buildkiteDomain = "api.buildkite.com" +type Checkpoint = Maybe (BuildJobsMap, Artifact, BL8.ByteString) -buildkitePort :: Int -buildkitePort = 443 +exitOnCheckpoint + :: Tracer IO Progress + -> ((BuildJobsMap, Artifact) -> IO Checkpoint) + -> Stream (Of (BuildJobsMap, Artifact)) IO () + -> Stream (Of (BuildJobsMap, Artifact)) IO Checkpoint +exitOnCheckpoint t f s = S.effect $ do + m <- S.next s + case m of + Left () -> pure $ pure Nothing + Right ((b, a), s') -> do + let run = do + S.yield (b, a) + exitOnCheckpoint t f s' + if path a == "benchmark-history.csv" + then do + r <- f (b, a) + pure $ case r of + Nothing -> run + Just x -> do + liftIO $ traceWith t $ FoundCheckpoint a + pure $ Just x + else pure run -withAuthWallet :: String -> WithAuthPipeline a -> a -withAuthWallet apiToken f = - f (Just $ T.pack apiToken) organizationSlug pipelineSlug - -fetchArtifactContent - :: WithAuthPipeline (Int -> Text -> Text -> ClientM BL8.ByteString) -fetchArtifactContent = client (Proxy :: Proxy (GetArtifact CSV BL8.ByteString)) - -queryBuildkite :: - (forall a . HandleClientError a -> ClientM a -> IO (Maybe a)) - -> (forall a . WithAuthPipeline a -> a) - -> Day -> IO History -queryBuildkite q w d0 = do - let skip410Q = Query (q skip410) w - bailoutQ = Query (q bailout) w - S.foldMap_ Prelude.id - $ flip - S.for +getHistory + :: Tracer IO Progress + -> Connector + -> Day + -> IO (Of History Checkpoint) +getHistory progressTracer mkQuery d0 = do + let queryPipeline = mkQuery cardanoFoundationName mainPipeline + skip410Q = queryPipeline skip410 + bailoutQ = queryPipeline bailout + getAnyCSVArtifact + :: MonadIO m + => (BuildJobsMap, Artifact) + -> m (Maybe (BuildJobsMap, Artifact, BL8.ByteString)) + getAnyCSVArtifact = + uncurry + $ getArtifactsContent skip410Q fetchCSVArtifactContent + S.foldMap Prelude.id + $ getReleaseCandidateBuilds bailoutQ d0 + & bind (getArtifacts bailoutQ) + & exitOnCheckpoint progressTracer getAnyCSVArtifact + & S.filter (\(_, a) -> "bench-results.csv" `isSuffixOf` path a) + & S.chain + ( \(b, _) -> + liftIO + $ traceWith progressTracer + $ ProgressBuild (number b) (branch b) + ) + & S.mapM getAnyCSVArtifact + & S.concat + & bind historyPoints + & bind ( \case Right h -> S.yield h Left e -> error e ) - $ flip S.for historyPoints - $ flip S.for (\(a, j) -> getArtifactsContent - skip410Q - fetchArtifactContent j a) - $ S.chain - ( \(_, b) -> - putStrLn - $ "Build number: " - <> show (number b) - <> ", branch: " - <> T.unpack (branch b) - ) - - $ S.filter (\(a, _) -> "bench-results.csv" `isSuffixOf` filename a) - $ S.map (\(b, a) -> (a, b)) - $ flip S.for (getArtifacts bailoutQ) - $ getReleaseCandidateBuilds bailoutQ d0 mkReleaseCandidateName :: Day -> String mkReleaseCandidateName d = "release-candidate/v" ++ show d @@ -202,11 +199,6 @@ getReleaseCandidateBuilds q d = S.effect $ do $ flip S.for (getBuildsOfBranch q . mkReleaseCandidateName) $ S.each ds -getToken :: IO String -getToken = do - token <- getEnv "BUILDKITE_API_TOKEN" - pure $ "Bearer " ++ token - data Options = Options { _optSinceDate :: Day , _outputDir :: FilePath @@ -235,72 +227,50 @@ optionsParser = <> header "benchmark-history - a tool for benchmark data analysis" ) -type HandleClientError a = IO (Either ClientError a) -> IO (Maybe a) +data Logs = ProgressLogs Progress | ConnectorLogs LimitsLockLog + +renderLogs :: Logs -> String +renderLogs = \case + ProgressLogs (ProgressBuild n b) -> + "Processing build " <> show n <> " on branch " <> T.unpack b + ProgressLogs (FoundCheckpoint a) -> + "Found chain point: " <> T.unpack (path a) + ConnectorLogs (RateLimitReached s) -> + "Rate limit reached. Waiting for " <> show s <> " seconds." + +tracer :: Tracer IO Logs +tracer = renderLogs >$< stdoutTracer main :: IO () main = do - bkToken <- getToken Options sinceDay outputDir <- execParser optionsParser - manager <- newManager $ specialSettings False - let env = buildkiteEnv manager - runQuery :: HandleClientError a -> ClientM a -> IO (Maybe a) - runQuery f action = f $ runClientM action env - result <- queryBuildkite runQuery (withAuthWallet bkToken) sinceDay - let eHarmonized = harmonizeHistory result - case eHarmonized of + connector <- + newConnector "BUILDKITE_API_TOKEN" 10 $ ConnectorLogs >$< tracer + result :> mCheckpoint <- + getHistory (ProgressLogs >$< tracer) connector sinceDay + old <- case mCheckpoint of + Nothing -> pure mempty + Just (_, _, b) -> case parseHistory b of + Left e -> error e + Right h -> pure h + case harmonizeHistory $ result <> old of Left rs -> error $ "Failed to harmonize history: " ++ show rs Right harmonized -> do - putStrLn $ "Harmonized history: " <> show harmonized - let csv = uncurry encodeByName $ renderHarmonizedHistoryCsv harmonized - BL8.writeFile (outputDir "benchmark_history" <.> "csv") csv + let csv = + uncurry encodeByName + $ renderHarmonizedHistoryCsv harmonized + BL8.writeFile (outputDir "benchmark-history" <.> "csv") csv renderHarmonizedHistoryChartSVG outputDir harmonized -bailout :: HandleClientError a -bailout = handle (error . show) - -handle :: (ClientError -> IO (Maybe a)) -> HandleClientError a -handle g f = do - res <- f - case res of - Left e -> g e - Right a -> pure $ Just a - -skip410 :: HandleClientError a -skip410 = handle $ \case - FailureResponse _ (Response s _ _ _) - | s == status410 -> pure Nothing - e -> error $ show e - -buildkiteEnv :: Manager -> ClientEnv -buildkiteEnv manager = - mkClientEnv manager - $ BaseUrl Https buildkiteDomain buildkitePort "" - -specialSettings :: Bool -> ManagerSettings -specialSettings logs = - tlsManagerSettings - { managerModifyRequest = \req -> do - let req' = - req - { shouldStripHeaderOnRedirect = - \case - "Authorization" -> True - _ -> False - } - when logs - $ putStrLn - $ "Querying: " ++ show req' - pure req' - } - parseResults :: BL8.ByteString -> Either String ([(IndexedSemantic, Result)]) parseResults = fmap (fmap f . zip [0 ..] . toList . snd) . decodeByName where f (i, (Benchmark s r)) = (IndexedSemantic s i, r) historyPoints - :: (BuildJobsMap, Artifact, BL8.ByteString) - -> Stream (Of (Either String History)) IO () + :: Monad m + => (BuildJobsMap, Artifact, BL8.ByteString) + -> Stream (Of (Either String History)) m () historyPoints (b, a, r) = let rs = parseResults r diff --git a/lib/benchmarks/src/Cardano/Wallet/Benchmarks/History.hs b/lib/benchmarks/src/Cardano/Wallet/Benchmarks/History.hs index 90098e8dfcf..57bf092a95c 100644 --- a/lib/benchmarks/src/Cardano/Wallet/Benchmarks/History.hs +++ b/lib/benchmarks/src/Cardano/Wallet/Benchmarks/History.hs @@ -8,6 +8,7 @@ module Cardano.Wallet.Benchmarks.History , harmonizeHistory , pastDays , renderHarmonizedHistoryCsv + , parseHistory ) where @@ -19,13 +20,22 @@ import Cardano.Wallet.Benchmarks.Collect , Unit (..) , convertUnit ) +import Control.Monad + ( forM + ) import Data.Bifunctor ( second ) import Data.Csv - ( Header + ( FromField (..) + , FromNamedRecord (..) + , Header + , NamedRecord + , Parser , ToNamedRecord (..) + , decodeByName , namedRecord + , (.:) , (.=) ) import Data.Foldable @@ -41,6 +51,9 @@ import Data.Map.Monoidal.Strict ( MonoidalMap (..) , assocs ) +import Data.Maybe + ( maybeToList + ) import Data.Semigroup ( First (..) ) @@ -51,10 +64,14 @@ import Data.Time ( Day , UTCTime (..) , addDays + , defaultTimeLocale , getCurrentTime + , parseTimeM ) import qualified Data.ByteString.Char8 as B8 +import qualified Data.ByteString.Lazy.Char8 as BL8 +import qualified Data.HashMap.Strict as HMap import qualified Data.Map as Map import qualified Data.Map.Monoidal.Strict as MMap import qualified Data.Text as T @@ -185,6 +202,15 @@ data Row = Row } deriving stock (Show) +rowToHarmonizedRow :: Row -> History +rowToHarmonizedRow (Row s i u vs) = + MMap.singleton (IndexedSemantic s i) + $ fold + $ do + (d, mv) <- vs + v <- maybeToList mv + pure $ MMap.singleton d $ First $ Result v u 1 + instance ToNamedRecord Row where toNamedRecord (Row s i u vs) = namedRecord @@ -193,6 +219,31 @@ instance ToNamedRecord Row where : ("Unit" .= u) : fmap (\(d, v) -> (B8.pack . show $ d) .= v) vs +instance FromNamedRecord Row where + parseNamedRecord r = + Row + <$> r .: "Semantic" + <*> r .: "Index" + <*> r .: "Unit" + <*> parseDays r + +parseDays :: NamedRecord -> Parser [(Day, Maybe Double)] +parseDays hm = + let + hm' = + HMap.delete "Semantic" + $ HMap.delete "Index" + $ HMap.delete "Unit" hm + fields = HMap.toList hm' + in + forM fields $ \(k, v) -> do + d <- parseTimeM True defaultTimeLocale "%Y-%m-%d" (B8.unpack k) + v' <- parseField v + return (d, v') + +parseHistory :: BL8.ByteString -> Either String History +parseHistory r = foldMap rowToHarmonizedRow . toList . snd <$> decodeByName r + -- | Render a harmonized history as a CSV file. renderHarmonizedHistoryCsv :: HarmonizedHistory -> (Header, [Row]) renderHarmonizedHistoryCsv (HarmonizedHistory h ds) = (header', rows) diff --git a/lib/buildkite/cardano-wallet-buildkite.cabal b/lib/buildkite/cardano-wallet-buildkite.cabal index 35982e15f3b..8be703bb810 100644 --- a/lib/buildkite/cardano-wallet-buildkite.cabal +++ b/lib/buildkite/cardano-wallet-buildkite.cabal @@ -39,15 +39,18 @@ library , base , bytestring , containers + , contra-tracer , exceptions , http-client , http-client-tls , http-media + , http-types , pretty-simple , servant , servant-client , streaming , text + , stm , time if flag(release) @@ -56,4 +59,7 @@ library exposed-modules: Buildkite.API + Buildkite.Artifacts.CSV Buildkite.Client + Buildkite.Connection + Buildkite.LimitsLock diff --git a/lib/buildkite/src/Buildkite/API.hs b/lib/buildkite/src/Buildkite/API.hs index 18a4545ea6f..b68e8c773ca 100644 --- a/lib/buildkite/src/Buildkite/API.hs +++ b/lib/buildkite/src/Buildkite/API.hs @@ -14,28 +14,39 @@ module Buildkite.API , Build (..) , Job (..) , Time (..) + , renderDate + , parseDate , fetchBuilds , fetchBuildsOfBranch , fetchArtifacts , jobId , artifactId + , WithLockingAuthPipeline , WithAuthPipeline + , WithLocking , overJobs , GetArtifact + , newLimitsLock + , LimitsLock + , withLimitsLock ) where import Prelude +import Buildkite.LimitsLock + ( LimitsLock (..) + , newLimitsLock + ) import Control.Monad - ( mzero - , (>=>) + ( (>=>) + ) +import Control.Monad.IO.Class + ( MonadIO (..) ) import Data.Aeson ( FromJSON (..) - ) -import Data.Aeson.Types - ( Parser + , ToJSON (..) ) import Data.Data ( Proxy (..) @@ -46,17 +57,24 @@ import Data.Text import Data.Time ( UTCTime , defaultTimeLocale + , formatTime , parseTimeM ) import GHC.Generics ( Generic ) +import GHC.Stack + ( HasCallStack + ) import Servant.API ( Capture , Get , Header + , Headers (getResponse) , JSON , QueryParam + , ResponseHeader (..) + , lookupResponseHeader , (:>) ) import Servant.Client @@ -64,31 +82,38 @@ import Servant.Client , client ) +import qualified Data.Text as T + newtype Time = Time { time :: UTCTime } deriving (Show) -parseDate :: String -> Parser UTCTime -parseDate dateString = - maybe mzero pure - $ parseTimeM True defaultTimeLocale "%Y-%m-%dT%H:%M:%S%QZ" dateString +parseDate :: MonadFail m => [Char] -> m UTCTime +parseDate = parseTimeM True defaultTimeLocale "%Y-%m-%dT%H:%M:%S%QZ" + +renderDate :: UTCTime -> Text +renderDate = T.pack . formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%S%QZ" instance FromJSON Time where parseJSON = parseJSON >=> parseDate >=> pure . Time +instance ToJSON Time where + toJSON = toJSON . renderDate . time data Job = Job { id :: Text , name :: Maybe Text , step_key :: Maybe Text , finished_at :: Maybe Time + , created_at :: Maybe Time } deriving (Show, Generic) instance FromJSON Job +instance ToJSON Job jobId :: Job -> Text -jobId (Job i _ _ _) = i +jobId (Job i _ _ _ _) = i data Build l = Build { number :: Int @@ -107,19 +132,25 @@ data Artifact = Artifact , id :: Text , job_id :: Text , state :: Text + , file_size :: Int + , path :: Text } deriving (Show, Generic) artifactId :: Artifact -> Text -artifactId (Artifact _ i _ _) = i +artifactId (Artifact _ i _ _ _ _) = i newtype ArtifactURL = ArtifactURL { url :: Text } deriving (Show, Generic) -instance FromJSON (Build []) +instance FromJSON (l Job) => FromJSON (Build l) +instance ToJSON (l Job) => ToJSON (Build l) + instance FromJSON Artifact +instance ToJSON Artifact + instance FromJSON ArtifactURL type PreamblePipeline a = @@ -135,7 +166,7 @@ type GetBuilds = PreamblePipeline ( "builds" :> QueryParam "page" Int - :> Get '[JSON] [Build []] + :> Get '[JSON] (BuildKiteHeaders [Build []]) ) type GetBuildsOfBranch = @@ -143,7 +174,7 @@ type GetBuildsOfBranch = ( "builds" :> QueryParam "branch" String :> QueryParam "page" Int - :> Get '[JSON] [Build []] + :> Get '[JSON] (BuildKiteHeaders [Build []]) ) type PreambleBuilds a = @@ -157,7 +188,7 @@ type GetArtifacts = PreambleBuilds ( QueryParam "page" Int :> "artifacts" - :> Get '[JSON] [Artifact] + :> Get '[JSON] (BuildKiteHeaders [Artifact]) ) type PreambleJobs a = @@ -175,19 +206,47 @@ type GetArtifact mime content = :> Get '[mime] content ) -type WithAuthPipeline a = Maybe Text -> Text -> Text -> a +type WithLockingAuthPipeline a = WithLocking (Maybe Text -> Text -> Text -> a) +type WithLocking a = LimitsLock -> a +type WithAuthPipeline a = Maybe Text -> Text -> Text -> a + +respectLimits :: HasCallStack => LimitsLock -> BuildKiteHeaders o -> ClientM o +respectLimits l r = do + let remaining = case lookupResponseHeader r + :: ResponseHeader "RateLimit-Remaining" Int of + Header remaining' -> remaining' + _ -> error "RateLimit-Remaining header not found" + let reset = case lookupResponseHeader r + :: ResponseHeader "RateLimit-Reset" Int of + Header reset' -> reset' + _ -> error "RateLimit-Reset header not found" + liftIO $ setLimit l remaining reset + pure $ getResponse r + +withLimitsLock :: HasCallStack => LimitsLock -> ClientM (BuildKiteHeaders b) -> ClientM b +withLimitsLock l f = do + liftIO $ checkLimit l + r <- f + respectLimits l r fetchBuilds - :: WithAuthPipeline + :: WithLockingAuthPipeline (Maybe Int -> ClientM [Build []]) -fetchBuilds = client $ Proxy @GetBuilds +fetchBuilds l ma o r mc = + withLimitsLock l $ client (Proxy @GetBuilds) ma o r mc fetchBuildsOfBranch - :: WithAuthPipeline + :: WithLockingAuthPipeline (Maybe String -> Maybe Int -> ClientM [Build []]) -fetchBuildsOfBranch = client $ Proxy @GetBuildsOfBranch +fetchBuildsOfBranch l ma o r mb mc = + withLimitsLock l $ client (Proxy @GetBuildsOfBranch) ma o r mb mc fetchArtifacts - :: WithAuthPipeline + :: WithLockingAuthPipeline (Int -> Maybe Int -> ClientM [Artifact]) -fetchArtifacts = client $ Proxy @GetArtifacts +fetchArtifacts l ma o r bn mp = do + withLimitsLock l $ client (Proxy @GetArtifacts) ma o r bn mp + +type RateLimitRemaining = Header "RateLimit-Remaining" Int +type RateLimitReset = Header "RateLimit-Reset" Int +type BuildKiteHeaders = Headers '[RateLimitRemaining, RateLimitReset] diff --git a/lib/buildkite/src/Buildkite/Artifacts/CSV.hs b/lib/buildkite/src/Buildkite/Artifacts/CSV.hs new file mode 100644 index 00000000000..9f6e954937a --- /dev/null +++ b/lib/buildkite/src/Buildkite/Artifacts/CSV.hs @@ -0,0 +1,51 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} + +module Buildkite.Artifacts.CSV + ( fetchCSVArtifactContent + ) +where + +import Prelude + +import Buildkite.API + ( GetArtifact + , WithAuthPipeline + ) +import Data.Proxy + ( Proxy (..) + ) +import Data.Text + ( Text + ) +import Network.HTTP.Media + ( (//) + ) +import Servant.API.ContentTypes + ( Accept (contentType) + , MimeRender (..) + , MimeUnrender (mimeUnrender) + ) +import Servant.Client + ( ClientM + , client + ) + +import qualified Data.ByteString.Lazy.Char8 as BL8 + +data CSV + +instance Accept CSV where + contentType _ = "text" // "csv" + +instance Show a => MimeRender CSV a where + mimeRender _ val = BL8.pack $ show val + +instance MimeUnrender CSV BL8.ByteString where + mimeUnrender _ = Right + +fetchCSVArtifactContent + :: WithAuthPipeline (Int -> Text -> Text -> ClientM BL8.ByteString) +fetchCSVArtifactContent = + client (Proxy :: Proxy (GetArtifact CSV BL8.ByteString)) diff --git a/lib/buildkite/src/Buildkite/Client.hs b/lib/buildkite/src/Buildkite/Client.hs index 3c08890a7bc..8b3f21fb9dc 100644 --- a/lib/buildkite/src/Buildkite/Client.hs +++ b/lib/buildkite/src/Buildkite/Client.hs @@ -8,7 +8,6 @@ module Buildkite.Client , JobMap , BuildJobsMap , BuildAPI - , paging , getBuilds , getBuildsOfBranch , getArtifacts @@ -24,6 +23,7 @@ import Buildkite.API , ArtifactURL (..) , Job , WithAuthPipeline + , WithLockingAuthPipeline , fetchArtifacts , fetchBuilds , fetchBuildsOfBranch @@ -32,7 +32,8 @@ import Buildkite.API , overJobs ) import Control.Monad.IO.Class - ( liftIO + ( MonadIO + , liftIO ) import Data.Map ( Map @@ -62,11 +63,13 @@ import qualified Data.ByteString.Lazy.Char8 as BL import qualified Data.Map as Map import qualified Streaming.Prelude as S +-- | An opaque containg a handle to the Buildkite API data Query = Query - { query :: forall a. ClientM a -> IO (Maybe a) - , withAuth :: forall a. WithAuthPipeline a -> a - } + { _query :: forall a. ClientM a -> IO (Maybe a) + , _withLockAuth :: forall a. WithLockingAuthPipeline a -> a + , _withAuth :: forall a. WithAuthPipeline a -> a + } type JobMap = Map Text Job @@ -74,7 +77,9 @@ type BuildJobsMap = BKAPI.Build (Map Text) type BuildAPI = BKAPI.Build [] -paging :: Monad m => (Maybe Int -> m (Maybe [a])) +paging + :: Monad m + => (Maybe Int -> m (Maybe [a])) -> Stream (Of a) m () paging f = go 1 where @@ -90,14 +95,14 @@ paging f = go 1 _ -> go $ page + 1 getBuilds :: Query -> Stream (Of BuildAPI) IO () -getBuilds (Query q w) = paging $ q . w fetchBuilds +getBuilds (Query q w _) = paging $ q . w fetchBuilds getBuildsOfBranch :: Query -> String -> Stream (Of BuildAPI) IO () -getBuildsOfBranch (Query q w) branch = +getBuildsOfBranch (Query q w _) branch = paging $ q . w fetchBuildsOfBranch (Just branch) getArtifacts :: Query -> BuildAPI -> Stream (Of (BuildJobsMap, Artifact)) IO () -getArtifacts (Query q w) build = +getArtifacts (Query q w _) build = S.map (build',) $ paging $ q . w fetchArtifacts (number build) where build' = overJobs build $ \job' -> Map.fromList $ do @@ -105,7 +110,8 @@ getArtifacts (Query q w) build = pure (jobId jobV, jobV) getArtifactsContent - :: Query + :: MonadIO m + => Query -> WithAuthPipeline ( Int -> Text @@ -114,19 +120,19 @@ getArtifactsContent ) -> BuildJobsMap -> Artifact - -> Stream (Of (BuildJobsMap, Artifact, r)) IO () -getArtifactsContent (Query q w) getArtifact build artifact = do + -> m (Maybe (BuildJobsMap, Artifact, r)) +getArtifactsContent (Query q _ w) getArtifact build artifact = do mBenchResults <- do - lift + liftIO $ q $ w getArtifact (number build) (job_id artifact) (BKAPI.artifactId artifact) - case mBenchResults of - Nothing -> pure () - Just benchResults -> S.yield (build, artifact, benchResults) + pure $ case mBenchResults of + Nothing -> Nothing + Just benchResults -> Just (build, artifact, benchResults) downloadArtifact :: ArtifactURL -> Stream (Of BL.ByteString) IO () downloadArtifact (ArtifactURL url') = do diff --git a/lib/buildkite/src/Buildkite/Connection.hs b/lib/buildkite/src/Buildkite/Connection.hs new file mode 100644 index 00000000000..8bb3f1bf223 --- /dev/null +++ b/lib/buildkite/src/Buildkite/Connection.hs @@ -0,0 +1,148 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE RankNTypes #-} + +module Buildkite.Connection + ( PipelineName (..) + , OrganizationName (..) + + -- * Connector + , Connector + , newConnector + + -- * Error handling + , HandleClientError (..) + , bailout + , skip410 + , handleClientError + ) +where + +import Prelude + +import Buildkite.Client + ( Query (..) + ) +import Buildkite.LimitsLock + ( LimitsLockLog + , newLimitsLock + ) +import Control.Tracer + ( Tracer + ) +import Data.Functor + ( (<&>) + ) +import Data.Text + ( Text + ) +import Network.HTTP.Client + ( Manager + , ManagerSettings (..) + , Request (..) + , newManager + ) +import Network.HTTP.Client.TLS + ( tlsManagerSettings + ) +import Network.HTTP.Types + ( status410 + ) +import Servant.Client + ( BaseUrl (..) + , ClientEnv + , ClientError (..) + , ClientM + , ResponseF (..) + , Scheme (..) + , mkClientEnv + , runClientM + ) +import System.Environment + ( getEnv + ) + +import qualified Data.Text as T + +buildkiteDomain :: String +buildkiteDomain = "api.buildkite.com" + +buildkitePort :: Int +buildkitePort = 443 + +-- | The name of the pipeline +newtype PipelineName = PipelineName Text + +-- | The name of the organization +newtype OrganizationName = OrganizationName Text + +-- | How to handle http client errors +newtype HandleClientError + = HandleClientError (forall a. IO (Either ClientError a) -> IO (Maybe a)) + +-- | The way to create a query type +type Connector = OrganizationName -> PipelineName -> HandleClientError -> Query + +-- | Create a new connect computer +newConnector + :: String + -- ^ Environment variable for the API token + -> Int + -- ^ The maximum remaining request per minute, before locking (< 200) + -> Tracer IO LimitsLockLog + -- ^ The tracer for the limits lock events + -> IO Connector +newConnector tokenEnvVar lockLimit lockTracer = do + apiToken <- getEnv tokenEnvVar <&> \t -> "Bearer " ++ t + manager <- newManager $ stripAuthOnRedirect tlsManagerSettings + let env = buildkiteEnv manager + runQuery :: HandleClientError -> ClientM a -> IO (Maybe a) + runQuery (HandleClientError f) action = f $ runClientM action env + limitsLock <- newLimitsLock lockTracer lockLimit + pure $ \(OrganizationName organizationSlug) (PipelineName slug) h -> + let withLockingAuthPipeline action = + action limitsLock (Just $ T.pack apiToken) organizationSlug slug + withAuthPipeline action = + action (Just $ T.pack apiToken) organizationSlug slug + in Query (runQuery h) withLockingAuthPipeline withAuthPipeline + +-- the environment for the buildkite API +buildkiteEnv :: Manager -> ClientEnv +buildkiteEnv manager = + mkClientEnv manager + $ BaseUrl Https buildkiteDomain buildkitePort "" + +-- necessary for the buildkite API on getting artifacts content (redirect to AWS) +stripAuthOnRedirect :: ManagerSettings -> ManagerSettings +stripAuthOnRedirect settings = + settings + { managerModifyRequest = \req -> do + pure + $ req + { shouldStripHeaderOnRedirect = + \case + "Authorization" -> True + _ -> False + } + } + +data SkipOrAbort = Skip | Abort + +bailout :: HandleClientError +bailout = handleClientError $ const Abort + +handleClientError :: (ClientError -> SkipOrAbort) -> HandleClientError +handleClientError g = HandleClientError $ \f -> do + res <- f + case res of + Left e -> case g e of + Abort -> error $ show e + Skip -> pure Nothing + Right a -> pure $ Just a + +skip410 :: HandleClientError +skip410 = handleClientError $ \case + FailureResponse _ (Response s _ _ _) + | s == status410 -> Skip + _ -> Abort diff --git a/lib/buildkite/src/Buildkite/LimitsLock.hs b/lib/buildkite/src/Buildkite/LimitsLock.hs new file mode 100644 index 00000000000..4844172fb35 --- /dev/null +++ b/lib/buildkite/src/Buildkite/LimitsLock.hs @@ -0,0 +1,84 @@ +{-# LANGUAGE RecordWildCards #-} + +module Buildkite.LimitsLock + ( LimitsLock (..) + , SetLimit + , LimitsLockLog (..) + , newLimitsLock + ) where + +import Prelude + +import Control.Concurrent + ( threadDelay + ) +import Control.Concurrent.STM + ( atomically + , newTVarIO + , readTVar + , retry + , writeTVar + ) +import Control.Monad + ( join + , when + ) +import Control.Tracer + ( Tracer + , traceWith + ) +import Data.Functor + ( ($>) + ) + +-- | Set the limit of requests to a resource. This specific interface is dictated +-- by buildkite's API, but it could be generalized to any rate limiting system. +type SetLimit = + Int + -- ^ Remaining requests. + -> Int + -- ^ Seconds to wait before the allowed limit is reset. + -> IO () + +-- | A lock to limit the number of requests to a resource. +data LimitsLock = LimitsLock + { setLimit :: SetLimit + -- ^ Try to lock the resource. + , checkLimit :: IO () + -- ^ Check if the resource is locked and wait until it is available. + } + +-- | Log messages for 'LimitsLock'. +newtype LimitsLockLog = RateLimitReached Int + deriving (Show) + +-- | Create a new 'LimitsLock'. To simplify the implementation, the setLimit is +-- implemented as blocking. A more sophisticated implementation could use +-- a thread to reset the lock after the time has passed. +newLimitsLock + :: Tracer IO LimitsLockLog + -- ^ Tracer for logging messages. + -> Int + -- ^ Number of remaining requests to consider as the limit to block the resource. + -> IO LimitsLock +newLimitsLock tracer safeRemaining = do + lock <- newTVarIO False + let + pass = pure () + block secs = do + traceWith tracer $ RateLimitReached secs + threadDelay $ secs * 1000000 + atomically $ writeTVar lock False + setLimit remaining secs = join $ atomically $ do + locked <- readTVar lock + if locked + then pure pass + else + if remaining < safeRemaining + then writeTVar lock True $> block secs + else pure pass + checkLimit = do + atomically $ do + locked <- readTVar lock + when locked retry + pure $ LimitsLock{..} diff --git a/scripts/buildkite/main/benchmark-history.sh b/scripts/buildkite/main/benchmark-history.sh index 3cc98f6dca5..0780f003682 100755 --- a/scripts/buildkite/main/benchmark-history.sh +++ b/scripts/buildkite/main/benchmark-history.sh @@ -6,7 +6,7 @@ set -euox pipefail mkdir -p ./benchmark-history benchmark-history \ - --since 2024-06-24 \ + --since 2024-08-25 \ --charts-dir benchmark-history # shellcheck disable=SC2295 @@ -16,5 +16,9 @@ branch="${BUILDKITE_BRANCH#release-candidate/}" # against a release candidate branch. branch_sanitized="${branch//\//-}" +mkdir -p artifacts + +mv benchmark-history/benchmark-history.csv . + # shellcheck disable=SC2086 tar -czf ./benchmark-history.${branch_sanitized}.tgz benchmark-history