1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
|
module Data.Sensor where
import Control.Applicative
import Control.Arrow
import Control.Category (Category (..))
import Control.Category qualified as C
import Control.Concurrent.STM (retry)
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception qualified as E
import Control.Monad
import Control.Monad.Reader
import Data.Dynamic
import Data.List
import Data.Map (Map)
import Data.Map qualified as M
import Data.Maybe
import Data.Set (Set)
import Data.Set qualified as S
import UnliftIO
import UnliftIO.Concurrent hiding (yield)
import Prelude hiding (id, (.))
-- | A sensor from @a@ to @b@ over the monad @m@.
--
-- It is a newtype around an 'MSF' running in 'SensorT' @m@ whose input and
-- output are both 'Value's tagged with 'Stale'. The raw constructor is
-- @Sensor'@; 'mkSensor' builds a sensor from a signal function that takes an
-- untagged 'Value' as input.
newtype Sensor m a b = Sensor'
{ unSensor :: MSF (SensorT m) (Stale (Value a)) (Stale (Value b))
}
-- | Build a 'Sensor' from a signal function over untagged 'Value' inputs.
--
-- The signal function is passed through 'ignoreStaleInput' so that it can be
-- fed 'Stale'-tagged inputs, and the result is wrapped in the @Sensor'@
-- constructor.
mkSensor :: (Monad m) => MSF (SensorT m) (Value a) (Stale (Value b)) -> Sensor m a b
mkSensor sf = Sensor' (ignoreStaleInput sf)
-- | Adapt a signal function over untagged 'Value's so that it accepts inputs
-- tagged with 'Stale'.
--
-- The feedback state holds the most recently produced output value, starting
-- as 'Nothing'. When the input is tagged 'Stale' and an output was produced
-- before, that cached output is re-emitted tagged 'Stale' and the feedback
-- state is left unchanged. In every other case @sf@ is run on the untagged
-- input, its output is emitted as is, and its payload becomes the new cache.
ignoreStaleInput ::
(Monad m) =>
MSF m (Value a) (Stale (Value b)) ->
MSF m (Stale (Value a)) (Stale (Value b))
ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do
case (i, x') of
-- Stale input and a cached output: replay the cache, keep the state.
(Stale _, Just x) -> returnA -< (Stale x, x')
-- Anything else (including the very first step): run @sf@ and cache.
_ -> do
x <- sf -< i.unwrap
returnA -< (x, Just x.unwrap)
-- | Maps a function over the payload of a sensor's output.
--
-- The incoming untagged 'Value' is tagged 'Fresh' before it is handed to the
-- wrapped sensor; the function is then mapped through the 'Stale' and 'Value'
-- layers of every output.
instance (Monad m) => Functor (Sensor m a) where
  fmap f s = mkSensor (arr Fresh >>> (fmap f <$$> s.unSensor))
-- | 'fmap' through two nested functors: applies the function to every @a@
-- inside an @f (g a)@.
(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b)
(<$$>) f = fmap (fmap f)

infixl 4 <$$>
-- | Applicative structure for sensors.
--
-- 'pure' produces a constant value with no dependencies and no ticks; the
-- underlying signal function tags it 'Fresh' on its first step and 'Stale' on
-- every later step.
--
-- '<*>' runs both sensors on the same ('Fresh'-tagged) input and merges their
-- outputs with 'combineStaleValue', applying the function payload to the
-- argument payload.
instance (Monad m) => Applicative (Sensor m a) where
  pure x = mkSensor (emit Fresh)
    where
      emit tag =
        MSF $ \_ -> pure (tag (Value S.empty M.empty x), emit Stale)
  f <*> x =
    mkSensor $
      combineStaleValue ($)
        <$> (f.unSensor <<< arr Fresh)
        <*> (x.unSensor <<< arr Fresh)
-- | Sensors compose like the signal functions they wrap.
--
-- In both methods the untagged input is tagged 'Fresh' before it enters the
-- wrapped pipeline. For composition, @sf1@ runs first and its tagged output
-- is fed directly to @sf2@.
instance (Monad m) => Category (Sensor m) where
  id = mkSensor (arr Fresh >>> C.id)
  sf2 . sf1 = mkSensor ((arr Fresh >>> sf1.unSensor) >>> sf2.unSensor)
-- | Arrow structure for sensors.
--
-- 'arr' lifts a pure function: the input is tagged 'Fresh' and the function is
-- mapped over its payload, so the output of a lifted function is always
-- tagged 'Fresh'.
--
-- 'first' runs the given sensor on the first component of a paired payload
-- and passes the second component through unchanged.
instance (Monad m) => Arrow (Sensor m) where
arr f = mkSensor (arr (f <$$>) <<< arr Fresh)
first :: Sensor m a b -> Sensor m (a, d) (b, d)
first sf =
mkSensor
-- The pipeline reads bottom to top.
-- 3. Re-attach the untouched second component to the sensor's output payload.
( arr (\(x, d) -> (,d) <$$> x)
-- 2. Tag the first component 'Fresh' and run the wrapped sensor on it.
<<< first (sf.unSensor <<< arr Fresh)
-- 1. Split the incoming 'Value' into a 'Value' of the first component and
--    the bare second component.
<<< arr (\x -> (fst <$> x, snd (x.unwrap)))
)
-- | Sensor-level counterpart of 'feedback': run a sensor whose payload carries
-- an extra accumulator @c@ that is threaded from one step to the next.
--
-- @c@ is the initial accumulator. On each step the current accumulator is
-- paired into the input payload, the wrapped sensor is run on the
-- 'Fresh'-tagged result, and the accumulator found in its output payload is
-- kept for the next step while the remaining payload is emitted.
feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b
feedbackS c sf =
mkSensor . feedback c $
-- The pipeline reads bottom to top.
-- 3. Split the output into the emitted value and the next accumulator.
arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap)))
-- 2. Tag the input 'Fresh' and run the wrapped sensor.
<<< (sf.unSensor <<< arr Fresh)
-- 1. Pair the current accumulator into the input payload.
<<< arr (\(x, d) -> (,d) <$> x)
-- | A value tagged with how it relates to the previous step.
--
-- All three constructors share the single field 'unwrap'.
--
-- NOTE(review): the descriptions below are inferred from how the constructors
-- are used elsewhere in this module; confirm the intended semantics.
data Stale a
-- Re-emitted without change; 'ignoreStaleInput' replays its cache for these.
= Stale {unwrap :: a}
-- Newly produced or changed since the previous step.
| Fresh {unwrap :: a}
-- The tag that 'sample' and @sample'@ put on the top-level input.
| Input {unwrap :: a}
deriving (Functor, Show)
-- | Monadic bind for sensors, implemented with 'switch'.
--
-- The first sensor @m@ is run on every step. Whenever it emits a 'Fresh'
-- output (see 'switchOnFresh'), or when no continuation is running yet, the
-- continuation sensor is rebuilt by applying @f@ to @m@'s payload. Otherwise
-- the continuation from the previous step keeps running.
instance (Monad m) => Monad (Sensor m a) where
m >>= f =
mkSensor $ switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v ->
-- Merge the dependencies and ticks of @m@'s value into every output of the
-- continuation while keeping the continuation's own payload.
(combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh)
-- | Pair every output of a signal function with a flag that is 'True' exactly
-- when that output is tagged 'Fresh'.
switchOnFresh ::
  (Monad m) =>
  MSF m a (Stale (Value b)) ->
  MSF m a (Stale (Value b), Bool)
