aboutsummaryrefslogtreecommitdiffstats
path: root/app/History/Plan.hs
blob: 3dec3918788b375338801d1da4d6cf16d58101e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
module History.Plan
  ( Planable (..),
    formulate,
    realise,
  )
where

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM (TQueue, TVar, atomically, newTQueueIO, newTVarIO, readTQueue, readTVar, retry, writeTQueue, writeTVar)
import Control.Monad (forever)
import Data.Kind (Type)
import Data.List (intercalate)
import Data.List.NonEmpty qualified as NE
import Data.Map qualified as M
import Data.Proxy (Proxy (Proxy))
import Data.Time.Clock
import Parallel (parMapM_)
import System.IO (hFlush, hPutStr, stderr, stdout)
import System.Time.Monotonic (Clock, clockGetTime, newClock)
import Text.Printf (printf)

-- | Left-associative monadic fold of a structure with automatic parallelism, status-reporting and intermittent results (cacelability).
--
-- Semantically, `realise (formulate xs)` is equivalent to
--
-- ```haskell
-- \(x:xs) -> foldM propagate (assume <$> protoOf x) =<< mapM protoOf xs
-- ```
--
-- However, this module provides the following features:
--
-- -  `protoOf` and `propagate` are computed in parallel,
-- -  `propagate` is scheduled to terminate as soon as possible,
--
-- This makes it possible to scale the computation across multiple cores, but still interrupt the process for intermittent results.
--
-- Additionally, a progress report is presented on stdout.
class Planable output where
  type Id output :: Type
  type Proto output :: Type
  protoOf :: Proxy output -> Id output -> IO (Proto output)
  assume :: Proto output -> output
  propagate :: [Id output] -> output -> Proto output -> output

data Plan output = Plan (NE.NonEmpty (Id output)) [Task output]

data Task output
  = Compute (Id output)
  | Propagate (Maybe (Id output)) (Id output)

isCompute, isPropagate :: Task output -> Bool
isCompute (Compute _) = True
isCompute _ = False
isPropagate (Propagate _ _) = True
isPropagate _ = False

formulate ::
  Planable output =>
  Proxy output ->
  NE.NonEmpty (Id output) ->
  Plan output
formulate _ ids@(NE.uncons -> (id, (maybe [] NE.toList -> rest))) =
  Plan ids $
    Compute id
      : Propagate Nothing id
      : concat
        ( zipWith
            ( \id id' ->
                [ Compute id,
                  Propagate id' id
                ]
            )
            rest
            (map Just (id : rest))
        )

data State output = State
  { tasks :: [Task output],
    protos :: M.Map (Id output) (Proto output),
    outputs :: M.Map (Id output) output,
    elapsed :: Clock
  }

realise :: (Ord (Id output), Planable output) => Plan output -> IO output
realise plan@(Plan ids tasks) = do
  elapsed <- newClock
  let state0 =
        State
          { tasks = tasks,
            protos = M.empty,
            outputs = M.empty,
            elapsed = elapsed
          }
  stateT <- newTVarIO state0
  statusT <- newTQueueIO
  tid <- forkIO $ forever do
    state <- atomically (readTQueue statusT)
    hPutStr stderr . printStatus =<< status state0 state
    hFlush stdout
  parMapM_ (step Proxy plan statusT stateT) tasks
  let id = NE.last ids
  output <- atomically $ do
    maybe retry pure . M.lookup id . (.outputs) =<< readTVar stateT
  killThread tid
  pure output

step ::
  (Ord (Id output), Planable output) =>
  Proxy output ->
  Plan output ->
  TQueue (State output) ->
  TVar (State output) ->
  Task output ->
  IO ()
step proxy _ statusT stateT (Compute id) = do
  proto <- protoOf proxy id
  atomically do
    state <- readTVar stateT
    let state' = state {protos = M.insert id proto state.protos}
    writeTVar stateT state'
    writeTQueue statusT state'
  pure ()
step _ (Plan ids _) statusT stateT (Propagate (Just id') id) = do
  (output, proto) <- atomically do
    state <- readTVar stateT
    output <- maybe retry pure (M.lookup id' state.outputs)
    proto <- maybe retry pure (M.lookup id state.protos)
    pure (output, proto)
  let output' = propagate (NE.toList ids) output proto
  atomically do
    state <- readTVar stateT
    let state' = state {outputs = M.insert id output' state.outputs}
    writeTVar stateT state'
    writeTQueue statusT state'
step _ _ statusT stateT (Propagate Nothing id) = do
  proto <- atomically do
    state <- readTVar stateT
    maybe retry pure (M.lookup id state.protos)
  let output = assume proto
  atomically do
    state <- readTVar stateT
    let state' = state {outputs = M.insert id output state.outputs}
    writeTVar stateT state'
    writeTQueue statusT state'

data Status = Status
  { numTasks :: Progress Int,
    numProtos :: Progress Int,
    numOutputs :: Progress Int,
    elapsed :: DiffTime
  }
  deriving (Show, Eq)

data Progress a = Progress
  { total :: a,
    completed :: a
  }
  deriving (Show, Eq)

status :: State output -> State output -> IO Status
status state0 state = do
  let totalTasks = length state0.tasks
      totalProtos = length (filter isCompute state0.tasks)
      totalOutputs = length (filter isPropagate state0.tasks)
      completedTasks = completedProtos + completedOutputs
      completedProtos = M.size state.protos
      completedOutputs = M.size state.outputs
  elapsed <- clockGetTime state.elapsed
  pure
    Status
      { numTasks = Progress totalTasks completedTasks,
        numProtos = Progress totalProtos completedProtos,
        numOutputs = Progress totalOutputs completedOutputs,
        elapsed = elapsed
      }

printStatus :: Status -> String
printStatus Status {..} = do
  let formatProgress completed total = pad total completed <> "/" <> total
      pad total completed = replicate (length total - length completed) ' ' <> completed
      eta =
        formatEta
          ( (fromIntegral numTasks.total * (realToFrac elapsed))
              / fromIntegral numTasks.completed
          )
  (<> "\r") . intercalate " " $
    [ formatProgress (show numTasks.completed) (show numTasks.total),
      formatProgress (show numProtos.completed) (show numProtos.total),
      formatProgress (show numOutputs.completed) (show numOutputs.total),
      "ETA " <> eta
    ]

formatEta :: Double -> String
formatEta s = do
  if s < 60
    then printf "%.1fs" s
    else do
      let m = s / 60
      if m < 60
        then printf "%.1fm" m
        else do
          let h = m / 60
          if h < 24
            then printf "%.1hf" h
            else do
              let d = h / 24
              printf "%.1fd" d