summaryrefslogtreecommitdiffstats
path: root/src/Data/Sensor.hs
blob: 2cb7eaaf1f857e868d321e2af64296615c4d8a1e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
{-# LANGUAGE UndecidableInstances #-}

module Data.Sensor where

import Control.Applicative
import Control.Arrow
import Control.Category (Category (..))
import Control.Category qualified as C
import Control.Concurrent.STM (retry)
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception qualified as E
import Control.Monad
import Control.Monad.Reader
import Data.Dynamic
import Data.List
import Data.Map (Map)
import Data.Map qualified as M
import Data.Maybe
import Data.Set (Set)
import Data.Set qualified as S
import UnliftIO
import UnliftIO.Concurrent hiding (yield)
import Prelude hiding (id, (.))

-- | A signal function from @a@ to @b@ that runs in @'SensorT' m@.
--
-- Both its input and its output are 'Value's wrapped in a 'Stale' tag.
newtype Sensor m a b = Sensor'
  { unSensor :: MSF (SensorT m) (Stale (Value a)) (Stale (Value b))
  }

-- | Build a 'Sensor' from a signal function that consumes untagged 'Value's,
-- by passing it through 'ignoreStaleInput'.
mkSensor :: (Monad m) => MSF (SensorT m) (Value a) (Stale (Value b)) -> Sensor m a b
mkSensor sf = Sensor' (ignoreStaleInput sf)

-- | Lift a signal function over untagged 'Value's to one over 'Stale'-tagged
-- inputs, remembering the most recent output through 'feedback'.
--
-- The feedback state starts as 'Nothing' and afterwards holds the untagged
-- result of the last run of @sf@.
ignoreStaleInput ::
  (Monad m) =>
  MSF m (Value a) (Stale (Value b)) ->
  MSF m (Stale (Value a)) (Stale (Value b))
ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do
  case (i, x') of
    -- Stale input and a remembered output: re-emit the remembered output
    -- tagged 'Stale' and keep the state; this branch does not feed @sf@.
    (Stale _, Just x) -> returnA -< (Stale x, x')
    -- Anything else (first tick, or a 'Fresh'/'Input' input): run @sf@ on the
    -- untagged input and remember its untagged result.
    _ -> do
      x <- sf -< i.unwrap
      returnA -< (x, Just x.unwrap)

-- | Maps a function over the payload carried by the sensor's output.
instance (Monad m) => Functor (Sensor m a) where
  -- The three nested 'fmap's go through 'MSF', 'Stale' and 'Value' in turn.
  fmap f s = mkSensor (arr Fresh >>> fmap (fmap (fmap f)) s.unSensor)

-- | 'fmap' through two nested functors at once.
(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b)
(<$$>) f = fmap (fmap f)

infixl 4 <$$>

instance (Monad m) => Applicative (Sensor m a) where
  -- A constant sensor: 'Fresh' on its first step, 'Stale' on every later one.
  -- The value has no dependencies and no tick information.
  pure x = mkSensor firstTick
    where
      constant = Value S.empty M.empty x
      firstTick = MSF (const (pure (Fresh constant, laterTicks)))
      laterTicks = MSF (const (pure (Stale constant, laterTicks)))

  -- Run both sensors on the same input and merge their outputs with
  -- 'combineStaleValue', applying the function payload to the argument payload.
  f <*> x =
    mkSensor $
      combineStaleValue ($)
        <$> (arr Fresh >>> f.unSensor)
        <*> (arr Fresh >>> x.unSensor)

instance (Monad m) => Category (Sensor m) where
  -- The identity sensor tags its input 'Fresh' and passes it on unchanged.
  id = mkSensor (arr Fresh >>> C.id)

  -- Sequential composition: the inner sensor's tagged output is fed straight
  -- into the outer sensor.
  outer . inner = mkSensor ((arr Fresh >>> inner.unSensor) >>> outer.unSensor)

instance (Monad m) => Arrow (Sensor m) where
  -- A pure function applied to the payload; the output is tagged 'Fresh'.
  arr f = mkSensor (arr Fresh >>> arr (fmap (fmap f)))

  -- Run the sensor on the first component and carry the second one through
  -- untouched, re-pairing it with the sensor's output payload.
  first :: Sensor m a b -> Sensor m (a, d) (b, d)
  first sf =
    mkSensor $
      ( arr (\pair -> (fst <$> pair, snd pair.unwrap))
          >>> first (arr Fresh >>> sf.unSensor)
      )
        >>> arr (\(out, extra) -> fmap (fmap (,extra)) out)

-- | Close a feedback loop around a sensor.
--
-- The accumulator starts at @c@. On every step it is paired with the input
-- payload, and the second component of the sensor's output payload becomes
-- the accumulator for the next step.
feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b
feedbackS c sf =
  mkSensor $
    feedback c $
      ( arr (\(v, acc) -> (,acc) <$> v)
          >>> (arr Fresh >>> sf.unSensor)
      )
        >>> arr (\out -> (fst <$$> out, snd out.unwrap.unwrap))

-- | A value together with a tag describing its status on the current step.
data Stale a
  = -- | Re-emitted without change: 'ignoreStaleInput' uses this tag for a
    -- remembered output, and 'pure' for every step after the first.
    Stale {unwrap :: a}
  | -- | A new value: 'switchOnFresh' triggers on it, and 'sample' / 'sample''
    -- only record outputs carrying this tag.
    Fresh {unwrap :: a}
  | -- | The tag 'sample' and 'sample'' attach to what they feed into the
    -- sensor; both call 'error' if it ever shows up as an output.
    Input {unwrap :: a}
  deriving (Functor, Show)

instance (Monad m) => Monad (Sensor m a) where
  -- Whenever the outer sensor emits a 'Fresh' value, (re)build the inner
  -- sensor from its payload; otherwise keep stepping the current inner sensor.
  -- The inner output keeps its own payload but has the outer value's
  -- dependencies and ticks merged in through 'combineValue'.
  m >>= f =
    mkSensor $
      switch
        (switchOnFresh (arr Fresh >>> m.unSensor))
        ( \v ->
            fmap
              (fmap (combineValue (const id) v.unwrap))
              (arr Fresh >>> (f v.unwrap.unwrap).unSensor)
        )

-- | Pair every output with a flag that is 'True' exactly when the output is
-- tagged 'Fresh'.
switchOnFresh ::
  (Monad m) =>
  MSF m a (Stale (Value b)) ->
  MSF m a (Stale (Value b), Bool)
switchOnFresh = fmap (\v -> (v, isFresh v))
  where
    isFresh (Fresh _) = True
    isFresh _ = False

-- | Step an outer signal function and feed the same input to an inner one
-- built from the outer's output.
--
-- A new inner signal function is built with the second argument when the
-- outer's flag is 'True' or when none exists yet; otherwise the continuation
-- of the previous inner signal function is stepped.
switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b
switch m f = feedback Nothing (step m f)
  where
    -- The feedback state carries the continuation of the inner signal function.
    step ::
      (Monad m) =>
      MSF m a (c, Bool) ->
      (c -> MSF m a b) ->
      MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b))
    step outer build = MSF $ \(input, previous) -> do
      ((x, fired), outer') <- unMSF outer input
      let current = case (fired, previous) of
            (False, Just running) -> running
            _ -> build x
      (out, current') <- unMSF current input
      pure ((out, Just current'), step outer' build)

-- | A payload annotated with the aggregates it was derived from.
data Value a = Value
  { -- | Keys of the aggregates this value depends on ('yield' records the
    -- yielding aggregate's own key plus the contents of its read set).
    dependsOn :: Set String,
    -- | Per-aggregate tick counter at which the value was produced;
    -- 'combineValue' keeps the maximum per key.
    atTick :: Map String Int,
    -- | The payload itself.
    unwrap :: a
  }
  deriving (Show, Functor, Eq)

-- | Combine two tagged values: the tags are merged by 'combineStale' and the
-- 'Value's underneath by 'combineValue'.
combineStaleValue ::
  (a -> b -> c) ->
  Stale (Value a) ->
  Stale (Value b) ->
  Stale (Value c)
combineStaleValue f left right =
  combineStale (combineValue f) left right

-- | Combine two 'Value's: dependencies are unioned, ticks are merged keeping
-- the larger tick per key, and the payloads are combined with the function.
combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c
combineValue f a b =
  Value
    { dependsOn = a.dependsOn `S.union` b.dependsOn,
      atTick = M.unionWith max a.atTick b.atTick,
      unwrap = f a.unwrap b.unwrap
    }

-- | Combine two tagged values with a function and decide the resulting tag.
--
-- The result is 'Stale' when both sides are 'Stale', or when one side is
-- 'Stale' and the other is 'Input'; every other combination is 'Fresh'.
combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c
combineStale f a b = tag (f a.unwrap b.unwrap)
  where
    tag = case (a, b) of
      (Stale _, Stale _) -> Stale
      (Input _, Stale _) -> Stale
      (Stale _, Input _) -> Stale
      _ -> Fresh

-- | A source of values of type @a@, identified by a description @s@.
--
-- The description determines the value type (@s -> a@). Elsewhere in this
-- module @show s@ is used as the aggregate's key and values are stored as
-- 'Dynamic', which is what the 'Show' and 'Typeable' superclasses serve.
class (MonadIO m, Show s, Typeable a) => Aggregate m s a | s -> a where
  -- | Run the aggregate described by the first argument; the second argument
  -- is a callback taking a value of type @a@.
  aggregate :: s -> (a -> m ()) -> m ()

-- | Monad transformer giving access to an 'AggregateE' environment.
--
-- The type parameter @s@ is phantom: it does not occur in the representation.
newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT AggregateE m a}

-- The instances below are all derived from the underlying 'ReaderT'.

deriving instance (Functor m) => Functor (AggregateT s m)

deriving instance (Applicative m) => Applicative (AggregateT s m)

deriving instance (Monad m) => Monad (AggregateT s m)

deriving instance (MonadIO m) => MonadIO (AggregateT s m)

deriving instance MonadTrans (AggregateT s)

deriving instance (MonadFail m) => MonadFail (AggregateT s m)

deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m)

deriving instance (Monad m) => MonadReader AggregateE (AggregateT s m)

-- XXX aggregates should be keyed on something else than `String`

-- | Per-aggregate environment available inside 'AggregateT'.
data AggregateE = AggregateE
  { -- | Keys collected since the last 'yield'; 'yield' folds them into the
    -- yielded value's dependencies and resets the set to empty.
    readAggregatesM :: TMVar (S.Set String),
    -- | This aggregate's own tick counter, incremented by every 'yield'.
    currentTickM :: TMVar Int,
    -- | A duplicate of the global queue, created in 'startAggregate'.
    localQueueT :: TChan (),
    -- | The aggregate's key (@show@ of its description).
    self :: String
  }

-- | Publish a new value for the current aggregate.
--
-- In a single transaction this signals the global queue, waits until the
-- aggregate is registered as live, folds the read set into the value's
-- dependencies (resetting it), advances the aggregate's tick counter and
-- stores the value, wrapped as a 'Dynamic', in the aggregate's slot.
yield :: (Aggregate m s a) => a -> AggregateT s (SensorT m) ()
yield x = do
  SensorE {liveAggregatesM, globalQueueT} <- lift ask
  AggregateE {readAggregatesM, currentTickM, self} <- ask
  atomically do
    writeTChan globalQueueT ()
    -- Retry the whole transaction until this aggregate shows up as live.
    slot <-
      readTMVar liveAggregatesM >>= \live ->
        maybe retry (\LiveAggregate {valueT} -> pure valueT) (M.lookup self live)
    deps <- takeTMVar readAggregatesM
    putTMVar readAggregatesM S.empty
    tick <- takeTMVar currentTickM
    putTMVar currentTickM (tick + 1)
    writeTVar
      slot
      (Just (Value (S.insert self deps) (M.singleton self tick) (toDyn x)))

-- | A sensor that reads the latest value yielded by the aggregate @s@.
--
-- On every step it asks for the aggregate to be started (through the
-- start queue) and then waits until the aggregate is registered under
-- @show s@ and has yielded at least one value. The output is tagged 'Fresh'
-- on the first step or when the value's metadata differs from the one seen on
-- the previous step, and 'Stale' otherwise.
--
-- Values are stored as 'Dynamic'. Because aggregates are keyed by @show s@,
-- two descriptions with the same rendering share a slot; if the stored value
-- then has a different type, forcing the payload raises a descriptive 'error'.
sensor :: (Aggregate m s a) => s -> Sensor m () a
sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do
  SensorE {liveAggregatesM, startAggregateT} <- ask
  atomically do writeTQueue startAggregateT (AnyAggregate s)
  atomically do
    liveAggregates <- readTMVar liveAggregatesM
    valueT <- case M.lookup (show s) liveAggregates of
      Nothing -> retry
      Just LiveAggregate {valueT} -> pure valueT
    v <-
      readTVar valueT >>= \case
        Nothing -> retry
        Just v -> pure v
    -- Only the metadata (payload replaced by unit) is compared and remembered.
    let stale
          | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh
          | otherwise = Stale
    pure
      ( flip
          fromDyn
          ( error
              ( "Data.Sensor.sensor: aggregate "
                  <> show s
                  <> " yielded a value of an unexpected type"
              )
          )
          <$$> stale v,
        Just (const () <$> v)
      )

-- | Exception thrown to the main thread by 'startAggregate' when an
-- aggregate's thread ends with an exception other than 'ThreadKilled'. It
-- carries the aggregate's key and the original exception.
data Restart = Restart String SomeException deriving (Show)

instance Exception Restart

-- XXX merge `sample'` and `sample`

-- | Run a sensor forever, handing every fresh output to a stateful callback.
--
-- The callback receives the result of its previous invocation ('Nothing' the
-- first time) together with the fresh payload; its result is threaded to the
-- next invocation. Steps whose output is 'Stale' do not invoke the callback.
--
-- After a fresh output, every live aggregate that the output does not depend
-- on and that was spawned before the current global tick is killed.
--
-- A 'Restart' exception abandons the current iteration and the loop continues
-- with the callback state from before that iteration.
sample' :: (MonadUnliftIO m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
sample' f s = runSensorT do
  SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- ask
  outputT <- newTVarIO Nothing

  -- Background worker that starts every aggregate requested through the queue.
  _ <- forkIO $ forever do
    startAggregate =<< atomically do readTQueue startAggregateT

  h <-
    reactInit
      ( ( arrM $ \v -> do
            deadAggregates <- atomically do
              case v of
                Input _ -> error "Input"
                Stale _ -> do
                  writeTVar outputT Nothing
                  pure M.empty
                Fresh v -> do
                  writeTVar outputT (Just v.unwrap)
                  liveAggregates <- readTMVar liveAggregatesM
                  globalTick <- takeTMVar globalTickM
                  putTMVar globalTickM (globalTick + 1)
                  pure
                    ( M.filterWithKey
                        ( \key liveAggregate ->
                            not (S.member key v.dependsOn)
                              && liveAggregate.spawnedAt < globalTick
                        )
                        liveAggregates
                    )
            mapM_
              ( \liveAggregate -> do
                  liftIO (killThread (liveAggregate.threadId))
              )
              deadAggregates
        )
          <<< s.unSensor
          <<< arr (Input . Value S.empty M.empty)
      )
  -- A single 'mask' is installed around the whole loop and its @restore@ is
  -- reused by every iteration. Re-entering 'mask' on each iteration (as a
  -- recursive call from inside the masked region would) hands out a @restore@
  -- that only restores the already-masked state, so from the second iteration
  -- on the body would no longer be asynchronously interruptible. The handler
  -- and the recursion stay masked, so a 'Restart' is only delivered inside the
  -- 'catch'.
  mask $ \restore -> do
    let loop c =
          catch
            ( restore do
                react h
                c' <-
                  maybe (pure c) (lift . fmap Just . f c)
                    =<< atomically do readTVar outputT
                -- Wait for the next signal on the global queue.
                _ <- atomically do readTChan globalQueueT
                pure c'
            )
            (\(Restart _ _) -> pure c)
            >>= loop
    loop Nothing

-- | Run a 'SensorT' action against a freshly created 'SensorE' environment.
runSensorT :: (MonadIO m) => SensorT m a -> m a
runSensorT m = sensorE >>= runReaderT m.unSensorT

-- | Create a new environment: no live aggregates, empty queues, global tick 0
-- and the calling thread recorded as the main thread.
sensorE :: (MonadIO m) => m (SensorE m)
sensorE = do
  live <- newTMVarIO M.empty
  startRequests <- newTQueueIO
  signals <- newTChanIO
  tick <- newTMVarIO 0
  caller <- myThreadId
  pure
    SensorE
      { liveAggregatesM = live,
        startAggregateT = startRequests,
        globalQueueT = signals,
        globalTickM = tick,
        mainThread = caller
      }

-- | Run a sensor until at least @n@ fresh outputs have been collected and
-- return the collected payloads in the order they were produced.
--
-- After every fresh output, each live aggregate that the output does not
-- depend on and that was spawned before the current global tick is killed.
sample :: (MonadUnliftIO m) => Int -> Sensor m () a -> m [a]
sample n s = runSensorT do
  SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- ask
  -- Collected payloads, most recent first.
  outputsT <- newTVarIO []

  -- Background worker that starts every aggregate requested through the queue.
  _ <- forkIO $ forever do
    startAggregate =<< atomically do readTQueue startAggregateT

  h <-
    reactInit
      ( ( arrM $ \v -> do
            case v of
              Input _ -> error "Input"
              -- Stale outputs are not recorded.
              Stale _ -> pure ()
              Fresh v -> do
                deadAggregates <- atomically do
                  writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT
                  liveAggregates <- readTMVar liveAggregatesM
                  -- Advance the global tick; the filter below uses the old one.
                  globalTick <- takeTMVar globalTickM
                  putTMVar globalTickM (globalTick + 1)
                  pure
                    ( M.filterWithKey
                        ( \key liveAggregate ->
                            not (S.member key v.dependsOn)
                              && liveAggregate.spawnedAt < globalTick
                        )
                        liveAggregates
                    )
                mapM_ (liftIO . killThread . (.threadId)) deadAggregates
        )
          <<< s.unSensor
          -- The sensor is fed a unit payload with empty metadata, tagged 'Input'.
          <<< arr (Input . Value S.empty M.empty)
      )
  let loop = do
        react h
        xs <- atomically do readTVar outputsT
        if length xs >= n
          then pure (reverse xs)
          else do
            -- Wait for the next signal on the global queue before stepping again.
            _ <- atomically do readTChan globalQueueT
            loop
  loop

-- | Monad transformer giving access to the shared 'SensorE' environment.
newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE m) m a}

-- Apart from the hand-written 'MonadTrans' instance, the instances below are
-- derived from the underlying 'ReaderT'.

deriving instance (Functor m) => Functor (SensorT m)

deriving instance (Applicative m) => Applicative (SensorT m)

deriving instance (Monad m) => Monad (SensorT m)

deriving instance (MonadIO m) => MonadIO (SensorT m)

instance MonadTrans SensorT where
  lift = SensorT . lift

deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m)