switchOnFresh = fmap (\v -> (v, isFresh v))
  where
    isFresh (Fresh _) = True
    isFresh _ = False
-- | Run a driving signal function and, downstream of it, a continuation
-- built from the driver's output.
--
-- On every step the driver is stepped first, yielding a value and a flag. If
-- the flag is 'True', or no continuation has been started yet, a new
-- continuation is built from the value; otherwise the continuation left over
-- from the previous step is reused. The chosen continuation is then stepped on
-- the same input and its output is emitted.
switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b
switch driver mkCont = feedback Nothing (step driver)
  where
    step cur = MSF $ \(input, running) -> do
      ((x, restart), cur') <- unMSF cur input
      let cont
            | restart = mkCont x
            | otherwise = fromMaybe (mkCont x) running
      (out, cont') <- unMSF cont input
      pure ((out, Just cont'), step cur')
-- | A payload together with bookkeeping about the aggregates it came from.
data Value a = Value
{ -- Keys of aggregates; 'yield' stores the yielding aggregate's own key plus
-- the contents of its @readAggregatesM@ set here.
dependsOn :: Set String,
-- Tick counters keyed by aggregate key; 'yield' records the yielding
-- aggregate's current tick here.
atTick :: Map String Int,
-- The payload itself.
unwrap :: a
}
deriving (Show, Functor, Eq)
-- | Combine two tagged values: the tags are merged as in 'combineStale' and
-- the values as in 'combineValue', using the given function on the payloads.
combineStaleValue ::
  (a -> b -> c) ->
  Stale (Value a) ->
  Stale (Value b) ->
  Stale (Value c)
combineStaleValue f = combineStale merged
  where
    merged = combineValue f
-- | Combine two 'Value's.
--
-- The dependency sets are unioned, the tick maps are merged keeping the
-- larger tick for keys present in both, and the payloads are combined with
-- the given function.
combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c
combineValue f a b =
  Value
    { dependsOn = S.union a.dependsOn b.dependsOn,
      atTick = M.unionWith max a.atTick b.atTick,
      unwrap = f a.unwrap b.unwrap
    }
-- | Combine two tagged values with the given function.
--
-- The result is tagged 'Stale' when both sides are 'Stale', or when one side
-- is 'Stale' and the other is 'Input'. Every other combination is tagged
-- 'Fresh'.
combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c
combineStale f a b = tag (f a.unwrap b.unwrap)
  where
    tag = case (a, b) of
      (Stale _, Stale _) -> Stale
      (Input _, Stale _) -> Stale
      (Stale _, Input _) -> Stale
      _ -> Fresh
-- | An aggregate described by a key type @s@ that produces values of type
-- @a@; the functional dependency says @s@ determines @a@.
--
-- 'aggregate' is the action that 'startAggregate' runs in a separate thread
-- for a given key. The 'Show' instance of @s@ is used as the aggregate's
-- identity throughout this module.
--
-- NOTE(review): presumably the body is expected to publish its results with
-- 'yield'; confirm against the instances.
class (MonadIO m, Show s, Typeable a) => Aggregate m s a | s -> a where
aggregate :: s -> AggregateT s (SensorT m) ()
-- | The monad transformer in which an aggregate body runs: a reader over
-- 'AggregateE'. The type parameter @s@ is phantom; it records which aggregate
-- key type the computation belongs to.
newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT AggregateE m a}
-- The instances below are derived from the underlying 'ReaderT'.
deriving instance (Functor m) => Functor (AggregateT s m)
deriving instance (Applicative m) => Applicative (AggregateT s m)
deriving instance (Monad m) => Monad (AggregateT s m)
deriving instance (MonadIO m) => MonadIO (AggregateT s m)
instance MonadTrans (AggregateT s) where
lift = AggregateT . lift
deriving instance (MonadFail m) => MonadFail (AggregateT s m)
deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m)
deriving instance (Monad m) => MonadReader AggregateE (AggregateT s m)
-- XXX aggregates should be keyed on something other than `String`

-- | The per-aggregate environment, created by 'startAggregate'.
data AggregateE = AggregateE
{ -- Set of aggregate keys that 'yield' copies into the published value's
-- dependencies and then resets to empty.
readAggregatesM :: TMVar (S.Set String),
-- This aggregate's tick counter; incremented by every 'yield'.
currentTickM :: TMVar Int,
-- A duplicate of the global queue, made with 'dupTChan' at start-up.
-- NOTE(review): nothing in the visible code reads from it.
localQueueT :: TChan (),
-- This aggregate's key, i.e. 'show' of its key value.
self :: String
}
-- | Publish a new value from inside an aggregate body.
--
-- Everything happens in one STM transaction:
--
-- * a unit is written to the global queue;
-- * the transaction retries until this aggregate is registered in the table
--   of live aggregates;
-- * the set of read aggregates is taken and reset to empty;
-- * the aggregate's tick counter is read and incremented;
-- * the value, wrapped as a 'Dynamic', is stored in the aggregate's slot,
--   depending on itself plus the read aggregates and stamped with the tick
--   that was current before the increment.
yield :: (Aggregate m s a) => a -> AggregateT s (SensorT m) ()
yield x = do
  SensorE {liveAggregatesM, globalQueueT} <- lift ask
  AggregateE {readAggregatesM, currentTickM, self} <- ask
  atomically do
    writeTChan globalQueueT ()
    registry <- readTMVar liveAggregatesM
    -- Block until 'startAggregate' has registered this aggregate.
    LiveAggregate {valueT} <- maybe retry pure (M.lookup self registry)
    deps <- takeTMVar readAggregatesM
    putTMVar readAggregatesM S.empty
    tick <- takeTMVar currentTickM
    putTMVar currentTickM (tick + 1)
    writeTVar
      valueT
      (Just (Value (S.insert self deps) (M.singleton self tick) (toDyn x)))
-- | The sensor that reads the current value of the aggregate keyed by @s@.
--
-- On every step it
--
-- 1. enqueues a request to start the aggregate ('startAggregate' ignores the
--    request when the aggregate is already live);
-- 2. blocks until the aggregate is registered under @show s@ and has
--    published at least one value;
-- 3. emits that value, tagged 'Fresh' when its dependencies or ticks differ
--    from those seen on the previous step (or when there was no previous
--    step), and 'Stale' otherwise.
--
-- Stored values are 'Dynamic's. Aggregates are keyed by their 'show' string,
-- so if the stored value does not have type @a@, forcing the payload raises an
-- error that names the offending key.
sensor :: (Aggregate m s a) => s -> Sensor m () a
sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, previous) -> do
  SensorE {liveAggregatesM, startAggregateT} <- ask
  atomically do writeTQueue startAggregateT (AnyAggregate s)
  atomically do
    liveAggregates <- readTMVar liveAggregatesM
    valueT <- case M.lookup key liveAggregates of
      Nothing -> retry
      Just LiveAggregate {valueT} -> pure valueT
    v <-
      readTVar valueT >>= \case
        Nothing -> retry
        Just v -> pure v
    -- 'Dynamic' has no 'Eq' instance, so change detection compares the value
    -- with its payload blanked out, i.e. only dependencies and ticks.
    let fingerprint = () <$ v
        tag
          | Just fingerprint /= previous = Fresh
          | otherwise = Stale
    pure (flip fromDyn typeMismatch <$$> tag v, Just fingerprint)
  where
    key = show s
    typeMismatch =
      error
        ( "Data.Sensor.sensor: value stored for aggregate "
            <> key
            <> " does not have the expected type"
        )
