summaryrefslogtreecommitdiffstats
path: root/src/Data
diff options
context:
space:
mode:
Diffstat (limited to 'src/Data')
-rw-r--r--src/Data/Sensor.hs491
1 files changed, 491 insertions, 0 deletions
diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs
new file mode 100644
index 0000000..29ead27
--- /dev/null
+++ b/src/Data/Sensor.hs
@@ -0,0 +1,491 @@
+module Data.Sensor where
+
+import Control.Applicative
+import Control.Arrow
+import Control.Category (Category (..))
+import Control.Category qualified as C
+import Control.Concurrent.STM (retry)
+import Control.Exception (AsyncException (ThreadKilled))
+import Control.Exception qualified as E
+import Control.Monad
+import Control.Monad.Reader
+import Data.Dynamic
+import Data.List
+import Data.Map (Map)
+import Data.Map qualified as M
+import Data.Maybe
+import Data.Set (Set)
+import Data.Set qualified as S
+import UnliftIO
+import UnliftIO.Concurrent hiding (yield)
+import Prelude hiding (id, (.))
+
+newtype Sensor m a b = Sensor'
+ { unSensor :: MSF m (Stale (Value a)) (Stale (Value b))
+ }
+
+mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b
+mkSensor = Sensor' . ignoreStaleInput
+
+ignoreStaleInput ::
+ (Monad m) =>
+ MSF m (Value a) (Stale (Value b)) ->
+ MSF m (Stale (Value a)) (Stale (Value b))
+ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do
+ case (i, x') of
+ (Stale _, Just x) -> returnA -< (Stale x, x')
+ _ -> do
+ x <- sf -< i.unwrap
+ returnA -< (x, Just x.unwrap)
+
+instance (Monad m) => Functor (Sensor m a) where
+ f `fmap` s = mkSensor (fmap f <$$> s.unSensor <<< arr Fresh)
+
+(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b)
+f <$$> x = fmap f <$> x
+
+infixl 4 <$$>
+
+instance (Monad m) => Applicative (Sensor m a) where
+ pure x = mkSensor do loop Fresh
+ where
+ loop stale =
+ MSF $ \_ -> pure (stale (Value S.empty M.empty x), loop Stale)
+
+ f <*> x =
+ mkSensor
+ ( liftA2
+ (combineStaleValue ($))
+ (f.unSensor <<< arr Fresh)
+ (x.unSensor <<< arr Fresh)
+ )
+
+instance (Monad m) => Category (Sensor m) where
+ id = mkSensor (C.id <<< arr Fresh)
+
+ sf2 . sf1 = mkSensor (sf2.unSensor . (sf1.unSensor <<< arr Fresh))
+
+instance (Monad m) => Arrow (Sensor m) where
+ arr f = mkSensor (arr (f <$$>) <<< arr Fresh)
+
+ first :: Sensor m a b -> Sensor m (a, d) (b, d)
+ first sf =
+ mkSensor
+ ( arr (\(x, d) -> (,d) <$$> x)
+ <<< first (sf.unSensor <<< arr Fresh)
+ <<< arr (\x -> (fst <$> x, snd (x.unwrap)))
+ )
+
+feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b
+feedbackS c sf =
+ mkSensor . feedback c $
+ arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap)))
+ <<< (sf.unSensor <<< arr Fresh)
+ <<< arr (\(x, d) -> (,d) <$> x)
+
+data Stale a
+ = Stale {unwrap :: a}
+ | Fresh {unwrap :: a}
+ | Input {unwrap :: a}
+ deriving (Functor, Show)
+
+instance (Monad m) => Monad (Sensor m a) where
+ m >>= f =
+ mkSensor $ switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v ->
+ (combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh)
+
+switchOnFresh ::
+ (Monad m) =>
+ MSF m a (Stale (Value b)) ->
+ MSF m a (Stale (Value b), Bool)
+switchOnFresh =
+ fmap
+ ( \v ->
+ ( v,
+ case v of
+ Fresh _ -> True
+ _ -> False
+ )
+ )
+
+switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b
+switch m f = feedback Nothing $ go m f
+ where
+ go :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b))
+ go m f = MSF $ \(i, k') -> do
+ ((x, me), m') <- m.unMSF i
+ let k = case (me, k') of
+ (False, Just k) -> k
+ _ -> f x
+ (z, k') <- k.unMSF i
+ pure ((z, (Just k')), go m' f)
+
+data Value a = Value
+ { dependsOn :: Set String,
+ atTick :: Map String Int,
+ unwrap :: a
+ }
+ deriving (Show, Functor, Eq)
+
+combineStaleValue ::
+ (a -> b -> c) ->
+ Stale (Value a) ->
+ Stale (Value b) ->
+ Stale (Value c)
+combineStaleValue f =
+ combineStale (combineValue f)
+
+combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c
+combineValue f a b =
+ Value
+ (S.union a.dependsOn b.dependsOn)
+ (M.unionWith max a.atTick b.atTick)
+ (f a.unwrap b.unwrap)
+
+combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c
+combineStale f a@(Stale _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
+combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
+combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap)
+combineStale f a b = Fresh (f a.unwrap b.unwrap)
+
+class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where
+ aggregate :: s -> AggregateT s m ()
+
+newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a}
+
+deriving instance (Functor m) => Functor (AggregateT s m)
+
+deriving instance (Applicative m) => Applicative (AggregateT s m)
+
+deriving instance (Monad m) => Monad (AggregateT s m)
+
+deriving instance (MonadIO m) => MonadIO (AggregateT s m)
+
+instance MonadTrans (AggregateT s) where
+ lift = AggregateT . lift
+
+deriving instance (MonadFail m) => MonadFail (AggregateT s m)
+
+deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m)
+
+deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m)
+
+-- XXX aggregates should be keyed on something else than `String`
+data AggregateE m = AggregateE
+ { readAggregatesM :: TMVar (S.Set String),
+ currentTickM :: TMVar Int,
+ localQueueT :: TChan (),
+ self :: String
+ }
+
+yield :: (Aggregate m s a) => a -> AggregateT s m ()
+yield x = do
+ SensorE {liveAggregatesM, globalQueueT} <- lift askS
+ AggregateE {readAggregatesM, currentTickM, self} <- ask
+ atomically do
+ writeTChan globalQueueT ()
+ liveAggregates <- readTMVar liveAggregatesM
+ valueT <- do
+ case M.lookup self liveAggregates of
+ Nothing -> retry
+ Just LiveAggregate {valueT} -> pure valueT
+ dependsOn <- takeTMVar readAggregatesM
+ putTMVar readAggregatesM S.empty
+ currentTick <- takeTMVar currentTickM
+ putTMVar currentTickM (currentTick + 1)
+ let atTick = M.singleton self currentTick
+ let v = Value (S.insert self dependsOn) atTick (toDyn x)
+ writeTVar valueT (Just v)
+
+sensor :: (Aggregate m s a) => s -> Sensor m () a
+sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do
+ SensorE {liveAggregatesM, startAggregateT} <- askS
+ atomically do writeTQueue startAggregateT (AnyAggregate s)
+ atomically do
+ liveAggregates <- readTMVar liveAggregatesM
+ valueT <- case M.lookup (show s) liveAggregates of
+ Nothing -> retry
+ Just LiveAggregate {valueT} -> pure valueT
+ v <-
+ readTVar valueT >>= \case
+ Nothing -> retry
+ Just v -> pure v
+ let stale
+ | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh
+ | otherwise = Stale
+ pure (flip fromDyn (error "") <$$> stale v, Just (const () <$> v))
+
+data Restart = Restart String SomeException deriving (Show)
+
+instance Exception Restart
+
+-- XXX merge `sample'` and `sample`
+sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
+sample' f s = do
+ SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS
+ outputT <- newTVarIO Nothing
+
+ _ <- forkIO $ forever do
+ startAggregate =<< atomically do readTQueue startAggregateT
+
+ h <-
+ reactInit
+ ( ( arrM $ \v -> do
+ deadAggregates <- atomically do
+ case v of
+ Input _ -> error "Input"
+ Stale _ -> do
+ writeTVar outputT Nothing
+ pure M.empty
+ Fresh v -> do
+ writeTVar outputT (Just v.unwrap)
+ liveAggregates <- readTMVar liveAggregatesM
+ globalTick <- takeTMVar globalTickM
+ putTMVar globalTickM (globalTick + 1)
+ pure
+ ( M.filterWithKey
+ ( \key liveAggregate ->
+ not (S.member key v.dependsOn)
+ && liveAggregate.spawnedAt < globalTick
+ )
+ liveAggregates
+ )
+ mapM_
+ ( \liveAggregate -> do
+ liftIO (killThread (liveAggregate.threadId))
+ )
+ deadAggregates
+ )
+ <<< s.unSensor
+ <<< arr (Input . Value S.empty M.empty)
+ )
+ let loop c = do
+ mask $ \restore ->
+ catch
+ ( restore do
+ react h
+ c' <-
+ maybe (pure c) (fmap Just . f c)
+ =<< atomically do readTVar outputT
+ _ <- atomically do readTChan globalQueueT
+ pure c'
+ )
+ (\(Restart _ _) -> pure c)
+ >>= loop
+ loop Nothing
+
+runSensorT :: (MonadIO m) => SensorT m a -> m a
+runSensorT m = do
+ runReaderT m.unSensorT =<< sensorE
+
+sensorE :: (MonadIO m) => m (SensorE (t m))
+sensorE = do
+ liveAggregatesM <- newTMVarIO M.empty
+ startAggregateT <- newTQueueIO
+ globalQueueT <- newTChanIO
+ globalTickM <- newTMVarIO 0
+ mainThread <- myThreadId
+ pure SensorE {..}
+
+sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a]
+sample n s = do
+ SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS
+ outputsT <- newTVarIO []
+
+ _ <- forkIO $ forever do
+ startAggregate =<< atomically do readTQueue startAggregateT
+
+ h <-
+ reactInit
+ ( ( arrM $ \v -> do
+ case v of
+ Input _ -> error "Input"
+ Stale _ -> pure ()
+ Fresh v -> do
+ deadAggregates <- atomically do
+ writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT
+ liveAggregates <- readTMVar liveAggregatesM
+ globalTick <- takeTMVar globalTickM
+ putTMVar globalTickM (globalTick + 1)
+ pure
+ ( M.filterWithKey
+ ( \key liveAggregate ->
+ not (S.member key v.dependsOn)
+ && liveAggregate.spawnedAt < globalTick
+ )
+ liveAggregates
+ )
+ mapM_ (liftIO . killThread . (.threadId)) deadAggregates
+ )
+ <<< s.unSensor
+ <<< arr (Input . Value S.empty M.empty)
+ )
+ let loop = do
+ react h
+ xs <- atomically do readTVar outputsT
+ if length xs >= n
+ then pure (reverse xs)
+ else do
+ _ <- atomically do readTChan globalQueueT
+ loop
+ loop
+
+newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a}
+
+deriving instance (Functor m) => Functor (SensorT m)
+
+deriving instance (Applicative m) => Applicative (SensorT m)
+
+deriving instance (Monad m) => Monad (SensorT m)
+
+deriving instance (MonadIO m) => MonadIO (SensorT m)
+
+instance MonadTrans SensorT where
+ lift = SensorT . lift
+
+deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m)
+
+deriving instance (MonadFail m) => MonadFail (SensorT m)
+
+deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m)
+
+data SensorE m = SensorE
+ { liveAggregatesM :: TMVar (Map String (LiveAggregate m Dynamic)),
+ startAggregateT :: TQueue (AnyAggregate m),
+ globalQueueT :: TChan (),
+ globalTickM :: TMVar Int,
+ mainThread :: ThreadId
+ }
+
+class (MonadUnliftIO m) => MonadSensor m where
+ askS :: m (SensorE m)
+
+instance (MonadUnliftIO m) => MonadSensor (SensorT m) where
+ askS = ask
+
+data AnyAggregate m where
+ AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m
+
+data LiveAggregate m a = LiveAggregate
+ { valueT :: TVar (Maybe (Value a)),
+ threadId :: ThreadId,
+ spawnedAt :: Int
+ }
+
+startAggregate :: (MonadSensor m) => AnyAggregate m -> m ()
+startAggregate (AnyAggregate s) = do
+ let self = show s
+ SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS
+ liveAggregates <- atomically do takeTMVar liveAggregatesM
+ if M.member self liveAggregates
+ then atomically do putTMVar liveAggregatesM liveAggregates
+ else do
+ readAggregatesM <- newTMVarIO S.empty
+ currentTickM <- newTMVarIO 0
+ localQueueT <- atomically do dupTChan globalQueueT
+
+ threadId <-
+ forkFinally
+ (runReaderT (aggregate s).unAggregateT AggregateE {..})
+ ( either
+ ( \e ->
+ if
+ | fromException e == Just ThreadKilled -> do
+ dropAggregate self
+ | otherwise -> do
+ dropAggregate self
+ liftIO (E.throwTo mainThread (Restart self e))
+ )
+ pure
+ )
+ valueT <- newTVarIO Nothing
+ atomically do
+ spawnedAt <- readTMVar globalTickM
+ putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates)
+
+dropAggregate :: (MonadSensor m) => String -> m ()
+dropAggregate self = do
+ SensorE {liveAggregatesM} <- askS
+ atomically do
+ liveAggregates <- takeTMVar liveAggregatesM
+ putTMVar liveAggregatesM (M.delete self liveAggregates)
+
+-- XXX we should depend on `dunai`
+data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)}
+
+instance (Monad m) => Category (MSF m) where
+ id = go
+ where
+ go = MSF $ \a -> return (a, go)
+
+ sf2 . sf1 = MSF $ \a -> do
+ (b, sf1') <- unMSF sf1 a
+ (c, sf2') <- unMSF sf2 b
+ let sf' = sf2' . sf1'
+ c `seq` return (c, sf')
+
+instance (Monad m) => Arrow (MSF m) where
+ arr f = arrM (return . f)
+
+ first =
+ morphGS $ \f (a, c) -> do
+ (b, msf') <- f a
+ return ((b, c), msf')
+
+instance (Monad m) => Functor (MSF m a) where
+ fmap f msf = msf >>> arr f
+
+instance (Functor m, Monad m) => Applicative (MSF m a) where
+ pure = arr . const
+
+ fs <*> bs = (fs &&& bs) >>> arr (uncurry ($))
+
+morphGS ::
+ (Monad m2) =>
+ (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) ->
+ MSF m1 a1 b1 ->
+ MSF m2 a2 b2
+morphGS morph msf = MSF $ \a2 -> do
+ (b2, msf') <- morph (unMSF msf) a2
+ return (b2, morphGS morph msf')
+
+feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b
+feedback c sf = MSF $ \a -> do
+ ((b', c'), sf') <- unMSF sf (a, c)
+ return (b', feedback c' sf')
+
+embed :: (Monad m) => MSF m a b -> [a] -> m [b]
+embed _ [] = return []
+embed sf (a : as) = do
+ (b, sf') <- unMSF sf a
+ bs <- embed sf' as
+ return (b : bs)
+
+reactimate :: (Monad m) => MSF m () () -> m ()
+reactimate sf = do
+ (_, sf') <- unMSF sf ()
+ reactimate sf'
+
+arrM :: (Monad m) => (a -> m b) -> MSF m a b
+arrM f =
+ morphGS (\i a -> i a >>= \(_, c) -> f a >>= \b -> return (b, c)) C.id
+
+type ReactHandle m = IORef (MSF m () ())
+
+reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m)
+reactInit = liftIO . newIORef
+
+react :: (MonadIO m) => ReactHandle m -> m ()
+react handle = do
+ msf <- liftIO $ readIORef handle
+ (_, msf') <- unMSF msf ()
+ liftIO $ writeIORef handle msf'
+
+instance (Monad m) => ArrowChoice (MSF m) where
+ left :: MSF m a b -> MSF m (Either a c) (Either b c)
+ left sf = MSF f
+ where
+ f (Left a) = do
+ (b, sf') <- unMSF sf a
+ return (Left b, left sf')
+ f (Right c) = return (Right c, left sf)