From 8a2d06011a1b74c17963b2715b11d0ca7ba202a8 Mon Sep 17 00:00:00 2001 From: Alexander Foremny Date: Tue, 20 Feb 2024 04:05:43 +0100 Subject: init --- app/Main.hs | 261 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 261 insertions(+) create mode 100644 app/Main.hs (limited to 'app') diff --git a/app/Main.hs b/app/Main.hs new file mode 100644 index 0000000..4765405 --- /dev/null +++ b/app/Main.hs @@ -0,0 +1,261 @@ +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE NoFieldSelectors #-} +{-# OPTIONS_GHC -fno-warn-name-shadowing #-} + +module Main where + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Data.ByteString.Lazy qualified as LB +import Data.ByteString.Lazy.UTF8 qualified as LB +import Data.ByteString.UTF8 qualified as B +import Data.List.Split +import Data.Map.Merge.Strict qualified as M +import Data.Map.Strict qualified as M +import Data.Maybe +import Data.String.Interpolate (i) +import Data.Tagged +import Data.Text qualified as T +import Git +import Git.Libgit2 +import System.Directory +import System.Environment +import System.FilePath +import System.INotify +import System.IO +import System.IO.Temp +import System.IO.Unsafe +import System.Process.Typed +import Text.Printf + +stateDirectory :: FilePath +stateDirectory = unsafePerformIO (getEnv "ABUILDER_STATE") + +ref :: String +ref = fromMaybe "main" (unsafePerformIO (lookupEnv "ABUILDER_BRANCH")) + +type JobName = String + +type Url = FilePath + +urls :: M.Map JobName Url +urls = + M.fromList . map ((,) <$> takeBaseName <*> id) $ + splitOn ":" (unsafePerformIO (getEnv "ABUILDER_URLS")) + +concurrentBuilders :: Int +concurrentBuilders = 2 + +type DesiredOutputs = M.Map JobName CommitHash + +type CommitHash = String + +type ActualOutputs = M.Map JobName CommitHash + +data BuildJobs = BuildJobs + { pendingBuilds :: M.Map JobName CommitHash, + runningBuilds :: M.Map JobName CommitHash + } + deriving (Show) + +diff :: DesiredOutputs -> ActualOutputs -> BuildJobs +diff desiredOutputs actualOutputs = + BuildJobs + { pendingBuilds = + M.merge + M.preserveMissing + M.dropMissing + ( M.zipWithMaybeMatched + ( \_ actualCommit desiredCommit -> + if desiredCommit /= actualCommit + then Just desiredCommit + else Nothing + ) + ) + desiredOutputs + actualOutputs, + runningBuilds = M.empty + } + +replaceBuildJobs :: BuildJobs -> BuildJobs -> BuildJobs +replaceBuildJobs oldBuildJobs newBuildJobs = + BuildJobs + { pendingBuilds = + M.differenceWithKey + ( \_ pendingCommit runningCommit -> + if pendingCommit /= runningCommit + then Just pendingCommit + else Nothing + ) + newBuildJobs.pendingBuilds + oldBuildJobs.runningBuilds, + runningBuilds = oldBuildJobs.runningBuilds + } + +data BuildJob = BuildJob + { jobName :: JobName, + commitHash :: CommitHash + } + deriving (Show) + +obtainBuildJob :: BuildJobs -> (Maybe BuildJob, BuildJobs) +obtainBuildJob buildJobs = do + case uncurry BuildJob <$> M.lookupMin buildJobs.pendingBuilds of + Just buildJob@(BuildJob {jobName, commitHash}) -> + ( Just buildJob, + buildJobs + { pendingBuilds = M.delete jobName buildJobs.pendingBuilds, + runningBuilds = M.insert jobName commitHash buildJobs.runningBuilds + } + ) + Nothing -> + (Nothing, buildJobs) + +completeBuildJob :: BuildJob -> BuildJobs -> BuildJobs +completeBuildJob (BuildJob {jobName, commitHash}) buildJobs = + buildJobs + { runningBuilds = + M.filterWithKey + ( \jobName' commitHash' -> + jobName' /= jobName || commitHash' /= commitHash + ) + buildJobs.runningBuilds + } + +data Builder = Builder Int + deriving (Show) + +data LogEntry = BuildEntry + { builder :: Builder, + buildJob :: BuildJob, + payload :: LB.ByteString + } + deriving (Show) + +main :: IO () +main = do + hSetBuffering stderr LineBuffering + hSetBuffering stdout LineBuffering + inotify <- initINotify + desiredOutputsT <- newTVarIO M.empty + actualOutputsT <- newTVarIO M.empty + buildJobsT <- newTVarIO (BuildJobs M.empty M.empty) + logs <- newTQueueIO + createDirectoryIfMissing True stateDirectory + setCurrentDirectory stateDirectory + mapM_ + (forkIO . builder logs buildJobsT) + (map Builder [1 .. concurrentBuilders]) + mapM_ (uncurry (watch inotify desiredOutputsT)) (M.toList urls) + _ <- forkIO (scheduler desiredOutputsT actualOutputsT buildJobsT) + forever do + log <- atomically $ readTQueue logs + case log of + BuildEntry _ (BuildJob {jobName}) (LB.toString -> payload) -> + printf "[%s] %s\n" jobName payload + +scheduler :: + TVar DesiredOutputs -> + TVar ActualOutputs -> + TVar BuildJobs -> + IO () +scheduler desiredOutputsT actualOutputsT buildJobsT = do + lastDesiredOutputsT <- newTVarIO Nothing + forever $ atomically do + lastDesiredOutputs <- readTVar lastDesiredOutputsT + desiredOutputs <- readTVar desiredOutputsT + check (Just desiredOutputs /= lastDesiredOutputs) + actualOutputs <- readTVar actualOutputsT + let buildJobs' = diff desiredOutputs actualOutputs + buildJobs <- readTVar buildJobsT + writeTVar buildJobsT (replaceBuildJobs buildJobs buildJobs') + writeTVar lastDesiredOutputsT (Just desiredOutputs) + +builder :: TQueue LogEntry -> TVar BuildJobs -> Builder -> IO () +builder logs buildJobsT builder = + forever + ( do + buildJob <- atomically do + buildJobs <- readTVar buildJobsT + let (maybeBuildJob, buildJobs') = obtainBuildJob buildJobs + check (isJust maybeBuildJob) + writeTVar buildJobsT buildJobs' + pure (fromJust maybeBuildJob) + + build logs builder buildJob + `catch` ( \(e :: SomeException) -> do + print e + ) + ) + +build :: TQueue LogEntry -> Builder -> BuildJob -> IO () +build logs builder buildJob@(BuildJob {jobName, commitHash}) = do + let url = urls M.! jobName + rev = commitHash + refDir = jobName ref + tmpDir = jobName <> "-" <> rev + exitCodeT <- newEmptyTMVarIO + _ <- + flip forkFinally (atomically . putTMVar exitCodeT) do + withSystemTempDirectory tmpDir $ \tmpDir -> do + writeFile (tmpDir "default.nix") $ + [i| + import + (fetchGit { + url = "#{url}"; + ref = "#{ref}"; + rev = "#{rev}"; + }) + |] + (LB.lines -> out) <- + readProcessInterleaved_ + (setWorkingDir tmpDir "nix-build .") + mapM_ + ( atomically + . writeTQueue logs + . BuildEntry builder buildJob + ) + out + getSymbolicLinkTarget (tmpDir "result") + exitCode <- atomically $ takeTMVar exitCodeT + case exitCode of + Left e -> throw e + Right nixDir -> do + createDirectoryIfMissing True jobName + _ <- try @SomeException (removeDirectoryLink refDir) + createDirectoryLink nixDir refDir + +watch :: INotify -> TVar DesiredOutputs -> JobName -> Url -> IO () +watch inotify desiredOutputsT jobName url = do + let bareFp = url "refs/heads" + nonBareFp = url ".git/refs/heads" + isBare <- doesDirectoryExist bareFp + _ <- addWatch + inotify + [ Modify, + MoveIn + ] + (B.fromString (if isBare then bareFp else nonBareFp)) + $ \e -> do + let isChange = + case e of + System.INotify.Modified _ (Just (B.toString -> filePath)) -> filePath == ref + System.INotify.MovedIn False (B.toString -> filePath) _ -> filePath == ref + _ -> False + when isChange do + updateDesiredOutputs + updateDesiredOutputs + where + updateDesiredOutputs = do + rev <- withRepository lgFactory url do + Just cid <- resolveReference ("refs/heads/" <> T.pack ref) + show . untag . (.commitOid) <$> lookupCommit (Tagged cid) + atomically do + desiredOutputs <- readTVar desiredOutputsT + writeTVar desiredOutputsT (M.insert jobName rev desiredOutputs) -- cgit v1.2.3