-- | Exception thrown to the main thread by 'startAggregate' when an aggregate
-- thread ends with an exception other than 'ThreadKilled'. It carries the
-- aggregate's key and the original exception.
data Restart = Restart String SomeException deriving (Show)
instance Exception Restart
-- XXX merge `sample'` and `sample`

-- | Run a sensor in a fresh 'SensorT' environment, calling @f@ on every fresh
-- output. The loop has no exit branch.
--
-- @f@ receives the result of its own previous call ('Nothing' the first time)
-- and the new output payload.
--
-- A helper thread is forked that starts aggregates as they are requested on
-- the start queue. Each loop iteration steps the sensor once, calls @f@ if the
-- step produced a fresh output, and then blocks until something is written to
-- the global queue. A 'Restart' exception caught during an iteration discards
-- that iteration and continues with the accumulator unchanged.
--
-- NOTE(review): the forked helper thread is never stopped by this function.
sample' :: (MonadUnliftIO m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
sample' f s = runSensorT do
SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- ask
-- Holds the payload of the latest step if it was fresh, 'Nothing' otherwise.
outputT <- newTVarIO Nothing
_ <- forkIO $ forever do
startAggregate =<< atomically do readTQueue startAggregateT
h <-
reactInit
-- The pipeline reads bottom to top: wrap the unit input, run the sensor,
-- then handle its output.
( ( arrM $ \v -> do
deadAggregates <- atomically do
case v of
Input _ -> error "Input"
Stale _ -> do
writeTVar outputT Nothing
pure M.empty
Fresh v -> do
writeTVar outputT (Just v.unwrap)
liveAggregates <- readTMVar liveAggregatesM
globalTick <- takeTMVar globalTickM
putTMVar globalTickM (globalTick + 1)
-- Select the live aggregates that the fresh value does not depend on
-- and that were spawned before the current global tick.
pure
( M.filterWithKey
( \key liveAggregate ->
not (S.member key v.dependsOn)
&& liveAggregate.spawnedAt < globalTick
)
liveAggregates
)
-- Kill the threads of the aggregates selected above.
mapM_
( \liveAggregate -> do
liftIO (killThread (liveAggregate.threadId))
)
deadAggregates
)
<<< s.unSensor
<<< arr (Input . Value S.empty M.empty)
)
-- NOTE(review): indentation was lost in this copy. If `>>= loop` belongs to
-- the lambda passed to `mask`, the recursive call runs inside the mask and
-- `restore` only unmasks on the first iteration; confirm this is intended
-- before restructuring.
let loop c = do
mask $ \restore ->
catch
( restore do
react h
c' <-
maybe (pure c) (lift . fmap Just . f c)
=<< atomically do readTVar outputT
-- Wait for the next write to the global queue before stepping again.
_ <- atomically do readTChan globalQueueT
pure c'
)
(\(Restart _ _) -> pure c)
>>= loop
loop Nothing
-- | Run a 'SensorT' computation in a freshly created environment (see
-- 'sensorE').
runSensorT :: (MonadIO m) => SensorT m a -> m a
runSensorT (SensorT body) = do
  env <- sensorE
  runReaderT body env