deriving instance (MonadFail m) => MonadFail (SensorT m)

deriving instance (Monad m) => MonadReader (SensorE m) (SensorT m)

-- | Shared environment of a running sensor network.
data SensorE m = SensorE
  { -- | Currently registered aggregates, keyed by @show@ of their description.
    liveAggregatesM :: TMVar (Map String (LiveAggregate Dynamic)),
    -- | Requests to start an aggregate; written by 'sensor' and drained by the
    -- worker thread forked in 'sample' / 'sample''.
    startAggregateT :: TQueue (AnyAggregate m),
    -- | Signalled by every 'yield'; the sampling loops wait on it between steps.
    globalQueueT :: TChan (),
    -- | Counter advanced by the sampling loops on every fresh output.
    globalTickM :: TMVar Int,
    -- | The thread that created the environment; target of 'Restart'.
    mainThread :: ThreadId
  }

-- | An aggregate description of any type @s@ (with any value type @a@),
-- packaged together with its 'Aggregate' instance.
data AnyAggregate m where
  AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m

-- | Bookkeeping for a registered aggregate.
data LiveAggregate a = LiveAggregate
  { -- | The most recently yielded value; 'Nothing' until the first 'yield'.
    valueT :: TVar (Maybe (Value a)),
    -- | The thread running the aggregate.
    threadId :: ThreadId,
    -- | Global tick at the time the aggregate was registered.
    spawnedAt :: Int
  }

