From 272b3ace747857729171780edae898819d211832 Mon Sep 17 00:00:00 2001 From: Alexander Foremny Date: Thu, 25 Jul 2024 09:42:11 +0200 Subject: init --- src/Data/Sensor.hs | 491 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 491 insertions(+) create mode 100644 src/Data/Sensor.hs (limited to 'src') diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs new file mode 100644 index 0000000..29ead27 --- /dev/null +++ b/src/Data/Sensor.hs @@ -0,0 +1,491 @@ +module Data.Sensor where + +import Control.Applicative +import Control.Arrow +import Control.Category (Category (..)) +import Control.Category qualified as C +import Control.Concurrent.STM (retry) +import Control.Exception (AsyncException (ThreadKilled)) +import Control.Exception qualified as E +import Control.Monad +import Control.Monad.Reader +import Data.Dynamic +import Data.List +import Data.Map (Map) +import Data.Map qualified as M +import Data.Maybe +import Data.Set (Set) +import Data.Set qualified as S +import UnliftIO +import UnliftIO.Concurrent hiding (yield) +import Prelude hiding (id, (.)) + +newtype Sensor m a b = Sensor' + { unSensor :: MSF m (Stale (Value a)) (Stale (Value b)) + } + +mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b +mkSensor = Sensor' . ignoreStaleInput + +ignoreStaleInput :: + (Monad m) => + MSF m (Value a) (Stale (Value b)) -> + MSF m (Stale (Value a)) (Stale (Value b)) +ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do + case (i, x') of + (Stale _, Just x) -> returnA -< (Stale x, x') + _ -> do + x <- sf -< i.unwrap + returnA -< (x, Just x.unwrap) + +instance (Monad m) => Functor (Sensor m a) where + f `fmap` s = mkSensor (fmap f <$$> s.unSensor <<< arr Fresh) + +(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b) +f <$$> x = fmap f <$> x + +infixl 4 <$$> + +instance (Monad m) => Applicative (Sensor m a) where + pure x = mkSensor do loop Fresh + where + loop stale = + MSF $ \_ -> pure (stale (Value S.empty M.empty x), loop Stale) + + f <*> x = + mkSensor + ( liftA2 + (combineStaleValue ($)) + (f.unSensor <<< arr Fresh) + (x.unSensor <<< arr Fresh) + ) + +instance (Monad m) => Category (Sensor m) where + id = mkSensor (C.id <<< arr Fresh) + + sf2 . sf1 = mkSensor (sf2.unSensor . (sf1.unSensor <<< arr Fresh)) + +instance (Monad m) => Arrow (Sensor m) where + arr f = mkSensor (arr (f <$$>) <<< arr Fresh) + + first :: Sensor m a b -> Sensor m (a, d) (b, d) + first sf = + mkSensor + ( arr (\(x, d) -> (,d) <$$> x) + <<< first (sf.unSensor <<< arr Fresh) + <<< arr (\x -> (fst <$> x, snd (x.unwrap))) + ) + +feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b +feedbackS c sf = + mkSensor . feedback c $ + arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap))) + <<< (sf.unSensor <<< arr Fresh) + <<< arr (\(x, d) -> (,d) <$> x) + +data Stale a + = Stale {unwrap :: a} + | Fresh {unwrap :: a} + | Input {unwrap :: a} + deriving (Functor, Show) + +instance (Monad m) => Monad (Sensor m a) where + m >>= f = + mkSensor $ switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v -> + (combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh) + +switchOnFresh :: + (Monad m) => + MSF m a (Stale (Value b)) -> + MSF m a (Stale (Value b), Bool) +switchOnFresh = + fmap + ( \v -> + ( v, + case v of + Fresh _ -> True + _ -> False + ) + ) + +switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b +switch m f = feedback Nothing $ go m f + where + go :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b)) + go m f = MSF $ \(i, k') -> do + ((x, me), m') <- m.unMSF i + let k = case (me, k') of + (False, Just k) -> k + _ -> f x + (z, k') <- k.unMSF i + pure ((z, (Just k')), go m' f) + +data Value a = Value + { dependsOn :: Set String, + atTick :: Map String Int, + unwrap :: a + } + deriving (Show, Functor, Eq) + +combineStaleValue :: + (a -> b -> c) -> + Stale (Value a) -> + Stale (Value b) -> + Stale (Value c) +combineStaleValue f = + combineStale (combineValue f) + +combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c +combineValue f a b = + Value + (S.union a.dependsOn b.dependsOn) + (M.unionWith max a.atTick b.atTick) + (f a.unwrap b.unwrap) + +combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c +combineStale f a@(Stale _) b@(Stale _) = Stale (f a.unwrap b.unwrap) +combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap) +combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap) +combineStale f a b = Fresh (f a.unwrap b.unwrap) + +class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where + aggregate :: s -> AggregateT s m () + +newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a} + +deriving instance (Functor m) => Functor (AggregateT s m) + +deriving instance (Applicative m) => Applicative (AggregateT s m) + +deriving instance (Monad m) => Monad (AggregateT s m) + +deriving instance (MonadIO m) => MonadIO (AggregateT s m) + +instance MonadTrans (AggregateT s) where + lift = AggregateT . lift + +deriving instance (MonadFail m) => MonadFail (AggregateT s m) + +deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m) + +deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m) + +-- XXX aggregates should be keyed on something else than `String` +data AggregateE m = AggregateE + { readAggregatesM :: TMVar (S.Set String), + currentTickM :: TMVar Int, + localQueueT :: TChan (), + self :: String + } + +yield :: (Aggregate m s a) => a -> AggregateT s m () +yield x = do + SensorE {liveAggregatesM, globalQueueT} <- lift askS + AggregateE {readAggregatesM, currentTickM, self} <- ask + atomically do + writeTChan globalQueueT () + liveAggregates <- readTMVar liveAggregatesM + valueT <- do + case M.lookup self liveAggregates of + Nothing -> retry + Just LiveAggregate {valueT} -> pure valueT + dependsOn <- takeTMVar readAggregatesM + putTMVar readAggregatesM S.empty + currentTick <- takeTMVar currentTickM + putTMVar currentTickM (currentTick + 1) + let atTick = M.singleton self currentTick + let v = Value (S.insert self dependsOn) atTick (toDyn x) + writeTVar valueT (Just v) + +sensor :: (Aggregate m s a) => s -> Sensor m () a +sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do + SensorE {liveAggregatesM, startAggregateT} <- askS + atomically do writeTQueue startAggregateT (AnyAggregate s) + atomically do + liveAggregates <- readTMVar liveAggregatesM + valueT <- case M.lookup (show s) liveAggregates of + Nothing -> retry + Just LiveAggregate {valueT} -> pure valueT + v <- + readTVar valueT >>= \case + Nothing -> retry + Just v -> pure v + let stale + | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh + | otherwise = Stale + pure (flip fromDyn (error "") <$$> stale v, Just (const () <$> v)) + +data Restart = Restart String SomeException deriving (Show) + +instance Exception Restart + +-- XXX merge `sample'` and `sample` +sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m () +sample' f s = do + SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS + outputT <- newTVarIO Nothing + + _ <- forkIO $ forever do + startAggregate =<< atomically do readTQueue startAggregateT + + h <- + reactInit + ( ( arrM $ \v -> do + deadAggregates <- atomically do + case v of + Input _ -> error "Input" + Stale _ -> do + writeTVar outputT Nothing + pure M.empty + Fresh v -> do + writeTVar outputT (Just v.unwrap) + liveAggregates <- readTMVar liveAggregatesM + globalTick <- takeTMVar globalTickM + putTMVar globalTickM (globalTick + 1) + pure + ( M.filterWithKey + ( \key liveAggregate -> + not (S.member key v.dependsOn) + && liveAggregate.spawnedAt < globalTick + ) + liveAggregates + ) + mapM_ + ( \liveAggregate -> do + liftIO (killThread (liveAggregate.threadId)) + ) + deadAggregates + ) + <<< s.unSensor + <<< arr (Input . Value S.empty M.empty) + ) + let loop c = do + mask $ \restore -> + catch + ( restore do + react h + c' <- + maybe (pure c) (fmap Just . f c) + =<< atomically do readTVar outputT + _ <- atomically do readTChan globalQueueT + pure c' + ) + (\(Restart _ _) -> pure c) + >>= loop + loop Nothing + +runSensorT :: (MonadIO m) => SensorT m a -> m a +runSensorT m = do + runReaderT m.unSensorT =<< sensorE + +sensorE :: (MonadIO m) => m (SensorE (t m)) +sensorE = do + liveAggregatesM <- newTMVarIO M.empty + startAggregateT <- newTQueueIO + globalQueueT <- newTChanIO + globalTickM <- newTMVarIO 0 + mainThread <- myThreadId + pure SensorE {..} + +sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a] +sample n s = do + SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS + outputsT <- newTVarIO [] + + _ <- forkIO $ forever do + startAggregate =<< atomically do readTQueue startAggregateT + + h <- + reactInit + ( ( arrM $ \v -> do + case v of + Input _ -> error "Input" + Stale _ -> pure () + Fresh v -> do + deadAggregates <- atomically do + writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT + liveAggregates <- readTMVar liveAggregatesM + globalTick <- takeTMVar globalTickM + putTMVar globalTickM (globalTick + 1) + pure + ( M.filterWithKey + ( \key liveAggregate -> + not (S.member key v.dependsOn) + && liveAggregate.spawnedAt < globalTick + ) + liveAggregates + ) + mapM_ (liftIO . killThread . (.threadId)) deadAggregates + ) + <<< s.unSensor + <<< arr (Input . Value S.empty M.empty) + ) + let loop = do + react h + xs <- atomically do readTVar outputsT + if length xs >= n + then pure (reverse xs) + else do + _ <- atomically do readTChan globalQueueT + loop + loop + +newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a} + +deriving instance (Functor m) => Functor (SensorT m) + +deriving instance (Applicative m) => Applicative (SensorT m) + +deriving instance (Monad m) => Monad (SensorT m) + +deriving instance (MonadIO m) => MonadIO (SensorT m) + +instance MonadTrans SensorT where + lift = SensorT . lift + +deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m) + +deriving instance (MonadFail m) => MonadFail (SensorT m) + +deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m) + +data SensorE m = SensorE + { liveAggregatesM :: TMVar (Map String (LiveAggregate m Dynamic)), + startAggregateT :: TQueue (AnyAggregate m), + globalQueueT :: TChan (), + globalTickM :: TMVar Int, + mainThread :: ThreadId + } + +class (MonadUnliftIO m) => MonadSensor m where + askS :: m (SensorE m) + +instance (MonadUnliftIO m) => MonadSensor (SensorT m) where + askS = ask + +data AnyAggregate m where + AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m + +data LiveAggregate m a = LiveAggregate + { valueT :: TVar (Maybe (Value a)), + threadId :: ThreadId, + spawnedAt :: Int + } + +startAggregate :: (MonadSensor m) => AnyAggregate m -> m () +startAggregate (AnyAggregate s) = do + let self = show s + SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS + liveAggregates <- atomically do takeTMVar liveAggregatesM + if M.member self liveAggregates + then atomically do putTMVar liveAggregatesM liveAggregates + else do + readAggregatesM <- newTMVarIO S.empty + currentTickM <- newTMVarIO 0 + localQueueT <- atomically do dupTChan globalQueueT + + threadId <- + forkFinally + (runReaderT (aggregate s).unAggregateT AggregateE {..}) + ( either + ( \e -> + if + | fromException e == Just ThreadKilled -> do + dropAggregate self + | otherwise -> do + dropAggregate self + liftIO (E.throwTo mainThread (Restart self e)) + ) + pure + ) + valueT <- newTVarIO Nothing + atomically do + spawnedAt <- readTMVar globalTickM + putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates) + +dropAggregate :: (MonadSensor m) => String -> m () +dropAggregate self = do + SensorE {liveAggregatesM} <- askS + atomically do + liveAggregates <- takeTMVar liveAggregatesM + putTMVar liveAggregatesM (M.delete self liveAggregates) + +-- XXX we should depend on `dunai` +data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)} + +instance (Monad m) => Category (MSF m) where + id = go + where + go = MSF $ \a -> return (a, go) + + sf2 . sf1 = MSF $ \a -> do + (b, sf1') <- unMSF sf1 a + (c, sf2') <- unMSF sf2 b + let sf' = sf2' . sf1' + c `seq` return (c, sf') + +instance (Monad m) => Arrow (MSF m) where + arr f = arrM (return . f) + + first = + morphGS $ \f (a, c) -> do + (b, msf') <- f a + return ((b, c), msf') + +instance (Monad m) => Functor (MSF m a) where + fmap f msf = msf >>> arr f + +instance (Functor m, Monad m) => Applicative (MSF m a) where + pure = arr . const + + fs <*> bs = (fs &&& bs) >>> arr (uncurry ($)) + +morphGS :: + (Monad m2) => + (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) -> + MSF m1 a1 b1 -> + MSF m2 a2 b2 +morphGS morph msf = MSF $ \a2 -> do + (b2, msf') <- morph (unMSF msf) a2 + return (b2, morphGS morph msf') + +feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b +feedback c sf = MSF $ \a -> do + ((b', c'), sf') <- unMSF sf (a, c) + return (b', feedback c' sf') + +embed :: (Monad m) => MSF m a b -> [a] -> m [b] +embed _ [] = return [] +embed sf (a : as) = do + (b, sf') <- unMSF sf a + bs <- embed sf' as + return (b : bs) + +reactimate :: (Monad m) => MSF m () () -> m () +reactimate sf = do + (_, sf') <- unMSF sf () + reactimate sf' + +arrM :: (Monad m) => (a -> m b) -> MSF m a b +arrM f = + morphGS (\i a -> i a >>= \(_, c) -> f a >>= \b -> return (b, c)) C.id + +type ReactHandle m = IORef (MSF m () ()) + +reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m) +reactInit = liftIO . newIORef + +react :: (MonadIO m) => ReactHandle m -> m () +react handle = do + msf <- liftIO $ readIORef handle + (_, msf') <- unMSF msf () + liftIO $ writeIORef handle msf' + +instance (Monad m) => ArrowChoice (MSF m) where + left :: MSF m a b -> MSF m (Either a c) (Either b c) + left sf = MSF f + where + f (Left a) = do + (b, sf') <- unMSF sf a + return (Left b, left sf') + f (Right c) = return (Right c, left sf) -- cgit v1.2.3