-- | Create a new sensor environment: an empty table of live aggregates, an
-- empty start queue, an empty global queue, a global tick of 0, and the id of
-- the calling thread recorded as the main thread.
sensorE :: (MonadIO m) => m (SensorE m)
sensorE = do
  liveAggregatesM <- newTMVarIO M.empty
  startAggregateT <- newTQueueIO
  globalQueueT <- newTChanIO
  globalTickM <- newTMVarIO 0
  mainThread <- myThreadId
  pure
    SensorE
      { liveAggregatesM,
        startAggregateT,
        globalQueueT,
        globalTickM,
        mainThread
      }
-- | Run a sensor in a fresh 'SensorT' environment until it has produced @n@
-- fresh outputs, and return their payloads in the order they were produced.
--
-- For @n <= 0@ the result is @[]@ and the sensor is not run at all.
--
-- A helper thread is forked that starts aggregates as they are requested on
-- the start queue. Each loop iteration steps the sensor once; a fresh output
-- is recorded, the global tick is incremented, and every live aggregate that
-- the output does not depend on and that was spawned before the current tick
-- is killed. If fewer than @n@ outputs have been collected, the loop blocks
-- until something is written to the global queue and then steps again.
--
-- 'Restart' exceptions are not caught here.
--
-- NOTE(review): the helper thread and any aggregates still alive are not
-- stopped when this function returns.
sample :: (MonadUnliftIO m) => Int -> Sensor m () a -> m [a]
sample n s
  | n <= 0 = pure []
  | otherwise = runSensorT do
      SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- ask
      -- Collected payloads, most recent first.
      outputsT <- newTVarIO []
      _ <- forkIO $ forever do
        startAggregate =<< atomically do readTQueue startAggregateT
      h <-
        reactInit
          -- The pipeline reads bottom to top: wrap the unit input, run the
          -- sensor, then handle its output.
          ( ( arrM $ \v -> do
                case v of
                  Input _ -> error "Input"
                  Stale _ -> pure ()
                  Fresh v -> do
                    deadAggregates <- atomically do
                      writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT
                      liveAggregates <- readTMVar liveAggregatesM
                      globalTick <- takeTMVar globalTickM
                      putTMVar globalTickM (globalTick + 1)
                      pure
                        ( M.filterWithKey
                            ( \key liveAggregate ->
                                not (S.member key v.dependsOn)
                                  && liveAggregate.spawnedAt < globalTick
                            )
                            liveAggregates
                        )
                    mapM_ (liftIO . killThread . (.threadId)) deadAggregates
            )
              <<< s.unSensor
              <<< arr (Input . Value S.empty M.empty)
          )
      let loop = do
            react h
            xs <- atomically do readTVar outputsT
            if length xs >= n
              then pure (reverse xs)
              else do
                -- Wait for the next write to the global queue.
                _ <- atomically do readTChan globalQueueT
                loop
      loop
