summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar Alexander Foremny <aforemny@posteo.de>2024-07-25 09:42:11 +0200
committerLibravatar Alexander Foremny <aforemny@posteo.de>2024-08-08 15:46:58 +0200
commit272b3ace747857729171780edae898819d211832 (patch)
tree5e791779a318521175104a7e9140dc186c4b3d54
init
-rw-r--r--.envrc1
-rw-r--r--.gitignore1
-rw-r--r--LICENSE30
-rw-r--r--default.nix17
-rw-r--r--nix/sources.json14
-rw-r--r--nix/sources.nix198
-rw-r--r--sensors.cabal47
-rw-r--r--shell.nix1
-rw-r--r--src/Data/Sensor.hs491
-rw-r--r--test/AggregateSpec.hs102
-rw-r--r--test/PureSpec.hs10
-rw-r--r--test/Spec.hs1
12 files changed, 913 insertions, 0 deletions
diff --git a/.envrc b/.envrc
new file mode 100644
index 0000000..1d953f4
--- /dev/null
+++ b/.envrc
@@ -0,0 +1 @@
+use nix
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..8075013
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/dist-newstyle
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..c90516a
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,30 @@
+Copyright (c) 2024, Alexander Foremny
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+
+ * Neither the name of Alexander Foremny nor the names of other
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/default.nix b/default.nix
new file mode 100644
index 0000000..3de5581
--- /dev/null
+++ b/default.nix
@@ -0,0 +1,17 @@
+{ pkgs ? import (import ./nix/sources.nix).nixpkgs { } }: rec {
+ haskellPackages = pkgs.haskellPackages.override {
+ overrides = self: super: {
+ sensors = self.callCabal2nix "sensors" ./. { };
+ };
+ };
+ inherit (haskellPackages) sensors;
+ shell = haskellPackages.shellFor {
+ packages = _: [ sensors ];
+ buildInputs = [
+ haskellPackages.cabal-install
+ haskellPackages.ormolu
+ pkgs.niv
+ ];
+ withHoogle = true;
+ };
+}
diff --git a/nix/sources.json b/nix/sources.json
new file mode 100644
index 0000000..e36b267
--- /dev/null
+++ b/nix/sources.json
@@ -0,0 +1,14 @@
+{
+ "nixpkgs": {
+ "branch": "release-24.05",
+ "description": "Nix Packages collection",
+ "homepage": null,
+ "owner": "NixOS",
+ "repo": "nixpkgs",
+ "rev": "36097d5a3f3ce6334a35717edeb90dba184d7081",
+ "sha256": "0ygi382h7sslzpzpcacbi9pdplldr5mqn5qq97lcwlxs9ksk1z64",
+ "type": "tarball",
+ "url": "https://github.com/NixOS/nixpkgs/archive/36097d5a3f3ce6334a35717edeb90dba184d7081.tar.gz",
+ "url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
+ }
+}
diff --git a/nix/sources.nix b/nix/sources.nix
new file mode 100644
index 0000000..fe3dadf
--- /dev/null
+++ b/nix/sources.nix
@@ -0,0 +1,198 @@
+# This file has been generated by Niv.
+
+let
+
+ #
+ # The fetchers. fetch_<type> fetches specs of type <type>.
+ #
+
+ fetch_file = pkgs: name: spec:
+ let
+ name' = sanitizeName name + "-src";
+ in
+ if spec.builtin or true then
+ builtins_fetchurl { inherit (spec) url sha256; name = name'; }
+ else
+ pkgs.fetchurl { inherit (spec) url sha256; name = name'; };
+
+ fetch_tarball = pkgs: name: spec:
+ let
+ name' = sanitizeName name + "-src";
+ in
+ if spec.builtin or true then
+ builtins_fetchTarball { name = name'; inherit (spec) url sha256; }
+ else
+ pkgs.fetchzip { name = name'; inherit (spec) url sha256; };
+
+ fetch_git = name: spec:
+ let
+ ref =
+ spec.ref or (
+ if spec ? branch then "refs/heads/${spec.branch}" else
+ if spec ? tag then "refs/tags/${spec.tag}" else
+ abort "In git source '${name}': Please specify `ref`, `tag` or `branch`!"
+ );
+ submodules = spec.submodules or false;
+ submoduleArg =
+ let
+ nixSupportsSubmodules = builtins.compareVersions builtins.nixVersion "2.4" >= 0;
+ emptyArgWithWarning =
+ if submodules
+ then
+ builtins.trace
+ (
+ "The niv input \"${name}\" uses submodules "
+ + "but your nix's (${builtins.nixVersion}) builtins.fetchGit "
+ + "does not support them"
+ )
+ { }
+ else { };
+ in
+ if nixSupportsSubmodules
+ then { inherit submodules; }
+ else emptyArgWithWarning;
+ in
+ builtins.fetchGit
+ ({ url = spec.repo; inherit (spec) rev; inherit ref; } // submoduleArg);
+
+ fetch_local = spec: spec.path;
+
+ fetch_builtin-tarball = name: throw
+ ''[${name}] The niv type "builtin-tarball" is deprecated. You should instead use `builtin = true`.
+ $ niv modify ${name} -a type=tarball -a builtin=true'';
+
+ fetch_builtin-url = name: throw
+ ''[${name}] The niv type "builtin-url" will soon be deprecated. You should instead use `builtin = true`.
+ $ niv modify ${name} -a type=file -a builtin=true'';
+
+ #
+ # Various helpers
+ #
+
+ # https://github.com/NixOS/nixpkgs/pull/83241/files#diff-c6f540a4f3bfa4b0e8b6bafd4cd54e8bR695
+ sanitizeName = name:
+ (
+ concatMapStrings (s: if builtins.isList s then "-" else s)
+ (
+ builtins.split "[^[:alnum:]+._?=-]+"
+ ((x: builtins.elemAt (builtins.match "\\.*(.*)" x) 0) name)
+ )
+ );
+
+ # The set of packages used when specs are fetched using non-builtins.
+ mkPkgs = sources: system:
+ let
+ sourcesNixpkgs =
+ import (builtins_fetchTarball { inherit (sources.nixpkgs) url sha256; }) { inherit system; };
+ hasNixpkgsPath = builtins.any (x: x.prefix == "nixpkgs") builtins.nixPath;
+ hasThisAsNixpkgsPath = <nixpkgs> == ./.;
+ in
+ if builtins.hasAttr "nixpkgs" sources
+ then sourcesNixpkgs
+ else if hasNixpkgsPath && ! hasThisAsNixpkgsPath then
+ import <nixpkgs> { }
+ else
+ abort
+ ''
+ Please specify either <nixpkgs> (through -I or NIX_PATH=nixpkgs=...) or
+ add a package called "nixpkgs" to your sources.json.
+ '';
+
+ # The actual fetching function.
+ fetch = pkgs: name: spec:
+
+ if ! builtins.hasAttr "type" spec then
+ abort "ERROR: niv spec ${name} does not have a 'type' attribute"
+ else if spec.type == "file" then fetch_file pkgs name spec
+ else if spec.type == "tarball" then fetch_tarball pkgs name spec
+ else if spec.type == "git" then fetch_git name spec
+ else if spec.type == "local" then fetch_local spec
+ else if spec.type == "builtin-tarball" then fetch_builtin-tarball name
+ else if spec.type == "builtin-url" then fetch_builtin-url name
+ else
+ abort "ERROR: niv spec ${name} has unknown type ${builtins.toJSON spec.type}";
+
+ # If the environment variable NIV_OVERRIDE_${name} is set, then use
+ # the path directly as opposed to the fetched source.
+ replace = name: drv:
+ let
+ saneName = stringAsChars (c: if (builtins.match "[a-zA-Z0-9]" c) == null then "_" else c) name;
+ ersatz = builtins.getEnv "NIV_OVERRIDE_${saneName}";
+ in
+ if ersatz == "" then drv else
+ # this turns the string into an actual Nix path (for both absolute and
+ # relative paths)
+ if builtins.substring 0 1 ersatz == "/" then /. + ersatz else /. + builtins.getEnv "PWD" + "/${ersatz}";
+
+ # Ports of functions for older nix versions
+
+ # a Nix version of mapAttrs if the built-in doesn't exist
+ mapAttrs = builtins.mapAttrs or (
+ f: set: with builtins;
+ listToAttrs (map (attr: { name = attr; value = f attr set.${attr}; }) (attrNames set))
+ );
+
+ # https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/lists.nix#L295
+ range = first: last: if first > last then [ ] else builtins.genList (n: first + n) (last - first + 1);
+
+ # https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L257
+ stringToCharacters = s: map (p: builtins.substring p 1 s) (range 0 (builtins.stringLength s - 1));
+
+ # https://github.com/NixOS/nixpkgs/blob/0258808f5744ca980b9a1f24fe0b1e6f0fecee9c/lib/strings.nix#L269
+ stringAsChars = f: s: concatStrings (map f (stringToCharacters s));
+ concatMapStrings = f: list: concatStrings (map f list);
+ concatStrings = builtins.concatStringsSep "";
+
+ # https://github.com/NixOS/nixpkgs/blob/8a9f58a375c401b96da862d969f66429def1d118/lib/attrsets.nix#L331
+ optionalAttrs = cond: as: if cond then as else { };
+
+ # fetchTarball version that is compatible between all the versions of Nix
+ builtins_fetchTarball = { url, name ? null, sha256 }@attrs:
+ let
+ inherit (builtins) lessThan nixVersion fetchTarball;
+ in
+ if lessThan nixVersion "1.12" then
+ fetchTarball ({ inherit url; } // (optionalAttrs (name != null) { inherit name; }))
+ else
+ fetchTarball attrs;
+
+ # fetchurl version that is compatible between all the versions of Nix
+ builtins_fetchurl = { url, name ? null, sha256 }@attrs:
+ let
+ inherit (builtins) lessThan nixVersion fetchurl;
+ in
+ if lessThan nixVersion "1.12" then
+ fetchurl ({ inherit url; } // (optionalAttrs (name != null) { inherit name; }))
+ else
+ fetchurl attrs;
+
+ # Create the final "sources" from the config
+ mkSources = config:
+ mapAttrs
+ (
+ name: spec:
+ if builtins.hasAttr "outPath" spec
+ then
+ abort
+ "The values in sources.json should not have an 'outPath' attribute"
+ else
+ spec // { outPath = replace name (fetch config.pkgs name spec); }
+ )
+ config.sources;
+
+ # The "config" used by the fetchers
+ mkConfig =
+ { sourcesFile ? if builtins.pathExists ./sources.json then ./sources.json else null
+ , sources ? if sourcesFile == null then { } else builtins.fromJSON (builtins.readFile sourcesFile)
+ , system ? builtins.currentSystem
+ , pkgs ? mkPkgs sources system
+ }: rec {
+ # The sources, i.e. the attribute set of spec name to spec
+ inherit sources;
+
+ # The "pkgs" (evaluated nixpkgs) to use for e.g. non-builtin fetchers
+ inherit pkgs;
+ };
+
+in
+mkSources (mkConfig { }) // { __functor = _: settings: mkSources (mkConfig settings); }
diff --git a/sensors.cabal b/sensors.cabal
new file mode 100644
index 0000000..844aae2
--- /dev/null
+++ b/sensors.cabal
@@ -0,0 +1,47 @@
+cabal-version: 3.4
+name: sensors
+version: 0.1.0.0
+license: BSD-3-Clause
+license-file: LICENSE
+maintainer: aforemny@posteo.de
+author: Alexander Foremny
+build-type: Simple
+
+library
+ exposed-modules: Data.Sensor
+ hs-source-dirs: src
+ default-language: GHC2021
+ default-extensions:
+ StrictData AllowAmbiguousTypes Arrows BlockArguments
+ DuplicateRecordFields FunctionalDependencies ImpredicativeTypes
+ LambdaCase MultiWayIf OverloadedRecordDot PartialTypeSignatures
+ RecordWildCards ViewPatterns
+
+ ghc-options: -Wall -fno-warn-name-shadowing
+ build-depends:
+ base,
+ containers,
+ hspec,
+ mtl,
+ stm,
+ unliftio
+
+test-suite sensors-test
+ type: exitcode-stdio-1.0
+ main-is: Spec.hs
+ hs-source-dirs: test
+ other-modules:
+ AggregateSpec
+ PureSpec
+
+ default-language: GHC2021
+ default-extensions:
+ Arrows BlockArguments NoFieldSelectors OverloadedRecordDot
+ UndecidableInstances
+
+ ghc-options: -Wall
+ build-depends:
+ base,
+ hspec,
+ mtl,
+ sensors
diff --git a/shell.nix b/shell.nix
new file mode 100644
index 0000000..a6bdf20
--- /dev/null
+++ b/shell.nix
@@ -0,0 +1 @@
+(import ./. { }).shell
diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs
new file mode 100644
index 0000000..29ead27
--- /dev/null
+++ b/src/Data/Sensor.hs
@@ -0,0 +1,491 @@
+module Data.Sensor where
+
+import Control.Applicative
+import Control.Arrow
+import Control.Category (Category (..))
+import Control.Category qualified as C
+import Control.Concurrent.STM (retry)
+import Control.Exception (AsyncException (ThreadKilled))
+import Control.Exception qualified as E
+import Control.Monad
+import Control.Monad.Reader
+import Data.Dynamic
+import Data.List
+import Data.Map (Map)
+import Data.Map qualified as M
+import Data.Maybe
+import Data.Set (Set)
+import Data.Set qualified as S
+import UnliftIO
+import UnliftIO.Concurrent hiding (yield)
+import Prelude hiding (id, (.))
+
+newtype Sensor m a b = Sensor'
+ { unSensor :: MSF m (Stale (Value a)) (Stale (Value b))
+ }
+
+mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b
+mkSensor = Sensor' . ignoreStaleInput
+
+ignoreStaleInput ::
+ (Monad m) =>
+ MSF m (Value a) (Stale (Value b)) ->
+ MSF m (Stale (Value a)) (Stale (Value b))
+ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do
+ case (i, x') of
+ (Stale _, Just x) -> returnA -< (Stale x, x')
+ _ -> do
+ x <- sf -< i.unwrap
+ returnA -< (x, Just x.unwrap)
+
+instance (Monad m) => Functor (Sensor m a) where
+ f `fmap` s = mkSensor (fmap f <$$> s.unSensor <<< arr Fresh)
+
+(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b)
+f <$$> x = fmap f <$> x
+
+infixl 4 <$$>
+
+instance (Monad m) => Applicative (Sensor m a) where
+ pure x = mkSensor do loop Fresh
+ where
+ loop stale =
+ MSF $ \_ -> pure (stale (Value S.empty M.empty x), loop Stale)
+
+ f <*> x =
+ mkSensor
+ ( liftA2
+ (combineStaleValue ($))
+ (f.unSensor <<< arr Fresh)
+ (x.unSensor <<< arr Fresh)
+ )
+
+instance (Monad m) => Category (Sensor m) where
+ id = mkSensor (C.id <<< arr Fresh)
+
+ sf2 . sf1 = mkSensor (sf2.unSensor . (sf1.unSensor <<< arr Fresh))
+
+instance (Monad m) => Arrow (Sensor m) where
+ arr f = mkSensor (arr (f <$$>) <<< arr Fresh)
+
+ first :: Sensor m a b -> Sensor m (a, d) (b, d)
+ first sf =
+ mkSensor
+ ( arr (\(x, d) -> (,d) <$$> x)
+ <<< first (sf.unSensor <<< arr Fresh)
+ <<< arr (\x -> (fst <$> x, snd (x.unwrap)))
+ )
+
+feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b
+feedbackS c sf =
+ mkSensor . feedback c $
+ arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap)))
+ <<< (sf.unSensor <<< arr Fresh)
+ <<< arr (\(x, d) -> (,d) <$> x)
+
+data Stale a
+ = Stale {unwrap :: a}
+ | Fresh {unwrap :: a}
+ | Input {unwrap :: a}
+ deriving (Functor, Show)
+
+instance (Monad m) => Monad (Sensor m a) where
+ m >>= f =
+ mkSensor $ switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v ->
+ (combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh)
+
+switchOnFresh ::
+ (Monad m) =>
+ MSF m a (Stale (Value b)) ->
+ MSF m a (Stale (Value b), Bool)
+switchOnFresh =
+ fmap
+ ( \v ->
+ ( v,
+ case v of
+ Fresh _ -> True
+ _ -> False
+ )
+ )
+
+switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b
+switch m f = feedback Nothing $ go m f
+ where
+ go :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b))
+ go m f = MSF $ \(i, k') -> do
+ ((x, me), m') <- m.unMSF i
+ let k = case (me, k') of
+ (False, Just k) -> k
+ _ -> f x
+ (z, k') <- k.unMSF i
+ pure ((z, (Just k')), go m' f)
+
+data Value a = Value
+ { dependsOn :: Set String,
+ atTick :: Map String Int,
+ unwrap :: a
+ }
+ deriving (Show, Functor, Eq)
+
+combineStaleValue ::
+ (a -> b -> c) ->
+ Stale (Value a) ->
+ Stale (Value b) ->
+ Stale (Value c)
+combineStaleValue f =
+ combineStale (combineValue f)
+
+combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c
+combineValue f a b =
+ Value
+ (S.union a.dependsOn b.dependsOn)
+ (M.unionWith max a.atTick b.atTick)
+ (f a.unwrap b.unwrap)
+
+combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c
+combineStale f a@(Stale _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
+combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
+combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap)
+combineStale f a b = Fresh (f a.unwrap b.unwrap)
+
+class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where
+ aggregate :: s -> AggregateT s m ()
+
+newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a}
+
+deriving instance (Functor m) => Functor (AggregateT s m)
+
+deriving instance (Applicative m) => Applicative (AggregateT s m)
+
+deriving instance (Monad m) => Monad (AggregateT s m)
+
+deriving instance (MonadIO m) => MonadIO (AggregateT s m)
+
+instance MonadTrans (AggregateT s) where
+ lift = AggregateT . lift
+
+deriving instance (MonadFail m) => MonadFail (AggregateT s m)
+
+deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m)
+
+deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m)
+
+-- XXX aggregates should be keyed on something else than `String`
+data AggregateE m = AggregateE
+ { readAggregatesM :: TMVar (S.Set String),
+ currentTickM :: TMVar Int,
+ localQueueT :: TChan (),
+ self :: String
+ }
+
+yield :: (Aggregate m s a) => a -> AggregateT s m ()
+yield x = do
+ SensorE {liveAggregatesM, globalQueueT} <- lift askS
+ AggregateE {readAggregatesM, currentTickM, self} <- ask
+ atomically do
+ writeTChan globalQueueT ()
+ liveAggregates <- readTMVar liveAggregatesM
+ valueT <- do
+ case M.lookup self liveAggregates of
+ Nothing -> retry
+ Just LiveAggregate {valueT} -> pure valueT
+ dependsOn <- takeTMVar readAggregatesM
+ putTMVar readAggregatesM S.empty
+ currentTick <- takeTMVar currentTickM
+ putTMVar currentTickM (currentTick + 1)
+ let atTick = M.singleton self currentTick
+ let v = Value (S.insert self dependsOn) atTick (toDyn x)
+ writeTVar valueT (Just v)
+
+sensor :: (Aggregate m s a) => s -> Sensor m () a
+sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do
+ SensorE {liveAggregatesM, startAggregateT} <- askS
+ atomically do writeTQueue startAggregateT (AnyAggregate s)
+ atomically do
+ liveAggregates <- readTMVar liveAggregatesM
+ valueT <- case M.lookup (show s) liveAggregates of
+ Nothing -> retry
+ Just LiveAggregate {valueT} -> pure valueT
+ v <-
+ readTVar valueT >>= \case
+ Nothing -> retry
+ Just v -> pure v
+ let stale
+ | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh
+ | otherwise = Stale
+ pure (flip fromDyn (error "") <$$> stale v, Just (const () <$> v))
+
+data Restart = Restart String SomeException deriving (Show)
+
+instance Exception Restart
+
+-- XXX merge `sample'` and `sample`
+sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
+sample' f s = do
+ SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS
+ outputT <- newTVarIO Nothing
+
+ _ <- forkIO $ forever do
+ startAggregate =<< atomically do readTQueue startAggregateT
+
+ h <-
+ reactInit
+ ( ( arrM $ \v -> do
+ deadAggregates <- atomically do
+ case v of
+ Input _ -> error "Input"
+ Stale _ -> do
+ writeTVar outputT Nothing
+ pure M.empty
+ Fresh v -> do
+ writeTVar outputT (Just v.unwrap)
+ liveAggregates <- readTMVar liveAggregatesM
+ globalTick <- takeTMVar globalTickM
+ putTMVar globalTickM (globalTick + 1)
+ pure
+ ( M.filterWithKey
+ ( \key liveAggregate ->
+ not (S.member key v.dependsOn)
+ && liveAggregate.spawnedAt < globalTick
+ )
+ liveAggregates
+ )
+ mapM_
+ ( \liveAggregate -> do
+ liftIO (killThread (liveAggregate.threadId))
+ )
+ deadAggregates
+ )
+ <<< s.unSensor
+ <<< arr (Input . Value S.empty M.empty)
+ )
+ let loop c = do
+ mask $ \restore ->
+ catch
+ ( restore do
+ react h
+ c' <-
+ maybe (pure c) (fmap Just . f c)
+ =<< atomically do readTVar outputT
+ _ <- atomically do readTChan globalQueueT
+ pure c'
+ )
+ (\(Restart _ _) -> pure c)
+ >>= loop
+ loop Nothing
+
+runSensorT :: (MonadIO m) => SensorT m a -> m a
+runSensorT m = do
+ runReaderT m.unSensorT =<< sensorE
+
+sensorE :: (MonadIO m) => m (SensorE (t m))
+sensorE = do
+ liveAggregatesM <- newTMVarIO M.empty
+ startAggregateT <- newTQueueIO
+ globalQueueT <- newTChanIO
+ globalTickM <- newTMVarIO 0
+ mainThread <- myThreadId
+ pure SensorE {..}
+
+sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a]
+sample n s = do
+ SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS
+ outputsT <- newTVarIO []
+
+ _ <- forkIO $ forever do
+ startAggregate =<< atomically do readTQueue startAggregateT
+
+ h <-
+ reactInit
+ ( ( arrM $ \v -> do
+ case v of
+ Input _ -> error "Input"
+ Stale _ -> pure ()
+ Fresh v -> do
+ deadAggregates <- atomically do
+ writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT
+ liveAggregates <- readTMVar liveAggregatesM
+ globalTick <- takeTMVar globalTickM
+ putTMVar globalTickM (globalTick + 1)
+ pure
+ ( M.filterWithKey
+ ( \key liveAggregate ->
+ not (S.member key v.dependsOn)
+ && liveAggregate.spawnedAt < globalTick
+ )
+ liveAggregates
+ )
+ mapM_ (liftIO . killThread . (.threadId)) deadAggregates
+ )
+ <<< s.unSensor
+ <<< arr (Input . Value S.empty M.empty)
+ )
+ let loop = do
+ react h
+ xs <- atomically do readTVar outputsT
+ if length xs >= n
+ then pure (reverse xs)
+ else do
+ _ <- atomically do readTChan globalQueueT
+ loop
+ loop
+
+newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a}
+
+deriving instance (Functor m) => Functor (SensorT m)
+
+deriving instance (Applicative m) => Applicative (SensorT m)
+
+deriving instance (Monad m) => Monad (SensorT m)
+
+deriving instance (MonadIO m) => MonadIO (SensorT m)
+
+instance MonadTrans SensorT where
+ lift = SensorT . lift
+
+deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m)
+
+deriving instance (MonadFail m) => MonadFail (SensorT m)
+
+deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m)
+
+data SensorE m = SensorE
+ { liveAggregatesM :: TMVar (Map String (LiveAggregate m Dynamic)),
+ startAggregateT :: TQueue (AnyAggregate m),
+ globalQueueT :: TChan (),
+ globalTickM :: TMVar Int,
+ mainThread :: ThreadId
+ }
+
+class (MonadUnliftIO m) => MonadSensor m where
+ askS :: m (SensorE m)
+
+instance (MonadUnliftIO m) => MonadSensor (SensorT m) where
+ askS = ask
+
+data AnyAggregate m where
+ AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m
+
+data LiveAggregate m a = LiveAggregate
+ { valueT :: TVar (Maybe (Value a)),
+ threadId :: ThreadId,
+ spawnedAt :: Int
+ }
+
+startAggregate :: (MonadSensor m) => AnyAggregate m -> m ()
+startAggregate (AnyAggregate s) = do
+ let self = show s
+ SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS
+ liveAggregates <- atomically do takeTMVar liveAggregatesM
+ if M.member self liveAggregates
+ then atomically do putTMVar liveAggregatesM liveAggregates
+ else do
+ readAggregatesM <- newTMVarIO S.empty
+ currentTickM <- newTMVarIO 0
+ localQueueT <- atomically do dupTChan globalQueueT
+
+ threadId <-
+ forkFinally
+ (runReaderT (aggregate s).unAggregateT AggregateE {..})
+ ( either
+ ( \e ->
+ if
+ | fromException e == Just ThreadKilled -> do
+ dropAggregate self
+ | otherwise -> do
+ dropAggregate self
+ liftIO (E.throwTo mainThread (Restart self e))
+ )
+ pure
+ )
+ valueT <- newTVarIO Nothing
+ atomically do
+ spawnedAt <- readTMVar globalTickM
+ putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates)
+
+dropAggregate :: (MonadSensor m) => String -> m ()
+dropAggregate self = do
+ SensorE {liveAggregatesM} <- askS
+ atomically do
+ liveAggregates <- takeTMVar liveAggregatesM
+ putTMVar liveAggregatesM (M.delete self liveAggregates)
+
+-- XXX we should depend on `dunai`
+data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)}
+
+instance (Monad m) => Category (MSF m) where
+ id = go
+ where
+ go = MSF $ \a -> return (a, go)
+
+ sf2 . sf1 = MSF $ \a -> do
+ (b, sf1') <- unMSF sf1 a
+ (c, sf2') <- unMSF sf2 b
+ let sf' = sf2' . sf1'
+ c `seq` return (c, sf')
+
+instance (Monad m) => Arrow (MSF m) where
+ arr f = arrM (return . f)
+
+ first =
+ morphGS $ \f (a, c) -> do
+ (b, msf') <- f a
+ return ((b, c), msf')
+
+instance (Monad m) => Functor (MSF m a) where
+ fmap f msf = msf >>> arr f
+
+instance (Functor m, Monad m) => Applicative (MSF m a) where
+ pure = arr . const
+
+ fs <*> bs = (fs &&& bs) >>> arr (uncurry ($))
+
+morphGS ::
+ (Monad m2) =>
+ (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) ->
+ MSF m1 a1 b1 ->
+ MSF m2 a2 b2
+morphGS morph msf = MSF $ \a2 -> do
+ (b2, msf') <- morph (unMSF msf) a2
+ return (b2, morphGS morph msf')
+
+feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b
+feedback c sf = MSF $ \a -> do
+ ((b', c'), sf') <- unMSF sf (a, c)
+ return (b', feedback c' sf')
+
+embed :: (Monad m) => MSF m a b -> [a] -> m [b]
+embed _ [] = return []
+embed sf (a : as) = do
+ (b, sf') <- unMSF sf a
+ bs <- embed sf' as
+ return (b : bs)
+
+reactimate :: (Monad m) => MSF m () () -> m ()
+reactimate sf = do
+ (_, sf') <- unMSF sf ()
+ reactimate sf'
+
+arrM :: (Monad m) => (a -> m b) -> MSF m a b
+arrM f =
+ morphGS (\i a -> i a >>= \(_, c) -> f a >>= \b -> return (b, c)) C.id
+
+type ReactHandle m = IORef (MSF m () ())
+
+reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m)
+reactInit = liftIO . newIORef
+
+react :: (MonadIO m) => ReactHandle m -> m ()
+react handle = do
+ msf <- liftIO $ readIORef handle
+ (_, msf') <- unMSF msf ()
+ liftIO $ writeIORef handle msf'
+
+instance (Monad m) => ArrowChoice (MSF m) where
+ left :: MSF m a b -> MSF m (Either a c) (Either b c)
+ left sf = MSF f
+ where
+ f (Left a) = do
+ (b, sf') <- unMSF sf a
+ return (Left b, left sf')
+ f (Right c) = return (Right c, left sf)
diff --git a/test/AggregateSpec.hs b/test/AggregateSpec.hs
new file mode 100644
index 0000000..71bf420
--- /dev/null
+++ b/test/AggregateSpec.hs
@@ -0,0 +1,102 @@
+module AggregateSpec (spec) where
+
+import Control.Arrow
+import Control.Concurrent (threadDelay)
+import Control.Monad (forM_)
+import Control.Monad.Trans (MonadIO, liftIO)
+import Data.List (intercalate)
+import Data.Sensor qualified as S
+import Test.Hspec
+
+spec :: SpecWith ()
+spec = do
+ describe "current time" do
+ it "date" do
+ S.runSensorT (S.sample 1 date) >>= (`shouldBe` ["1970-01-01"])
+ it "time" do
+ S.runSensorT (S.sample 1 time) >>= (`shouldBe` ["00:00:00"])
+ it "date and time" do
+ S.runSensorT (S.sample 1 (intercalate " " <$> sequence [date, time]))
+ >>= (`shouldBe` ["1970-01-01 00:00:00"])
+ describe "timing" do
+ it "fast, fast" do
+ S.runSensorT (S.sample 2 ((,) <$> count <*> count))
+ >>= (`shouldBe` [(1, 1), (2, 2)])
+ it "fast, slow" do
+ S.runSensorT (S.sample 4 ((,) <$> count <*> slowCount))
+ >>= (`shouldBe` [(1, 1), (2, 1), (2, 2), (3, 2)])
+ S.runSensorT (S.sample 3 (const <$> count <*> slowCount))
+ >>= (`shouldBe` [1, 2, 2])
+ S.runSensorT (S.sample 3 (flip const <$> count <*> slowCount))
+ >>= (`shouldBe` [1, 1, 2])
+ describe "square count" do
+ it "count" do
+ S.runSensorT (S.sample 2 count) >>= (`shouldBe` [1, 2])
+ it "square" do
+ S.runSensorT (S.sample 2 (count >>= square)) >>= (`shouldBe` [1, 4])
+ describe "diagram" do
+ it "count diagram" do
+ S.runSensorT (S.sample 3 (diagram 2 count))
+ >>= (`shouldBe` [[1 :: Int], [1, 2], [2, 3]])
+ S.runSensorT (S.sample 5 (diagram 2 slowCount <* count))
+ >>= (`shouldBe` [[1 :: Int], [1], [1, 2], [1, 2], [2, 3]])
+
+diagram :: (Monad m) => Int -> S.Sensor m () a -> S.Sensor m () [a]
+diagram n sf = S.feedbackS [] $ proc ((), xs) -> do
+ x <- sf -< ()
+ returnA -< (reverse (x : xs), take (n - 1) (x : xs))
+
+count :: (S.MonadSensor m) => S.Sensor m () Int
+count = S.sensor Count
+
+data Count = Count deriving (Show)
+
+instance (S.MonadSensor m) => S.Aggregate m Count Int where
+ aggregate _ = forM_ [1, 2 ..] $ \n -> do
+ S.yield n
+ sleep
+
+slowCount :: (S.MonadSensor m) => S.Sensor m () Int
+slowCount = S.sensor SlowCount
+
+data SlowCount = SlowCount deriving (Show)
+
+instance (S.MonadSensor m) => S.Aggregate m SlowCount Int where
+ aggregate _ = forM_ [1, 2 ..] $ \n -> do
+ S.yield n
+ sleepLong
+
+sleep :: (MonadIO m) => m ()
+sleep = liftIO (threadDelay 2_000)
+
+sleepLong :: (MonadIO m) => m ()
+sleepLong = liftIO (threadDelay 3_000)
+
+square :: (S.MonadSensor m) => Int -> S.Sensor m () Int
+square = S.sensor . Square
+
+data Square = Square Int deriving (Show)
+
+instance (S.MonadSensor m) => S.Aggregate m Square Int where
+ aggregate (Square n) = do
+ S.yield (n * n)
+
+currentTime :: (S.MonadSensor m) => S.Sensor m () UTCTime
+currentTime = S.sensor CurrentTime
+
+data CurrentTime = CurrentTime deriving (Show)
+
+data UTCTime = UTCTime
+ { date :: String,
+ time :: String
+ }
+ deriving (Show)
+
+instance (S.MonadSensor m) => S.Aggregate m CurrentTime UTCTime where
+ aggregate _ = S.yield (UTCTime "1970-01-01" "00:00:00")
+
+date :: (S.MonadSensor m) => S.Sensor m () String
+date = (.date) <$> currentTime
+
+time :: (S.MonadSensor m) => S.Sensor m () String
+time = (.time) <$> currentTime
diff --git a/test/PureSpec.hs b/test/PureSpec.hs
new file mode 100644
index 0000000..5f718e7
--- /dev/null
+++ b/test/PureSpec.hs
@@ -0,0 +1,10 @@
+module PureSpec (spec) where
+
+import Data.Sensor qualified as S
+import Test.Hspec
+
+spec :: Spec
+spec = do
+ describe "Pure" do
+ it "pure" do
+ S.runSensorT (S.sample 1 (pure "")) >>= (`shouldBe` [""])
diff --git a/test/Spec.hs b/test/Spec.hs
new file mode 100644
index 0000000..a824f8c
--- /dev/null
+++ b/test/Spec.hs
@@ -0,0 +1 @@
+{-# OPTIONS_GHC -F -pgmF hspec-discover #-}