diff options
author | Alexander Foremny <aforemny@posteo.de> | 2024-07-25 09:42:11 +0200 |
---|---|---|
committer | Alexander Foremny <aforemny@posteo.de> | 2024-08-08 15:46:58 +0200 |
commit | 272b3ace747857729171780edae898819d211832 (patch) | |
tree | 5e791779a318521175104a7e9140dc186c4b3d54 |
init
-rw-r--r-- | .envrc | 1 | ||||
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | LICENSE | 30 | ||||
-rw-r--r-- | default.nix | 17 | ||||
-rw-r--r-- | nix/sources.json | 14 | ||||
-rw-r--r-- | nix/sources.nix | 198 | ||||
-rw-r--r-- | sensors.cabal | 47 | ||||
-rw-r--r-- | shell.nix | 1 | ||||
-rw-r--r-- | src/Data/Sensor.hs | 491 | ||||
-rw-r--r-- | test/AggregateSpec.hs | 102 | ||||
-rw-r--r-- | test/PureSpec.hs | 10 | ||||
-rw-r--r-- | test/Spec.hs | 1 |
12 files changed, 913 insertions, 0 deletions
@@ -0,0 +1 @@ +use nix diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8075013 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/dist-newstyle @@ -0,0 +1,30 @@ +Copyright (c) 2024, Alexander Foremny + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of Alexander Foremny nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/default.nix b/default.nix new file mode 100644 index 0000000..3de5581 --- /dev/null +++ b/default.nix @@ -0,0 +1,17 @@ +{ pkgs ? import (import ./nix/sources.nix).nixpkgs { } }: rec { + haskellPackages = pkgs.haskellPackages.override { + overrides = self: super: { + sensors = self.callCabal2nix "sensors" ./. { }; + }; + }; + inherit (haskellPackages) sensors; + shell = haskellPackages.shellFor { + packages = _: [ sensors ]; + buildInputs = [ + haskellPackages.cabal-install + haskellPackages.ormolu + pkgs.niv + ]; + withHoogle = true; + }; +} diff --git a/nix/sources.json b/nix/sources.json new file mode 100644 index 0000000..e36b267 --- /dev/null +++ b/nix/sources.json @@ -0,0 +1,14 @@ +{ + "nixpkgs": { + "branch": "release-24.05", + "description": "Nix Packages collection", + "homepage": null, + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "36097d5a3f3ce6334a35717edeb90dba184d7081", + "sha256": "0ygi382h7sslzpzpcacbi9pdplldr5mqn5qq97lcwlxs9ksk1z64", + "type": "tarball", + "url": "https://github.com/NixOS/nixpkgs/archive/36097d5a3f3ce6334a35717edeb90dba184d7081.tar.gz", + "url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz" + } +} diff --git a/nix/sources.nix b/nix/sources.nix new file mode 100644 index 0000000..fe3dadf --- /dev/null +++ b/nix/sources.nix @@ -0,0 +1,198 @@ +# This file has been generated by Niv. + +let + + # + # The fetchers. fetch_<type> fetches specs of type <type>. + # + + fetch_file = pkgs: name: spec: + let + name' = sanitizeName name + "-src"; + in + if spec.builtin or true then + builtins_fetchurl { inherit (spec) url sha256; name = name'; } + else + pkgs.fetchurl { inherit (spec) url sha256; name = name'; }; + + fetch_tarball = pkgs: name: spec: + let + name' = sanitizeName name + "-src"; + in + if spec.builtin or true then + builtins_fetchTarball { name = name'; inherit (spec) url sha256; } + else + pkgs.fetchzip { name = name'; inherit (spec) url sha256; }; + + fetch_git = name: spec: + let + ref = + spec.ref or ( + if spec ? branch then "refs/heads/${spec.branch}" else + if spec ? tag then "refs/tags/${spec.tag}" else + abort "In git source '${name}': Please specify `ref`, `tag` or `branch`!" + ); + submodules = spec.submodules or false; + submoduleArg = + let + nixSupportsSubmodules = builtins.compareVersions builtins.nixVersion "2.4" >= 0; + emptyArgWithWarning = + if submodules + then + builtins.trace + ( + "The niv input \"${name}\" uses submodules " + + "but your nix's (${builtins.nixVersion}) builtins.fetchGit " + + "does not support them" + ) + { } + else { }; + in + if nixSupportsSubmodules + then { inherit submodules; } + else emptyArgWithWarning; + in + builtins.fetchGit + ({ url = spec.repo; inherit (spec) rev; inherit ref; } // submoduleArg); + + fetch_local = spec: spec.path; + + fetch_builtin-tarball = name: throw + ''[${name}] The niv type "builtin-tarball" is deprecated. You should instead use `builtin = true`. + $ niv modify ${name} -a type=tarball -a builtin=true''; + + fetch_builtin-url = name: throw + ''[${name}] The niv type "builtin-url" will soon be deprecated. You should instead use `builtin = true`. + $ niv modify ${name} -a type=file -a builtin=true''; + + # + # Various helpers + # + + # https://github.com/NixOS/nixpkgs/pull/83241/files#diff-c6f540a4f3bfa4b0e8b6bafd4cd54e8bR695 + sanitizeName = name: + ( + concatMapStrings (s: if builtins.isList s then "-" else s) + ( + builtins.split "[^[:alnum:]+._?=-]+" + ((x: builtins.elemAt (builtins.match "\\.*(.*)" x) 0) name) + ) + ); + + # The set of packages used when specs are fetched using non-builtins. + mkPkgs = sources: system: + let + sourcesNixpkgs = + import (builtins_fetchTarball { inherit (sources.nixpkgs) url sha256; }) { inherit system; }; + hasNixpkgsPath = builtins.any (x: x.prefix == "nixpkgs") builtins.nixPath; + hasThisAsNixpkgsPath = <nixpkgs> == ./.; + in + if builtins.hasAttr "nixpkgs" sources + then sourcesNixpkgs + else if hasNixpkgsPath && ! hasThisAsNixpkgsPath then + import <nixpkgs> { } + else + abort + '' + Please specify either <nixpkgs> (through -I or NIX_PATH=nixpkgs=...) or + add a package called "nixpkgs" to your sources.json. + ''; + + # The actual fetching function. + fetch = pkgs: name: spec: + + if ! builtins.hasAttr "type" spec then + abort "ERROR: niv spec ${name} does not have a 'type' attribute" + else if spec.type == "file" then fetch_file pkgs name spec + else if spec.type == "tarball" then fetch_tarball pkgs name spec + else if spec.type == "git" then fetch_git name spec + else if spec.type == "local" then fetch_local spec + else if spec.type == "builtin-tarball" then fetch_builtin-tarball name + else if spec.type == "builtin-url" then fetch_builtin-url name + else + abort "ERROR: niv spec ${name} has unknown type ${builtins.toJSON spec.type}"; + + # If the environment variable NIV_OVERRIDE_${name} is set, then use + # the path directly as opposed to the fetched source. + replace = name: drv: + let + saneName = stringAsChars (c: if (builtins.match "[a-zA-Z0-9]" c) == null then "_" else c) name; + ersatz = builtins.getEnv "NIV_OVERRIDE_${saneName}"; + in + if ersatz == "" then drv else + # this turns the string into an actual Nix path (for both absolute and + # relative paths) + if builtins.substring 0 1 ersatz == "/" then /. + ersatz else /. + builtins.getEnv "PWD" + "/${ersatz}"; + + # Ports of functions for older nix versions + + # a Nix version of mapAttrs if the built-in doesn't exist + mapAttrs = builtins.mapAttrs or ( + f: set: with builtins; + listToAttrs (map (attr: { name = attr; value = f attr set.${attr}; }) (attrNames set)) + ); + + # https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/lists.nix#L295 + range = first: last: if first > last then [ ] else builtins.genList (n: first + n) (last - first + 1); + + # https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L257 + stringToCharacters = s: map (p: builtins.substring p 1 s) (range 0 (builtins.stringLength s - 1)); + + # https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L269 + stringAsChars = f: s: concatStrings (map f (stringToCharacters s)); + concatMapStrings = f: list: concatStrings (map f list); + concatStrings = builtins.concatStringsSep ""; + + # https://github.com/NixOS/nixpkgs/blob/8a9f58a375c401b96da862d969f66429def1d118/lib/attrsets.nix#L331 + optionalAttrs = cond: as: if cond then as else { }; + + # fetchTarball version that is compatible between all the versions of Nix + builtins_fetchTarball = { url, name ? null, sha256 }@attrs: + let + inherit (builtins) lessThan nixVersion fetchTarball; + in + if lessThan nixVersion "1.12" then + fetchTarball ({ inherit url; } // (optionalAttrs (name != null) { inherit name; })) + else + fetchTarball attrs; + + # fetchurl version that is compatible between all the versions of Nix + builtins_fetchurl = { url, name ? null, sha256 }@attrs: + let + inherit (builtins) lessThan nixVersion fetchurl; + in + if lessThan nixVersion "1.12" then + fetchurl ({ inherit url; } // (optionalAttrs (name != null) { inherit name; })) + else + fetchurl attrs; + + # Create the final "sources" from the config + mkSources = config: + mapAttrs + ( + name: spec: + if builtins.hasAttr "outPath" spec + then + abort + "The values in sources.json should not have an 'outPath' attribute" + else + spec // { outPath = replace name (fetch config.pkgs name spec); } + ) + config.sources; + + # The "config" used by the fetchers + mkConfig = + { sourcesFile ? if builtins.pathExists ./sources.json then ./sources.json else null + , sources ? if sourcesFile == null then { } else builtins.fromJSON (builtins.readFile sourcesFile) + , system ? builtins.currentSystem + , pkgs ? mkPkgs sources system + }: rec { + # The sources, i.e. the attribute set of spec name to spec + inherit sources; + + # The "pkgs" (evaluated nixpkgs) to use for e.g. non-builtin fetchers + inherit pkgs; + }; + +in +mkSources (mkConfig { }) // { __functor = _: settings: mkSources (mkConfig settings); } diff --git a/sensors.cabal b/sensors.cabal new file mode 100644 index 0000000..844aae2 --- /dev/null +++ b/sensors.cabal @@ -0,0 +1,47 @@ +cabal-version: 3.4 +name: sensors +version: 0.1.0.0 +license: BSD-3-Clause +license-file: LICENSE +maintainer: aforemny@posteo.de +author: Alexander Foremny +build-type: Simple + +library + exposed-modules: Data.Sensor + hs-source-dirs: src + default-language: GHC2021 + default-extensions: + StrictData AllowAmbiguousTypes Arrows BlockArguments + DuplicateRecordFields FunctionalDependencies ImpredicativeTypes + LambdaCase MultiWayIf OverloadedRecordDot PartialTypeSignatures + RecordWildCards ViewPatterns + + ghc-options: -Wall -fno-warn-name-shadowing + build-depends: + base, + containers, + hspec, + mtl, + stm, + unliftio + +test-suite sensors-test + type: exitcode-stdio-1.0 + main-is: Spec.hs + hs-source-dirs: test + other-modules: + AggregateSpec + PureSpec + + default-language: GHC2021 + default-extensions: + Arrows BlockArguments NoFieldSelectors OverloadedRecordDot + UndecidableInstances + + ghc-options: -Wall + build-depends: + base, + hspec, + mtl, + sensors diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..a6bdf20 --- /dev/null +++ b/shell.nix @@ -0,0 +1 @@ +(import ./. { }).shell diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs new file mode 100644 index 0000000..29ead27 --- /dev/null +++ b/src/Data/Sensor.hs @@ -0,0 +1,491 @@ +module Data.Sensor where + +import Control.Applicative +import Control.Arrow +import Control.Category (Category (..)) +import Control.Category qualified as C +import Control.Concurrent.STM (retry) +import Control.Exception (AsyncException (ThreadKilled)) +import Control.Exception qualified as E +import Control.Monad +import Control.Monad.Reader +import Data.Dynamic +import Data.List +import Data.Map (Map) +import Data.Map qualified as M +import Data.Maybe +import Data.Set (Set) +import Data.Set qualified as S +import UnliftIO +import UnliftIO.Concurrent hiding (yield) +import Prelude hiding (id, (.)) + +newtype Sensor m a b = Sensor' + { unSensor :: MSF m (Stale (Value a)) (Stale (Value b)) + } + +mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b +mkSensor = Sensor' . ignoreStaleInput + +ignoreStaleInput :: + (Monad m) => + MSF m (Value a) (Stale (Value b)) -> + MSF m (Stale (Value a)) (Stale (Value b)) +ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do + case (i, x') of + (Stale _, Just x) -> returnA -< (Stale x, x') + _ -> do + x <- sf -< i.unwrap + returnA -< (x, Just x.unwrap) + +instance (Monad m) => Functor (Sensor m a) where + f `fmap` s = mkSensor (fmap f <$$> s.unSensor <<< arr Fresh) + +(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b) +f <$$> x = fmap f <$> x + +infixl 4 <$$> + +instance (Monad m) => Applicative (Sensor m a) where + pure x = mkSensor do loop Fresh + where + loop stale = + MSF $ \_ -> pure (stale (Value S.empty M.empty x), loop Stale) + + f <*> x = + mkSensor + ( liftA2 + (combineStaleValue ($)) + (f.unSensor <<< arr Fresh) + (x.unSensor <<< arr Fresh) + ) + +instance (Monad m) => Category (Sensor m) where + id = mkSensor (C.id <<< arr Fresh) + + sf2 . sf1 = mkSensor (sf2.unSensor . (sf1.unSensor <<< arr Fresh)) + +instance (Monad m) => Arrow (Sensor m) where + arr f = mkSensor (arr (f <$$>) <<< arr Fresh) + + first :: Sensor m a b -> Sensor m (a, d) (b, d) + first sf = + mkSensor + ( arr (\(x, d) -> (,d) <$$> x) + <<< first (sf.unSensor <<< arr Fresh) + <<< arr (\x -> (fst <$> x, snd (x.unwrap))) + ) + +feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b +feedbackS c sf = + mkSensor . feedback c $ + arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap))) + <<< (sf.unSensor <<< arr Fresh) + <<< arr (\(x, d) -> (,d) <$> x) + +data Stale a + = Stale {unwrap :: a} + | Fresh {unwrap :: a} + | Input {unwrap :: a} + deriving (Functor, Show) + +instance (Monad m) => Monad (Sensor m a) where + m >>= f = + mkSensor $ switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v -> + (combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh) + +switchOnFresh :: + (Monad m) => + MSF m a (Stale (Value b)) -> + MSF m a (Stale (Value b), Bool) +switchOnFresh = + fmap + ( \v -> + ( v, + case v of + Fresh _ -> True + _ -> False + ) + ) + +switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b +switch m f = feedback Nothing $ go m f + where + go :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b)) + go m f = MSF $ \(i, k') -> do + ((x, me), m') <- m.unMSF i + let k = case (me, k') of + (False, Just k) -> k + _ -> f x + (z, k') <- k.unMSF i + pure ((z, (Just k')), go m' f) + +data Value a = Value + { dependsOn :: Set String, + atTick :: Map String Int, + unwrap :: a + } + deriving (Show, Functor, Eq) + +combineStaleValue :: + (a -> b -> c) -> + Stale (Value a) -> + Stale (Value b) -> + Stale (Value c) +combineStaleValue f = + combineStale (combineValue f) + +combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c +combineValue f a b = + Value + (S.union a.dependsOn b.dependsOn) + (M.unionWith max a.atTick b.atTick) + (f a.unwrap b.unwrap) + +combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c +combineStale f a@(Stale _) b@(Stale _) = Stale (f a.unwrap b.unwrap) +combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap) +combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap) +combineStale f a b = Fresh (f a.unwrap b.unwrap) + +class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where + aggregate :: s -> AggregateT s m () + +newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a} + +deriving instance (Functor m) => Functor (AggregateT s m) + +deriving instance (Applicative m) => Applicative (AggregateT s m) + +deriving instance (Monad m) => Monad (AggregateT s m) + +deriving instance (MonadIO m) => MonadIO (AggregateT s m) + +instance MonadTrans (AggregateT s) where + lift = AggregateT . lift + +deriving instance (MonadFail m) => MonadFail (AggregateT s m) + +deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m) + +deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m) + +-- XXX aggregates should be keyed on something else than `String` +data AggregateE m = AggregateE + { readAggregatesM :: TMVar (S.Set String), + currentTickM :: TMVar Int, + localQueueT :: TChan (), + self :: String + } + +yield :: (Aggregate m s a) => a -> AggregateT s m () +yield x = do + SensorE {liveAggregatesM, globalQueueT} <- lift askS + AggregateE {readAggregatesM, currentTickM, self} <- ask + atomically do + writeTChan globalQueueT () + liveAggregates <- readTMVar liveAggregatesM + valueT <- do + case M.lookup self liveAggregates of + Nothing -> retry + Just LiveAggregate {valueT} -> pure valueT + dependsOn <- takeTMVar readAggregatesM + putTMVar readAggregatesM S.empty + currentTick <- takeTMVar currentTickM + putTMVar currentTickM (currentTick + 1) + let atTick = M.singleton self currentTick + let v = Value (S.insert self dependsOn) atTick (toDyn x) + writeTVar valueT (Just v) + +sensor :: (Aggregate m s a) => s -> Sensor m () a +sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do + SensorE {liveAggregatesM, startAggregateT} <- askS + atomically do writeTQueue startAggregateT (AnyAggregate s) + atomically do + liveAggregates <- readTMVar liveAggregatesM + valueT <- case M.lookup (show s) liveAggregates of + Nothing -> retry + Just LiveAggregate {valueT} -> pure valueT + v <- + readTVar valueT >>= \case + Nothing -> retry + Just v -> pure v + let stale + | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh + | otherwise = Stale + pure (flip fromDyn (error "") <$$> stale v, Just (const () <$> v)) + +data Restart = Restart String SomeException deriving (Show) + +instance Exception Restart + +-- XXX merge `sample'` and `sample` +sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m () +sample' f s = do + SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS + outputT <- newTVarIO Nothing + + _ <- forkIO $ forever do + startAggregate =<< atomically do readTQueue startAggregateT + + h <- + reactInit + ( ( arrM $ \v -> do + deadAggregates <- atomically do + case v of + Input _ -> error "Input" + Stale _ -> do + writeTVar outputT Nothing + pure M.empty + Fresh v -> do + writeTVar outputT (Just v.unwrap) + liveAggregates <- readTMVar liveAggregatesM + globalTick <- takeTMVar globalTickM + putTMVar globalTickM (globalTick + 1) + pure + ( M.filterWithKey + ( \key liveAggregate -> + not (S.member key v.dependsOn) + && liveAggregate.spawnedAt < globalTick + ) + liveAggregates + ) + mapM_ + ( \liveAggregate -> do + liftIO (killThread (liveAggregate.threadId)) + ) + deadAggregates + ) + <<< s.unSensor + <<< arr (Input . Value S.empty M.empty) + ) + let loop c = do + mask $ \restore -> + catch + ( restore do + react h + c' <- + maybe (pure c) (fmap Just . f c) + =<< atomically do readTVar outputT + _ <- atomically do readTChan globalQueueT + pure c' + ) + (\(Restart _ _) -> pure c) + >>= loop + loop Nothing + +runSensorT :: (MonadIO m) => SensorT m a -> m a +runSensorT m = do + runReaderT m.unSensorT =<< sensorE + +sensorE :: (MonadIO m) => m (SensorE (t m)) +sensorE = do + liveAggregatesM <- newTMVarIO M.empty + startAggregateT <- newTQueueIO + globalQueueT <- newTChanIO + globalTickM <- newTMVarIO 0 + mainThread <- myThreadId + pure SensorE {..} + +sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a] +sample n s = do + SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS + outputsT <- newTVarIO [] + + _ <- forkIO $ forever do + startAggregate =<< atomically do readTQueue startAggregateT + + h <- + reactInit + ( ( arrM $ \v -> do + case v of + Input _ -> error "Input" + Stale _ -> pure () + Fresh v -> do + deadAggregates <- atomically do + writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT + liveAggregates <- readTMVar liveAggregatesM + globalTick <- takeTMVar globalTickM + putTMVar globalTickM (globalTick + 1) + pure + ( M.filterWithKey + ( \key liveAggregate -> + not (S.member key v.dependsOn) + && liveAggregate.spawnedAt < globalTick + ) + liveAggregates + ) + mapM_ (liftIO . killThread . (.threadId)) deadAggregates + ) + <<< s.unSensor + <<< arr (Input . Value S.empty M.empty) + ) + let loop = do + react h + xs <- atomically do readTVar outputsT + if length xs >= n + then pure (reverse xs) + else do + _ <- atomically do readTChan globalQueueT + loop + loop + +newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a} + +deriving instance (Functor m) => Functor (SensorT m) + +deriving instance (Applicative m) => Applicative (SensorT m) + +deriving instance (Monad m) => Monad (SensorT m) + +deriving instance (MonadIO m) => MonadIO (SensorT m) + +instance MonadTrans SensorT where + lift = SensorT . lift + +deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m) + +deriving instance (MonadFail m) => MonadFail (SensorT m) + +deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m) + +data SensorE m = SensorE + { liveAggregatesM :: TMVar (Map String (LiveAggregate m Dynamic)), + startAggregateT :: TQueue (AnyAggregate m), + globalQueueT :: TChan (), + globalTickM :: TMVar Int, + mainThread :: ThreadId + } + +class (MonadUnliftIO m) => MonadSensor m where + askS :: m (SensorE m) + +instance (MonadUnliftIO m) => MonadSensor (SensorT m) where + askS = ask + +data AnyAggregate m where + AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m + +data LiveAggregate m a = LiveAggregate + { valueT :: TVar (Maybe (Value a)), + threadId :: ThreadId, + spawnedAt :: Int + } + +startAggregate :: (MonadSensor m) => AnyAggregate m -> m () +startAggregate (AnyAggregate s) = do + let self = show s + SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS + liveAggregates <- atomically do takeTMVar liveAggregatesM + if M.member self liveAggregates + then atomically do putTMVar liveAggregatesM liveAggregates + else do + readAggregatesM <- newTMVarIO S.empty + currentTickM <- newTMVarIO 0 + localQueueT <- atomically do dupTChan globalQueueT + + threadId <- + forkFinally + (runReaderT (aggregate s).unAggregateT AggregateE {..}) + ( either + ( \e -> + if + | fromException e == Just ThreadKilled -> do + dropAggregate self + | otherwise -> do + dropAggregate self + liftIO (E.throwTo mainThread (Restart self e)) + ) + pure + ) + valueT <- newTVarIO Nothing + atomically do + spawnedAt <- readTMVar globalTickM + putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates) + +dropAggregate :: (MonadSensor m) => String -> m () +dropAggregate self = do + SensorE {liveAggregatesM} <- askS + atomically do + liveAggregates <- takeTMVar liveAggregatesM + putTMVar liveAggregatesM (M.delete self liveAggregates) + +-- XXX we should depend on `dunai` +data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)} + +instance (Monad m) => Category (MSF m) where + id = go + where + go = MSF $ \a -> return (a, go) + + sf2 . sf1 = MSF $ \a -> do + (b, sf1') <- unMSF sf1 a + (c, sf2') <- unMSF sf2 b + let sf' = sf2' . sf1' + c `seq` return (c, sf') + +instance (Monad m) => Arrow (MSF m) where + arr f = arrM (return . f) + + first = + morphGS $ \f (a, c) -> do + (b, msf') <- f a + return ((b, c), msf') + +instance (Monad m) => Functor (MSF m a) where + fmap f msf = msf >>> arr f + +instance (Functor m, Monad m) => Applicative (MSF m a) where + pure = arr . const + + fs <*> bs = (fs &&& bs) >>> arr (uncurry ($)) + +morphGS :: + (Monad m2) => + (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) -> + MSF m1 a1 b1 -> + MSF m2 a2 b2 +morphGS morph msf = MSF $ \a2 -> do + (b2, msf') <- morph (unMSF msf) a2 + return (b2, morphGS morph msf') + +feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b +feedback c sf = MSF $ \a -> do + ((b', c'), sf') <- unMSF sf (a, c) + return (b', feedback c' sf') + +embed :: (Monad m) => MSF m a b -> [a] -> m [b] +embed _ [] = return [] +embed sf (a : as) = do + (b, sf') <- unMSF sf a + bs <- embed sf' as + return (b : bs) + +reactimate :: (Monad m) => MSF m () () -> m () +reactimate sf = do + (_, sf') <- unMSF sf () + reactimate sf' + +arrM :: (Monad m) => (a -> m b) -> MSF m a b +arrM f = + morphGS (\i a -> i a >>= \(_, c) -> f a >>= \b -> return (b, c)) C.id + +type ReactHandle m = IORef (MSF m () ()) + +reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m) +reactInit = liftIO . newIORef + +react :: (MonadIO m) => ReactHandle m -> m () +react handle = do + msf <- liftIO $ readIORef handle + (_, msf') <- unMSF msf () + liftIO $ writeIORef handle msf' + +instance (Monad m) => ArrowChoice (MSF m) where + left :: MSF m a b -> MSF m (Either a c) (Either b c) + left sf = MSF f + where + f (Left a) = do + (b, sf') <- unMSF sf a + return (Left b, left sf') + f (Right c) = return (Right c, left sf) diff --git a/test/AggregateSpec.hs b/test/AggregateSpec.hs new file mode 100644 index 0000000..71bf420 --- /dev/null +++ b/test/AggregateSpec.hs @@ -0,0 +1,102 @@ +module AggregateSpec (spec) where + +import Control.Arrow +import Control.Concurrent (threadDelay) +import Control.Monad (forM_) +import Control.Monad.Trans (MonadIO, liftIO) +import Data.List (intercalate) +import Data.Sensor qualified as S +import Test.Hspec + +spec :: SpecWith () +spec = do + describe "current time" do + it "date" do + S.runSensorT (S.sample 1 date) >>= (`shouldBe` ["1970-01-01"]) + it "time" do + S.runSensorT (S.sample 1 time) >>= (`shouldBe` ["00:00:00"]) + it "date and time" do + S.runSensorT (S.sample 1 (intercalate " " <$> sequence [date, time])) + >>= (`shouldBe` ["1970-01-01 00:00:00"]) + describe "timing" do + it "fast, fast" do + S.runSensorT (S.sample 2 ((,) <$> count <*> count)) + >>= (`shouldBe` [(1, 1), (2, 2)]) + it "fast, slow" do + S.runSensorT (S.sample 4 ((,) <$> count <*> slowCount)) + >>= (`shouldBe` [(1, 1), (2, 1), (2, 2), (3, 2)]) + S.runSensorT (S.sample 3 (const <$> count <*> slowCount)) + >>= (`shouldBe` [1, 2, 2]) + S.runSensorT (S.sample 3 (flip const <$> count <*> slowCount)) + >>= (`shouldBe` [1, 1, 2]) + describe "square count" do + it "count" do + S.runSensorT (S.sample 2 count) >>= (`shouldBe` [1, 2]) + it "square" do + S.runSensorT (S.sample 2 (count >>= square)) >>= (`shouldBe` [1, 4]) + describe "diagram" do + it "count diagram" do + S.runSensorT (S.sample 3 (diagram 2 count)) + >>= (`shouldBe` [[1 :: Int], [1, 2], [2, 3]]) + S.runSensorT (S.sample 5 (diagram 2 slowCount <* count)) + >>= (`shouldBe` [[1 :: Int], [1], [1, 2], [1, 2], [2, 3]]) + +diagram :: (Monad m) => Int -> S.Sensor m () a -> S.Sensor m () [a] +diagram n sf = S.feedbackS [] $ proc ((), xs) -> do + x <- sf -< () + returnA -< (reverse (x : xs), take (n - 1) (x : xs)) + +count :: (S.MonadSensor m) => S.Sensor m () Int +count = S.sensor Count + +data Count = Count deriving (Show) + +instance (S.MonadSensor m) => S.Aggregate m Count Int where + aggregate _ = forM_ [1, 2 ..] $ \n -> do + S.yield n + sleep + +slowCount :: (S.MonadSensor m) => S.Sensor m () Int +slowCount = S.sensor SlowCount + +data SlowCount = SlowCount deriving (Show) + +instance (S.MonadSensor m) => S.Aggregate m SlowCount Int where + aggregate _ = forM_ [1, 2 ..] $ \n -> do + S.yield n + sleepLong + +sleep :: (MonadIO m) => m () +sleep = liftIO (threadDelay 2_000) + +sleepLong :: (MonadIO m) => m () +sleepLong = liftIO (threadDelay 3_000) + +square :: (S.MonadSensor m) => Int -> S.Sensor m () Int +square = S.sensor . Square + +data Square = Square Int deriving (Show) + +instance (S.MonadSensor m) => S.Aggregate m Square Int where + aggregate (Square n) = do + S.yield (n * n) + +currentTime :: (S.MonadSensor m) => S.Sensor m () UTCTime +currentTime = S.sensor CurrentTime + +data CurrentTime = CurrentTime deriving (Show) + +data UTCTime = UTCTime + { date :: String, + time :: String + } + deriving (Show) + +instance (S.MonadSensor m) => S.Aggregate m CurrentTime UTCTime where + aggregate _ = S.yield (UTCTime "1970-01-01" "00:00:00") + +date :: (S.MonadSensor m) => S.Sensor m () String +date = (.date) <$> currentTime + +time :: (S.MonadSensor m) => S.Sensor m () String +time = (.time) <$> currentTime diff --git a/test/PureSpec.hs b/test/PureSpec.hs new file mode 100644 index 0000000..5f718e7 --- /dev/null +++ b/test/PureSpec.hs @@ -0,0 +1,10 @@ +module PureSpec (spec) where + +import Data.Sensor qualified as S +import Test.Hspec + +spec :: Spec +spec = do + describe "Pure" do + it "pure" do + S.runSensorT (S.sample 1 (pure "")) >>= (`shouldBe` [""]) diff --git a/test/Spec.hs b/test/Spec.hs new file mode 100644 index 0000000..a824f8c --- /dev/null +++ b/test/Spec.hs @@ -0,0 +1 @@ +{-# OPTIONS_GHC -F -pgmF hspec-discover #-} |