{-# LANGUAGE BlockArguments #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE NoFieldSelectors #-} {-# OPTIONS_GHC -fno-warn-name-shadowing #-} module Main where import Control.Concurrent import Control.Concurrent.STM import Control.Exception import Control.Monad import Data.ByteString.Char8 qualified as B import Data.ByteString.Lazy qualified as LB import Data.ByteString.Lazy.UTF8 qualified as LB import Data.ByteString.UTF8 qualified as B import Data.List.Split import Data.Map.Merge.Strict qualified as M import Data.Map.Strict qualified as M import Data.Maybe import Data.String import Data.String.Interpolate (i) import Data.Tagged import Data.Text qualified as T import Git import Git.Libgit2 import System.Directory import System.Environment import System.FilePath import System.INotify import System.IO import System.IO.Temp import System.IO.Unsafe import System.Process.Typed import Text.Printf stateDirectory :: FilePath stateDirectory = unsafePerformIO (getEnv "ABUILDER_STATE") ref :: String ref = fromMaybe "main" (unsafePerformIO (lookupEnv "ABUILDER_BRANCH")) type JobName = String type Url = FilePath urls :: M.Map JobName Url urls = M.fromList . map ((,) <$> takeBaseName <*> id) $ splitOn ":" (unsafePerformIO (getEnv "ABUILDER_URLS")) concurrentBuilders :: Int concurrentBuilders = 2 type DesiredOutputs = M.Map JobName CommitHash type CommitHash = String type ActualOutputs = M.Map JobName CommitHash data BuildJobs = BuildJobs { pendingBuilds :: M.Map JobName CommitHash, runningBuilds :: M.Map JobName CommitHash } deriving (Show) diff :: DesiredOutputs -> ActualOutputs -> BuildJobs diff desiredOutputs actualOutputs = BuildJobs { pendingBuilds = M.merge M.preserveMissing M.dropMissing ( M.zipWithMaybeMatched ( \_ actualCommit desiredCommit -> if desiredCommit /= actualCommit then Just desiredCommit else Nothing ) ) desiredOutputs actualOutputs, runningBuilds = M.empty } replaceBuildJobs :: BuildJobs -> BuildJobs -> BuildJobs replaceBuildJobs oldBuildJobs newBuildJobs = BuildJobs { pendingBuilds = M.differenceWithKey ( \_ pendingCommit runningCommit -> if pendingCommit /= runningCommit then Just pendingCommit else Nothing ) newBuildJobs.pendingBuilds oldBuildJobs.runningBuilds, runningBuilds = oldBuildJobs.runningBuilds } data BuildJob = BuildJob { jobName :: JobName, commitHash :: CommitHash } deriving (Show) obtainBuildJob :: BuildJobs -> (Maybe BuildJob, BuildJobs) obtainBuildJob buildJobs = do case uncurry BuildJob <$> M.lookupMin buildJobs.pendingBuilds of Just buildJob@(BuildJob {jobName, commitHash}) -> ( Just buildJob, buildJobs { pendingBuilds = M.delete jobName buildJobs.pendingBuilds, runningBuilds = M.insert jobName commitHash buildJobs.runningBuilds } ) Nothing -> (Nothing, buildJobs) completeBuildJob :: BuildJob -> BuildJobs -> BuildJobs completeBuildJob (BuildJob {jobName, commitHash}) buildJobs = buildJobs { runningBuilds = M.filterWithKey ( \jobName' commitHash' -> jobName' /= jobName || commitHash' /= commitHash ) buildJobs.runningBuilds } data Builder = Builder Int deriving (Show) data LogEntry = BuildEntry { builder :: Builder, buildJob :: BuildJob, payload :: LB.ByteString } deriving (Show) main :: IO () main = do hSetBuffering stderr LineBuffering hSetBuffering stdout LineBuffering inotify <- initINotify desiredOutputsT <- newTVarIO M.empty actualOutputsT <- newTVarIO M.empty buildJobsT <- newTVarIO (BuildJobs M.empty M.empty) logs <- newTQueueIO createDirectoryIfMissing True stateDirectory setCurrentDirectory stateDirectory mapM_ (forkIO . builder logs buildJobsT) (map Builder [1 .. concurrentBuilders]) mapM_ (uncurry (watch inotify desiredOutputsT)) (M.toList urls) _ <- forkIO (scheduler desiredOutputsT actualOutputsT buildJobsT) forever do log <- atomically $ readTQueue logs case log of BuildEntry _ (BuildJob {jobName}) (LB.toString -> payload) -> printf "[%s] %s\n" jobName payload scheduler :: TVar DesiredOutputs -> TVar ActualOutputs -> TVar BuildJobs -> IO () scheduler desiredOutputsT actualOutputsT buildJobsT = do lastDesiredOutputsT <- newTVarIO Nothing forever $ atomically do lastDesiredOutputs <- readTVar lastDesiredOutputsT desiredOutputs <- readTVar desiredOutputsT check (Just desiredOutputs /= lastDesiredOutputs) actualOutputs <- readTVar actualOutputsT let buildJobs' = diff desiredOutputs actualOutputs buildJobs <- readTVar buildJobsT writeTVar buildJobsT (replaceBuildJobs buildJobs buildJobs') writeTVar lastDesiredOutputsT (Just desiredOutputs) builder :: TQueue LogEntry -> TVar BuildJobs -> Builder -> IO () builder logs buildJobsT builder = forever ( do buildJob <- atomically do buildJobs <- readTVar buildJobsT let (maybeBuildJob, buildJobs') = obtainBuildJob buildJobs check (isJust maybeBuildJob) writeTVar buildJobsT buildJobs' pure (fromJust maybeBuildJob) build logs builder buildJob `catch` ( \(e :: SomeException) -> do print e ) ) build :: TQueue LogEntry -> Builder -> BuildJob -> IO () build logs builder buildJob@(BuildJob {jobName, commitHash}) = do let url = urls M.! jobName rev = commitHash refDir = jobName ref tmpDir = jobName <> "-" <> rev exitCodeT <- newEmptyTMVarIO _ <- flip forkFinally (atomically . putTMVar exitCodeT) do withSystemTempDirectory tmpDir $ \tmpDir -> do writeFile (tmpDir "default.nix") $ [i| import (fetchGit { url = "#{url}"; ref = "#{ref}"; rev = "#{rev}"; }) |] ((B.toString . B.strip . LB.toStrict) -> drv, LB.lines -> err) <- readProcess_ (setWorkingDir tmpDir "nix-instantiate") mapM_ ( atomically . writeTQueue logs . BuildEntry builder buildJob ) err ((B.toString . B.strip . LB.toStrict) -> res, LB.lines -> err) <- readProcess_ ( setWorkingDir tmpDir (fromString (printf "nix-store --realise '%s'" drv)) ) mapM_ ( atomically . writeTQueue logs . BuildEntry builder buildJob ) err pure res exitCode <- atomically $ takeTMVar exitCodeT case exitCode of Left e -> throw e Right nixDir -> do createDirectoryIfMissing True jobName runProcess_ (fromString (printf "nix-store --add-root '%s' --realise '%s'" refDir nixDir)) watch :: INotify -> TVar DesiredOutputs -> JobName -> Url -> IO () watch inotify desiredOutputsT jobName url = do let bareFp = url "refs/heads" nonBareFp = url ".git/refs/heads" isBare <- doesDirectoryExist bareFp _ <- addWatch inotify [ Modify, MoveIn ] (B.fromString (if isBare then bareFp else nonBareFp)) $ \e -> do let isChange = case e of System.INotify.Modified _ (Just (B.toString -> filePath)) -> filePath == ref System.INotify.MovedIn False (B.toString -> filePath) _ -> filePath == ref _ -> False when isChange do updateDesiredOutputs updateDesiredOutputs where updateDesiredOutputs = do rev <- withRepository lgFactory url do Just cid <- resolveReference ("refs/heads/" <> T.pack ref) show . untag . (.commitOid) <$> lookupCommit (Tagged cid) atomically do desiredOutputs <- readTVar desiredOutputsT writeTVar desiredOutputsT (M.insert jobName rev desiredOutputs)