{-# LANGUAGE BlockArguments #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE NoFieldSelectors #-} {-# OPTIONS_GHC -fno-warn-name-shadowing #-} module Main where import Control.Arrow ((***)) import Control.Concurrent import Control.Concurrent.STM import Control.Exception import Control.Monad import Data.ByteString.Char8 qualified as B import Data.ByteString.Lazy qualified as LB import Data.ByteString.UTF8 qualified as B (fromString, toString) import Data.List import Data.List.Split import Data.Map.Merge.Strict qualified as M import Data.Map.Strict qualified as M import Data.Maybe import Data.String import Data.String.Interpolate (i) import Data.Tagged import Data.Text qualified as T import Debug.Trace import Git import Git.Libgit2 import Prettyprinter import Prettyprinter.Render.Terminal import System.Directory import System.Environment import System.FilePath import System.INotify import System.IO import System.IO.Temp import System.IO.Unsafe import System.Process (CmdSpec (..)) import System.Process.Typed import System.Process.Typed.Internal import Text.Printf stateDirectory :: FilePath stateDirectory = unsafePerformIO (getEnv "ABUILDER_STATE") ref :: String ref = fromMaybe "main" (unsafePerformIO (lookupEnv "ABUILDER_BRANCH")) type JobName = String type Url = FilePath urls :: M.Map JobName Url urls = M.fromList . map ((,) <$> takeBaseName <*> id) $ splitOn ":" (unsafePerformIO (getEnv "ABUILDER_URLS")) concurrentBuilders :: Int concurrentBuilders = 2 logLevel :: Importance logLevel = maybe Info (toEnum . read) $ unsafePerformIO (lookupEnv "ABUILDER_LOG_LEVEL") type DesiredOutputs = M.Map JobName CommitHash type CommitHash = String type ActualOutputs = M.Map JobName CommitHash data BuildJobs = BuildJobs { pendingBuilds :: M.Map JobName CommitHash, runningBuilds :: M.Map JobName CommitHash } deriving (Show) diff :: DesiredOutputs -> ActualOutputs -> BuildJobs diff desiredOutputs actualOutputs = BuildJobs { pendingBuilds = M.merge M.preserveMissing M.dropMissing ( M.zipWithMaybeMatched ( \_ actualCommit desiredCommit -> if desiredCommit /= actualCommit then Just desiredCommit else Nothing ) ) desiredOutputs actualOutputs, runningBuilds = M.empty } replaceBuildJobs :: BuildJobs -> BuildJobs -> BuildJobs replaceBuildJobs oldBuildJobs newBuildJobs = BuildJobs { pendingBuilds = M.differenceWithKey ( \_ pendingCommit runningCommit -> if pendingCommit /= runningCommit then Just pendingCommit else Nothing ) newBuildJobs.pendingBuilds oldBuildJobs.runningBuilds, runningBuilds = oldBuildJobs.runningBuilds } data BuildJob = BuildJob { jobName :: JobName, commitHash :: CommitHash } deriving (Show) obtainBuildJob :: BuildJobs -> (Maybe BuildJob, BuildJobs) obtainBuildJob buildJobs = do case uncurry BuildJob <$> M.lookupMin buildJobs.pendingBuilds of Just buildJob@(BuildJob {jobName, commitHash}) -> ( Just buildJob, buildJobs { pendingBuilds = M.delete jobName buildJobs.pendingBuilds, runningBuilds = M.insert jobName commitHash buildJobs.runningBuilds } ) Nothing -> (Nothing, buildJobs) completeBuildJob :: BuildJob -> BuildJobs -> BuildJobs completeBuildJob (BuildJob {jobName, commitHash}) buildJobs = buildJobs { runningBuilds = M.filterWithKey ( \jobName' commitHash' -> jobName' /= jobName || commitHash' /= commitHash ) buildJobs.runningBuilds } data Builder = Builder Int deriving (Show) data LogEntry = LogEntry { component :: String, importance :: Importance, message :: Doc AnsiStyle } deriving (Show) data Importance = Error | Info | Warning | Debug deriving (Show, Eq, Ord, Enum) main :: IO () main = do hSetBuffering stderr LineBuffering hSetBuffering stdout LineBuffering inotify <- initINotify desiredOutputsT <- newTVarIO M.empty actualOutputsT <- newTVarIO M.empty buildJobsT <- newTVarIO (BuildJobs M.empty M.empty) logs <- newTQueueIO createDirectoryIfMissing True stateDirectory setCurrentDirectory stateDirectory mapM_ (\_ -> forkIO (builder logs buildJobsT)) (map Builder [1 .. concurrentBuilders]) mapM_ (uncurry (watch inotify logs desiredOutputsT)) (M.toList urls) _ <- forkIO (scheduler desiredOutputsT actualOutputsT buildJobsT) forever do log <- atomically $ readTQueue logs case log of LogEntry {component, message, importance} -> when (importance <= logLevel) do let c = case importance of Error -> Red Info -> White Warning -> Yellow Debug -> Black putDoc . annotate (color c) $ annotate bold (brackets (pretty component)) <+> message <> hardline scheduler :: TVar DesiredOutputs -> TVar ActualOutputs -> TVar BuildJobs -> IO () scheduler desiredOutputsT actualOutputsT buildJobsT = do lastDesiredOutputsT <- newTVarIO Nothing forever $ atomically do lastDesiredOutputs <- readTVar lastDesiredOutputsT desiredOutputs <- readTVar desiredOutputsT check (Just desiredOutputs /= lastDesiredOutputs) actualOutputs <- readTVar actualOutputsT let buildJobs' = diff desiredOutputs actualOutputs buildJobs <- readTVar buildJobsT writeTVar buildJobsT (replaceBuildJobs buildJobs buildJobs') writeTVar lastDesiredOutputsT (Just desiredOutputs) builder :: TQueue LogEntry -> TVar BuildJobs -> IO () builder logs buildJobsT = forever ( do buildJob <- atomically do buildJobs <- readTVar buildJobsT let (maybeBuildJob, buildJobs') = obtainBuildJob buildJobs check (isJust maybeBuildJob) writeTVar buildJobsT buildJobs' pure (fromJust maybeBuildJob) build logs buildJob `catch` ( \(e :: SomeException) -> do print e ) ) build :: TQueue LogEntry -> BuildJob -> IO () build logs (BuildJob {jobName, commitHash}) = do let url = urls M.! jobName rev = commitHash refDir = jobName ref tmpDir = jobName <> "-" <> rev log_ logs jobName Info [printf "building commit %s" rev] exitCodeT <- newEmptyTMVarIO _ <- flip forkFinally (atomically . putTMVar exitCodeT) do withSystemTempDirectory tmpDir $ \tmpDir -> do writeFile (tmpDir "default.nix") $ [i| import (fetchGit { url = "#{url}"; ref = "#{ref}"; rev = "#{rev}"; }) |] drv <- sh logs jobName (setWorkingDir tmpDir "nix-instantiate") res <- sh logs jobName . setWorkingDir tmpDir $ fromString (printf "nix-store --realise '%s'" drv) pure res exitCode <- atomically $ takeTMVar exitCodeT case exitCode of Left e -> do log_ logs jobName Error [printf "failed to build commit %s" rev] throw e Right nixDir -> do log_ logs jobName Info [printf "built commit %s" rev] createDirectoryIfMissing True jobName runProcess_ (fromString (printf ">/dev/null nix-store --add-root '%s' --realise '%s'" refDir nixDir)) watch :: INotify -> TQueue LogEntry -> TVar DesiredOutputs -> JobName -> Url -> IO () watch inotify logs desiredOutputsT jobName url = do let bareFp = url "refs/heads" nonBareFp = url ".git/refs/heads" isBare <- doesDirectoryExist bareFp _ <- addWatch inotify [ Modify, MoveIn ] (B.fromString (if isBare then bareFp else nonBareFp)) $ \e -> do let isChange = case e of System.INotify.Modified _ (Just (B.toString -> filePath)) -> filePath == ref System.INotify.MovedIn False (B.toString -> filePath) _ -> filePath == ref _ -> False when isChange do updateDesiredOutputs log_ logs jobName Info [printf "watching %s" url] updateDesiredOutputs where updateDesiredOutputs = do rev <- withRepository lgFactory url do Just cid <- resolveReference ("refs/heads/" <> T.pack ref) show . untag . (.commitOid) <$> lookupCommit (Tagged cid) atomically do desiredOutputs <- readTVar desiredOutputsT writeTVar desiredOutputsT (M.insert jobName rev desiredOutputs) log_ logs jobName Info [printf "queueing commit %s" rev] log_ :: TQueue LogEntry -> String -> Importance -> [String] -> IO () log_ logs component importance messages = atomically do mapM_ ( writeTQueue logs . LogEntry component importance . pretty ) messages sh :: TQueue LogEntry -> String -> ProcessConfig stdin stdoutIgnored stderrIgnored -> IO String sh logs component proc = do log_ logs component Debug $ [ printf "+ %s" ( case proc.pcCmdSpec of RawCommand bin args -> intercalate " " (bin : args) ShellCommand s -> s ) ] (out, err) <- ( B.toString . B.strip . LB.toStrict *** map (B.toString . B.strip) . B.lines . LB.toStrict ) <$> readProcess_ proc log_ logs component Warning err pure out debug :: Show a => String -> a -> a debug s x = trace (printf "%s: %s\n" s (show x)) x