-- | Run an 'AggregateT' action with the given environment.
runAggregateT :: AggregateT s m a -> AggregateE -> m a
runAggregateT action env = runReaderT (unAggregateT action) env

-- | Run an action of the base monad @m@ that is given a function for lowering
-- @'AggregateT' s ('SensorT' m)@ actions into @m@.
--
-- The lowering function runs its argument with the 'AggregateE' and 'SensorE'
-- environments that are current where 'withRunInBase' itself is executed.
withRunInBase :: ((forall a. (AggregateT s (SensorT m) a) -> m a) -> m x) -> AggregateT s (SensorT m) x
withRunInBase inner =
  AggregateT $ ReaderT $ \aggregateEnv ->
    SensorT $ ReaderT $ \sensorEnv ->
      -- Peel both reader layers off the action using the captured environments.
      inner
        ( \action ->
            runReaderT
              (runReaderT action.unAggregateT aggregateEnv).unSensorT
              sensorEnv
        )

-- | Start the thread for an aggregate unless one is already registered under
-- the same key (@show@ of the description).
--
-- A newly started aggregate is registered with its thread id, an empty value
-- slot and the current global tick as its spawn time.
--
-- NOTE(review): the live-aggregates 'TMVar' is taken at the top and, in the
-- @else@ branch, only put back at the very end; an exception in between would
-- leave it empty — confirm whether that needs guarding.
startAggregate :: (MonadUnliftIO m) => AnyAggregate m -> SensorT m ()
startAggregate (AnyAggregate s) = do
  let self = show s
  SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- ask
  -- Taking the TMVar empties it until it is put back below.
  liveAggregates <- atomically do takeTMVar liveAggregatesM
  if M.member self liveAggregates
    then atomically do putTMVar liveAggregatesM liveAggregates
    else do
      -- Per-aggregate state that becomes the 'AggregateE' environment.
      readAggregatesM <- newTMVarIO S.empty
      currentTickM <- newTMVarIO 0
      localQueueT <- atomically do dupTChan globalQueueT

      threadId <-
        forkFinally
          ( do
              sensorE <- ask
              (lift :: m a -> SensorT m a)
                ( runReaderT
                    ( runAggregateT
                        -- NOTE(review): the callback handed to 'aggregate' is
                        -- 'undefined' here; the commented-out code below looks
                        -- like the intended version using 'yield'.
                        (lift (lift (aggregate s undefined)))
                        {-
                        ( withRunInBase $ \runInBase ->
                            (aggregate s (runInBase . yield))
                        )-}
                        AggregateE {..} ::
                        SensorT m ()
                    ).unSensorT
                    sensorE ::
                    m ()
                )
          )
          -- Finalizer: on an exception the aggregate is unregistered, and
          -- anything other than 'ThreadKilled' is additionally reported to
          -- the main thread as a 'Restart'. A normal return does nothing.
          ( either
              ( \e ->
                  if
                    | fromException e == Just ThreadKilled -> do
                        dropAggregate self
                    | otherwise -> do
                        dropAggregate self
                        liftIO (E.throwTo mainThread (Restart self e))
              )
              pure
          )
      valueT <- newTVarIO Nothing
      -- Register the aggregate and give the TMVar back.
      atomically do
        spawnedAt <- readTMVar globalTickM
        putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates)