-- | The monad transformer in which sensors run: a reader over the shared
-- 'SensorE' environment.
newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE m) m a}
-- The instances below are derived from the underlying 'ReaderT'.
deriving instance (Functor m) => Functor (SensorT m)
deriving instance (Applicative m) => Applicative (SensorT m)
deriving instance (Monad m) => Monad (SensorT m)
deriving instance (MonadIO m) => MonadIO (SensorT m)
instance MonadTrans SensorT where
lift = SensorT . lift
deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m)
deriving instance (MonadFail m) => MonadFail (SensorT m)
deriving instance (Monad m) => MonadReader (SensorE m) (SensorT m)
-- | The environment shared by everything running in one 'runSensorT' call.
data SensorE m = SensorE
{ -- Table of running aggregates, keyed by 'show' of the aggregate key.
liveAggregatesM :: TMVar (Map String (LiveAggregate Dynamic)),
-- Requests to start an aggregate; written by 'sensor', drained by the
-- helper thread forked in 'sample' and @sample'@.
startAggregateT :: TQueue (AnyAggregate m),
-- Receives a unit on every 'yield'; the sampling loops block on it.
globalQueueT :: TChan (),
-- Counter incremented by the sampling loops on every fresh output.
globalTickM :: TMVar Int,
-- The thread that created the environment; target of 'Restart' exceptions.
mainThread :: ThreadId
}
-- | An aggregate key of any type that has an 'Aggregate' instance for @m@,
-- with the key type and value type hidden. This is the element type of the
-- start queue in 'SensorE'.
data AnyAggregate m where
AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m
-- | Bookkeeping for one running aggregate.
data LiveAggregate a = LiveAggregate
{ -- The latest published value; 'Nothing' until the first 'yield'.
valueT :: TVar (Maybe (Value a)),
-- The thread running the aggregate body.
threadId :: ThreadId,
-- The global tick at the time the aggregate was registered.
spawnedAt :: Int
}
-- | Start the thread for an aggregate unless one is already registered under
-- the same key.
--
-- The key is @show s@. If it is already present in the table of live
-- aggregates, nothing happens. Otherwise a new 'AggregateE' is built, the
-- aggregate body is forked, and the aggregate is registered with an empty
-- value slot and the current global tick as its spawn time.
--
-- When the forked thread ends with an exception, the aggregate is removed
-- from the table; unless the exception is 'ThreadKilled', a 'Restart' carrying
-- the key and the exception is also thrown to the main thread. When the body
-- returns normally, the entry is left in the table.
--
-- NOTE(review): the table's 'TMVar' is taken in one transaction and put back
-- in a later one. An exception in between (including an asynchronous one)
-- would leave it empty and block every other user of the table.
startAggregate :: (MonadUnliftIO m) => AnyAggregate m -> SensorT m ()
startAggregate (AnyAggregate s) = do
let self = show s
SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- ask
-- Take the table; it stays empty until one of the branches puts it back.
liveAggregates <- atomically do takeTMVar liveAggregatesM
if M.member self liveAggregates
then atomically do putTMVar liveAggregatesM liveAggregates
else do
readAggregatesM <- newTMVarIO S.empty
currentTickM <- newTMVarIO 0
localQueueT <- atomically do dupTChan globalQueueT
threadId <-
forkFinally
-- The record wildcard picks up readAggregatesM, currentTickM, localQueueT
-- and self from the bindings above.
(runReaderT (aggregate s).unAggregateT AggregateE {..})
( either
( \e ->
if
| fromException e == Just ThreadKilled -> do
dropAggregate self
| otherwise -> do
dropAggregate self
liftIO (E.throwTo mainThread (Restart self e))
)
pure
)
valueT <- newTVarIO Nothing
atomically do
spawnedAt <- readTMVar globalTickM
-- The record wildcard picks up valueT, threadId and spawnedAt.
putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates)
-- | Remove the aggregate with the given key from the table of live
-- aggregates, in a single STM transaction. Removing a key that is not present
-- leaves the table unchanged.
dropAggregate :: (MonadIO m) => String -> SensorT m ()
dropAggregate self = do
  SensorE {liveAggregatesM} <- ask
  atomically do
    registry <- takeTMVar liveAggregatesM
    let remaining = M.delete self registry
    putTMVar liveAggregatesM remaining