-- | Remove the aggregate registered under the given key from the map of live
-- aggregates.
dropAggregate :: (MonadIO m) => String -> SensorT m ()
dropAggregate self = do
  env <- ask
  atomically do
    remaining <- M.delete self <$> takeTMVar env.liveAggregatesM
    putTMVar env.liveAggregatesM remaining

-- XXX we should depend on `dunai`

-- | A monadic stream function: given an input it produces, in the monad @m@,
-- an output together with the stream function to use for the next input.
data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)}

instance (Monad m) => Category (MSF m) where
  -- Returns each input unchanged and continues as itself.
  id = identity
    where
      identity = MSF (\a -> pure (a, identity))

  -- Step the right-hand stream function first and feed its output to the
  -- left-hand one; the continuations are composed the same way. The final
  -- output is forced to weak head normal form before it is returned.
  after . before = MSF $ \a ->
    unMSF before a >>= \(b, before') ->
      unMSF after b >>= \(c, after') ->
        c `seq` pure (c, after' . before')

instance (Monad m) => Arrow (MSF m) where
  -- A pure function, lifted through 'arrM'.
  arr f = arrM (pure . f)

  -- Apply the stream function to the first component and pass the second
  -- component through unchanged.
  first =
    morphGS $ \step (a, c) ->
      fmap (\(b, next) -> ((b, c), next)) (step a)

-- | Post-composes a pure function onto the output.
instance (Monad m) => Functor (MSF m a) where
  fmap f msf = arr f <<< msf

instance (Functor m, Monad m) => Applicative (MSF m a) where
  -- Ignores its input and always outputs the given value.
  pure x = arr (const x)

  -- Run both stream functions on the same input and apply the first output
  -- to the second.
  fs <*> bs = arr (\p -> fst p (snd p)) <<< (fs &&& bs)

-- | Transform a stream function by rewriting its step function.
--
-- The rewriting function must work for any continuation type @c@, so it
-- cannot inspect the continuation; the same rewriting is applied again to
-- every continuation.
morphGS ::
  (Monad m2) =>
  (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) ->
  MSF m1 a1 b1 ->
  MSF m2 a2 b2
morphGS morph msf = MSF $ \a2 ->
  fmap
    (\(b2, msf') -> (b2, morphGS morph msf'))
    (morph (unMSF msf) a2)

-- | Hide an accumulator inside a stream function.
--
-- The accumulator starts at @c@; on every step it is paired with the input,
-- and the second component of the output becomes the next accumulator.
feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b
feedback c sf = MSF $ \a ->
  (\((b, c'), sf') -> (b, feedback c' sf')) <$> unMSF sf (a, c)

-- | Feed a list of inputs through a stream function, one after another, and
-- collect the outputs in the same order.
embed :: (Monad m) => MSF m a b -> [a] -> m [b]
embed sf inputs = case inputs of
  [] -> pure []
  a : rest -> do
    (b, sf') <- unMSF sf a
    (b :) <$> embed sf' rest

-- | Step a stream function over and over with unit input, discarding its
-- output; the recursion has no base case.
reactimate :: (Monad m) => MSF m () () -> m ()
reactimate sf = unMSF sf () >>= \(_, sf') -> reactimate sf'

-- | Lift a monadic function to a stream function that applies it to every
-- input. It is built by rewriting the identity stream function with 'morphGS'.
arrM :: (Monad m) => (a -> m b) -> MSF m a b
arrM f =
  morphGS
    ( \run a -> do
        (_, next) <- run a
        out <- f a
        pure (out, next)
    )
    C.id

-- | A mutable reference holding the current state of a stream function that
-- is stepped from the outside with 'react'.
type ReactHandle m = IORef (MSF m () ())

-- | Create a 'ReactHandle' holding the given stream function.
reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m)
reactInit msf = liftIO (newIORef msf)

-- | Step the stream function stored in the handle once and store its
-- continuation back into the handle.
react :: (MonadIO m) => ReactHandle m -> m ()
react handle =
  liftIO (readIORef handle) >>= \current ->
    unMSF current () >>= \(_, next) ->
      liftIO (writeIORef handle next)

instance (Monad m) => ArrowChoice (MSF m) where
  -- 'Left' inputs are run through the stream function, which then advances;
  -- 'Right' inputs pass through unchanged and leave the stream function as is.
  left :: MSF m a b -> MSF m (Either a c) (Either b c)
  left sf = MSF $ \case
    Left a -> (\(b, sf') -> (Left b, left sf')) <$> unMSF sf a
    Right c -> pure (Right c, left sf)