-- XXX we should depend on `dunai`

-- | A monadic stream function: given one input, it performs an effect in @m@
-- and returns one output together with the stream function to use for the
-- next input.
data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)}
-- | Stream functions compose step by step.
--
-- 'id' returns every input unchanged. For composition, each step runs @sf1@
-- and then @sf2@ on its output; the output of @sf2@ is forced to weak head
-- normal form before the step returns.
instance (Monad m) => Category (MSF m) where
  id = passThrough
    where
      passThrough = MSF $ \a -> pure (a, passThrough)
  sf2 . sf1 = MSF $ \a -> do
    (b, sf1') <- unMSF sf1 a
    (c, sf2') <- unMSF sf2 b
    c `seq` pure (c, sf2' . sf1')
-- | 'arr' lifts a pure function into a stateless stream function. 'first'
-- runs a stream function on the first component of a pair and passes the
-- second component through unchanged.
instance (Monad m) => Arrow (MSF m) where
  arr f = arrM (pure . f)
  first sf = MSF $ \(a, c) -> do
    (b, sf') <- unMSF sf a
    pure ((b, c), first sf')
-- | Maps a pure function over every output of a stream function.
instance (Monad m) => Functor (MSF m a) where
  fmap f msf = arr f <<< msf
-- | 'pure' is the stream function that ignores its input and always outputs
-- the given value. '<*>' feeds the same input to both stream functions (the
-- function side first) and applies the resulting function to the resulting
-- argument.
instance (Functor m, Monad m) => Applicative (MSF m a) where
  pure x = arr (const x)
  fs <*> bs = arr (uncurry ($)) <<< (fs &&& bs)
-- | Transform a stream function by rewriting its step function.
--
-- The rewriting function receives the step function of the current stream
-- function and must produce a step function over the new input, output and
-- monad. It is polymorphic in the continuation type, so it cannot inspect the
-- continuation. The same rewriting is applied again to every continuation.
morphGS ::
  (Monad m2) =>
  (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) ->
  MSF m1 a1 b1 ->
  MSF m2 a2 b2
morphGS morph current = MSF $ \input -> do
  (output, next) <- morph (unMSF current) input
  pure (output, morphGS morph next)
-- | Run a stream function with an accumulator threaded between steps.
--
-- The first argument is the initial accumulator. On each step the stream
-- function receives the input paired with the current accumulator and returns
-- the output paired with the accumulator for the next step.
feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b
feedback acc sf = MSF step
  where
    step a = do
      ((b, acc'), sf') <- unMSF sf (a, acc)
      pure (b, feedback acc' sf')
-- | Feed a list of inputs through a stream function, one step per element,
-- and collect the outputs in order. An empty input list performs no steps.
embed :: (Monad m) => MSF m a b -> [a] -> m [b]
embed sf inputs = case inputs of
  [] -> pure []
  a : rest -> do
    (b, sf') <- unMSF sf a
    (b :) <$> embed sf' rest
-- | Step a stream function on unit inputs forever, discarding its outputs.
-- This function has no terminating branch.
reactimate :: (Monad m) => MSF m () () -> m ()
reactimate sf = do
  (_, next) <- unMSF sf ()
  reactimate next
-- | Lift a monadic function into a stateless stream function: every step
-- applies the function to the input and continues with the same stream
-- function.
arrM :: (Monad m) => (a -> m b) -> MSF m a b
arrM f = step
  where
    step = MSF $ \a -> do
      b <- f a
      pure (b, step)
-- | A mutable reference holding the current state of a stream function that
-- is stepped from the outside with 'react'.
type ReactHandle m = IORef (MSF m () ())

-- | Create a 'ReactHandle' whose initial state is the given stream function.
reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m)
reactInit sf = liftIO (newIORef sf)
-- | Advance the stream function stored in a 'ReactHandle' by one step and
-- store its continuation back in the handle. The read and the write are two
-- separate operations on the reference.
react :: (MonadIO m) => ReactHandle m -> m ()
react handle = do
  current <- liftIO (readIORef handle)
  (_, next) <- unMSF current ()
  liftIO (writeIORef handle next)
-- | Choice for stream functions.
--
-- 'left' steps the wrapped stream function only for 'Left' inputs. A 'Right'
-- input is passed through unchanged and leaves the wrapped stream function in
-- its current state.
instance (Monad m) => ArrowChoice (MSF m) where
  left :: MSF m a b -> MSF m (Either a c) (Either b c)
  left sf = MSF $ \case
    Left a -> do
      (b, sf') <- unMSF sf a
      pure (Left b, left sf')
    Right c -> pure (Right c, left sf)
|