diff --git a/.github/workflows/haskell.yml b/.github/workflows/haskell.yml
index 9f4a516..3a89dd3 100644
--- a/.github/workflows/haskell.yml
+++ b/.github/workflows/haskell.yml
@@ -15,7 +15,7 @@ jobs:
       # ------------------------------------------------------------------------
       CABAL_REINIT_CONFIG: y
       LC_ALL: C.UTF-8
-      DISABLE_SDIST_BUILD: n
+      DISABLE_SDIST_BUILD: ${{ matrix.disable_sdist_build }}
 
       # ------------------------------------------------------------------------
       # cabal options
@@ -42,8 +42,6 @@ jobs:
       fail-fast: false
       matrix:
         name:
-          - 8.2.2
-          - 8.4.4
           - 8.6.5
           - 8.8.4
           - 8.10.7+fusion-plugin
@@ -54,54 +52,57 @@ jobs:
 
         include:
 
-          - name: 8.2.2
-            ghc_version: 8.2.2
-            runner: ubuntu-latest
-            cabal_version: 3.2
-
-          - name: 8.4.4
-            ghc_version: 8.4.4
-            runner: ubuntu-latest
-            cabal_version: 3.2
-
           - name: 8.6.5
             ghc_version: 8.6.5
             runner: ubuntu-latest
-            cabal_version: 3.2
+            cabal_version: 3.2.0.0
+            cabal_project: cabal.project.user
 
           - name: 8.8.4
             ghc_version: 8.8.4
             runner: ubuntu-latest
-            cabal_version: 3.2
+            cabal_version: 3.2.0.0
+            cabal_project: cabal.project.user
 
           - name: 8.10.7+fusion-plugin
             ghc_version: 8.10.7
             pack_options: CABAL_BUILD_OPTIONS="-f fusion-plugin"
             runner: ubuntu-latest
-            cabal_version: 3.2
+            cabal_version: 3.2.0.0
+            cabal_project: cabal.project.user
 
           - name: 9.0.1+werror
             ghc_version: 9.0.1
             runner: ubuntu-latest
             cabal_project: cabal.project.Werror
-            cabal_version: 3.2
+            cabal_version: 3.2.0.0
 
           - name: hlint
             ghc_version: 8.8.4
             pack_options: HLINT_OPTIONS="lint" HLINT_TARGETS="src"
             runner: ubuntu-latest
-            cabal_version: 3.2
+            cabal_version: 3.2.0.0
+            cabal_project: cabal.project.user
 
           - name: 8.10.7+macos
             ghc_version: 8.10.7
             runner: macos-latest
-            cabal_version: 3.2
+            cabal_version: 3.2.0.0
+            cabal_project: cabal.project.user
 
           - name: 9.2.1
             ghc_version: 9.2.1
             runner: ubuntu-latest
-            cabal_version: 3.6
+            cabal_version: 3.6.0.0
             pack_options: DISABLE_BENCH=y
+            cabal_project: cabal.project.user
+
+          - name: 9.4.2
+            ghc_version: 9.4.2
+            runner: ubuntu-latest
+            cabal_project: cabal.project.user
+            cabal_version: 3.8.1.0
+            disable_sdist_build: "y"
 
     steps:
     - uses: actions/checkout@v2
diff --git a/.packcheck.ignore b/.packcheck.ignore
index e1f1ec3..8f62628 100644
--- a/.packcheck.ignore
+++ b/.packcheck.ignore
@@ -4,5 +4,6 @@ stack.yaml
 appveyor.yml
 .gitignore
 cabal.project.Werror
+cabal.project.user
 default.nix
 cabal.project
diff --git a/appveyor.yml b/appveyor.yml
index 5237d85..ca49666 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -60,7 +60,7 @@ environment:
   # If you have not committed packcheck.sh in your repo at PACKCHECK_LOCAL_PATH
  # then it is automatically pulled from this URL.
   PACKCHECK_GITHUB_URL: "https://raw.githubusercontent.com/composewell/packcheck"
-  PACKCHECK_GITHUB_COMMIT: "v0.6.0"
+  PACKCHECK_GITHUB_COMMIT: "35efa99b2082d13722b8a0183ac6455df98e91b9"
 
   # Override the temp directory to avoid sed escaping issues
   # See https://github.com/haskell/cabal/issues/5386
diff --git a/benchmark/Main.hs b/benchmark/Main.hs
index 54541cc..d967d9d 100644
--- a/benchmark/Main.hs
+++ b/benchmark/Main.hs
@@ -9,17 +9,17 @@
 module Main (main) where
 
 import Control.Monad (unless)
-import Data.Semigroup (cycle1)
-import Data.Word (Word8)
 import Data.Function ((&))
-import Streamly.Internal.Data.Array.Foreign (Array)
-import Streamly.Internal.Data.Stream.IsStream.Type (fromStreamD, toStreamD)
-import Streamly.Prelude (SerialT)
+import Data.Word (Word8)
+import Streamly.Data.Array (Array)
+import Streamly.Data.Stream (Stream)
 import System.Directory (getCurrentDirectory, doesFileExist)
 import System.Environment (lookupEnv)
 
-import qualified Streamly.Internal.Data.Stream.IsStream as Stream
-import qualified Streamly.Internal.Data.Array.Foreign as Array
+import qualified Streamly.Data.Array as Array
+import qualified Streamly.Data.Fold as Fold
+import qualified Streamly.Data.Stream as Stream
+import qualified Streamly.Internal.Data.Stream as Stream (parseD)
 import qualified Streamly.Internal.FileSystem.File as File
 import qualified Streamly.Internal.LZ4 as LZ4
 import qualified Streamly.Internal.LZ4.Config as LZ4
@@ -76,11 +76,11 @@ bootstrap fp = do
     fileExists <- doesFileExist normalizedFp
     unless fileExists $ do
         putStrLn $ "Normalizing " ++ fp
-        let fileStream = Stream.unfold File.read fp
+        let fileStream = Stream.unfoldMany File.reader (Stream.repeat fp)
             combinedStream =
-                Stream.arraysOf _64KB
+                Stream.foldMany (Array.writeN _64KB)
                     $ Stream.take _10MB
-                    $ cycle1 fileStream
+                    $ fileStream
         combinedStream & File.fromChunks normalizedFp
         combinedStream & LZ4.compressChunks LZ4.defaultBlockConfig 65537
             & File.fromChunks compressedFpBig
@@ -97,10 +97,13 @@ bootstrap fp = do
         headerList = magicLE ++ [flg, bd, headerChk]
         header = Stream.fromList headerList
     headerArr <- Stream.fold (Array.writeN (length headerList)) header
-    (bf, ff) <- Stream.parseD LZ4.simpleFrameParserD header
-    combinedStream & compressChunksFrame bf ff 65537
-        & Stream.cons headerArr
-        & File.fromChunks compressedWith
+    x0 <- Stream.parseD LZ4.simpleFrameParserD header
+    case x0 of
+        Right (bf, ff) ->
+            combinedStream & compressChunksFrame bf ff 65537
+                & Stream.cons headerArr
+                & File.fromChunks compressedWith
+        Left _ -> return ()
 
     where
 
@@ -112,29 +115,29 @@ bootstrap fp = do
     compressChunksFrame
         :: LZ4.BlockConfig
         -> LZ4.FrameConfig
         -> Int
-        -> Stream.SerialT IO (Array.Array Word8)
-        -> Stream.SerialT IO (Array.Array Word8)
+        -> Stream IO (Array.Array Word8)
+        -> Stream IO (Array.Array Word8)
     compressChunksFrame bf ff i_ strm =
         if LZ4.hasEndMark ff
-        then (fromStreamD . LZ4.compressChunksD bf i_ . toStreamD) strm
+        then LZ4.compressChunksD bf i_ strm
                  `Stream.append` Stream.fromPure endMarkArr
-        else (fromStreamD . LZ4.compressChunksD bf i_ . toStreamD) strm
+        else LZ4.compressChunksD bf i_ strm
 
 --------------------------------------------------------------------------------
 -- Benchmark helpers
 --------------------------------------------------------------------------------
 
-type Combinator = SerialT IO (Array Word8) -> SerialT IO (Array Word8)
+type Combinator = Stream IO (Array Word8) -> Stream IO (Array Word8)
 
 {-# INLINE benchCorpus #-}
 benchCorpus :: Int -> String -> String -> Combinator -> Benchmark
 benchCorpus bufsize name corpus combinator =
     let bname = ("bufsize(" ++ show bufsize ++ ")/" ++ name ++ "/" ++ corpus)
      in bench bname $ nfIO $ do
-            Stream.unfold File.readChunksWithBufferOf (bufsize, corpus)
+            Stream.unfold File.chunkReaderWith (bufsize, corpus)
                 & combinator
-                & Stream.drain
+                & Stream.fold Fold.drain
 
 --------------------------------------------------------------------------------
 -- Benchmarks
 --------------------------------------------------------------------------------
@@ -156,16 +159,14 @@ decompress bufsize corpus =
 decompressWith :: Int -> String -> Benchmark
 decompressWith bufsize corpus =
     benchCorpus bufsize "decompressWith" corpus
-        $ fromStreamD
-        . LZ4.decompressChunksWithD LZ4.simpleFrameParserD . toStreamD
+        $ LZ4.decompressChunksWithD LZ4.simpleFrameParserD
 
 {-# INLINE resize #-}
 resize :: Int -> String -> Benchmark
 resize bufsize corpus =
     benchCorpus bufsize "resize" corpus
-        $ fromStreamD
-        . LZ4.resizeChunksD LZ4.defaultBlockConfig LZ4.defaultFrameConfig
-        . toStreamD
+        $ LZ4.resizeChunksD LZ4.defaultBlockConfig LZ4.defaultFrameConfig
+
 
 --------------------------------------------------------------------------------
 -- Reading environment
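Note on the parser change above: in streamly 0.9, Stream.parseD returns an Either ParseError value instead of failing in the monad, which is why bootstrap now matches on Right/Left. A minimal sketch of the new calling convention (a hypothetical standalone example; it assumes only the streamly-core 0.1 modules this file already imports):

```haskell
-- Sketch: Either-valued parse results in streamly 0.9 (assumed API,
-- mirroring the bootstrap change above).
import Data.Word (Word8)
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as Stream (parseD)
import qualified Streamly.Internal.Data.Parser.ParserD as ParserD

-- Return the first byte if it is non-zero; Nothing on parse failure.
firstNonZero :: [Word8] -> IO (Maybe Word8)
firstNonZero bytes = do
    res <- Stream.parseD (ParserD.satisfy (/= 0)) (Stream.fromList bytes)
    case res of
        Right b -> return (Just b)                     -- parser succeeded
        Left (ParserD.ParseError _) -> return Nothing  -- no exception thrown
```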
diff --git a/cabal.project.Werror b/cabal.project.Werror
index 1ef081e..22ffaf9 100644
--- a/cabal.project.Werror
+++ b/cabal.project.Werror
@@ -2,3 +2,14 @@ packages: streamly-lz4.cabal
 package streamly-lz4
     ghc-options: -Werror
+
+source-repository-package
+    type: git
+    location: https://github.com/composewell/streamly.git
+    tag: master
+
+source-repository-package
+    type: git
+    location: https://github.com/composewell/streamly.git
+    tag: master
+    subdir: core
diff --git a/cabal.project.user b/cabal.project.user
new file mode 100644
index 0000000..65475c4
--- /dev/null
+++ b/cabal.project.user
@@ -0,0 +1,14 @@
+packages: .
+
+source-repository-package
+    type: git
+    location: https://github.com/composewell/streamly.git
+    tag: master
+
+source-repository-package
+    type: git
+    location: https://github.com/composewell/streamly.git
+    tag: master
+    subdir: core
+
+jobs: 1
diff --git a/default.nix b/default.nix
index c038126..a8f6309 100644
--- a/default.nix
+++ b/default.nix
@@ -1,91 +1,46 @@
 # CAUTION! a spelling mistake in arg string is ignored silently.
 #
-# To use ghc-8.6.5
-# nix-shell --argstr compiler "ghc865"
+# To use ghc-8.10.7
+# nix-shell --argstr compiler "ghc8107"
 
 { nixpkgs ?
-    import (builtins.fetchTarball https://github.com/NixOS/nixpkgs/archive/refs/tags/21.05.tar.gz)
+    import (builtins.fetchTarball
+        https://github.com/NixOS/nixpkgs/archive/refs/tags/22.05.tar.gz)
         {}
-, compiler ? "default"
-, c2nix ? "" # cabal2nix CLI options
-# TODO
-#, sources ? [] # e.g. [./. ./benchmark]
-#, hdeps ? [] # e.g. [time, mtl]
-#, deps ? [] # e.g. [SDL2]
+, compiler ? "ghc922"
 }:
-let haskellPackages =
-        if compiler == "default"
-        then nixpkgs.haskellPackages
-        else nixpkgs.haskell.packages.${compiler};
-
-    # we can possibly avoid adding our package to HaskellPackages like
-    # in the case of nix-shell for a single package?
-    mkPackage = super: pkg: path: opts: inShell:
-                let orig = super.callCabal2nixWithOptions pkg path opts {};
-                 in if inShell
-                    # Avoid copying the source directory to nix store by using
-                    # src = null.
-                    then orig.overrideAttrs (oldAttrs: { src = null; })
-                    else orig;
-
-    flags = "--benchmark --flag fusion-plugin" + " " + c2nix;
+let
+    utils =
+        let src = fetchGit {
+                url = "git@github.com:composewell/composewell-haskell.git";
+                ref = "update-some";
+            };
+         in (import "${src}/utils.nix") { inherit nixpkgs; };
+
+
+    haskellPackages =
+        let src = fetchGit {
+                url = "git@github.com:composewell/composewell-haskell.git";
+                ref = "update-some";
+            };
+         in (import "${src}/haskellPackages.nix")
+                { inherit nixpkgs;
+                  inherit compiler; };
 
     mkHaskellPackages = inShell:
-        haskellPackages.override {
-            overrides = self: super:
-                with nixpkgs.haskell.lib;
-                {
-                    streamly-lz4 = mkPackage super "streamly-lz4" ./. flags inShell;
-
-                    streamly =
-                        nixpkgs.haskell.lib.overrideCabal
-                            (super.callHackageDirect
-                                { pkg = "streamly";
-                                  ver = "0.8.2";
-                                  sha256 = "0jhsdd71kqw0k0aszg1qb1l0wbxl1r73hsmkdgch4vlx43snlc8a";
-                                } {})
-                            (old:
-                                { librarySystemDepends =
-                                    if builtins.currentSystem == "x86_64-darwin"
-                                    then [nixpkgs.darwin.apple_sdk.frameworks.Cocoa]
-                                    else [];
-                                  enableLibraryProfiling = false;
-                                  doHaddock = false;
-                                });
-
-                    unicode-data =
-                        super.callHackageDirect
-                            { pkg = "unicode-data";
-                              ver = "0.3.0";
-                              sha256 = "0izxxk7qgq22ammzmwc4cs4nlhzp7y55gzyas2a8bzhdpac1j7yx";
-                            } {};
+        haskellPackages.override (old: {
+            overrides =
+                nixpkgs.lib.composeExtensions
+                    (old.overrides or (_: _: {}))
+                    (with nixpkgs.haskell.lib; self: super: {
+                        streamly-lz4 =
+                            utils.local super "streamly-lz4" ./. "--benchmark" inShell;
+                    });
+        });
 
-                    tasty-bench =
-                        super.callHackageDirect
-                            { pkg = "tasty-bench";
-                              ver = "0.3";
-                              sha256 = "0na1q52zr8p1zz8hby4242yjr2zma3js4n91avl7ibsa2y51vrc4";
-                            } {};
-                };
-        };
+    shellDrv = mkHaskellPackages true;
 
-    drv = mkHaskellPackages true;
+    shell = utils.mkShell shellDrv (p: [p.streamly-lz4]) true;
 
-    shell = drv.shellFor {
-        packages = p:
-            [ p.streamly-lz4
-            ];
-        doBenchmark = true;
-        # Use a better prompt
-        shellHook = ''
-            export CABAL_DIR="$(pwd)/.cabal.nix"
-            if test -n "$PS_SHELL"
-            then
-                export PS1="$PS_SHELL\[$bldred\](nix)\[$txtrst\] "
-            fi
-        '';
-    };
 in if nixpkgs.lib.inNixShell
    then shell
    else (mkHaskellPackages false).streamly-lz4
diff --git a/src/Streamly/Internal/LZ4.hs b/src/Streamly/Internal/LZ4.hs
index 43773b5..11c87cc 100644
--- a/src/Streamly/Internal/LZ4.hs
+++ b/src/Streamly/Internal/LZ4.hs
@@ -1,4 +1,4 @@
-{-# LANGUAGE NamedFieldPuns #-}
+{-# LANGUAGE NamedFieldPuns, TypeApplications #-}
 -- |
 -- Module      : Streamly.Internal.LZ4
 -- Copyright   : (c) 2020 Composewell Technologies
@@ -40,7 +40,6 @@ where
 import Control.Monad (when)
 import Control.Monad.Catch (MonadThrow)
 import Control.Monad.IO.Class (MonadIO(..))
-import Data.Bifunctor (second)
 import Data.Bits (Bits(..))
 import Data.Coerce (coerce)
 import Data.Int (Int32)
@@ -51,15 +50,27 @@ import Foreign.Storable (peek, poke)
 import Fusion.Plugin.Types (Fuse (..))
 import System.IO.Unsafe (unsafePerformIO)
 
-import qualified Streamly.Internal.Data.Array.Foreign as Array
-import qualified Streamly.Internal.Data.Array.Foreign.Type as Array
-import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
+import qualified Streamly.Data.Array as Array
+import qualified Streamly.Data.MutArray as MArray
+import qualified Streamly.Data.Fold as Fold
+import qualified Streamly.Internal.Data.Unboxed as Unbox
+
+import qualified Streamly.Internal.Data.Fold.Chunked as ArrayFold
+    ( fromParserD )
+import qualified Streamly.Internal.Data.Stream.Chunked as ArrayStream
+    ( runArrayFoldBreak )
+import qualified Streamly.Internal.Data.Array as Array
+    ( asPtrUnsafe, castUnsafe, unsafeFreeze, unsafeThaw )
+import qualified Streamly.Internal.Data.Array.Mut.Type as MArray
+    ( MutArray(..), asPtrUnsafe, newPinned, rightSize, touch )
+import qualified Streamly.Internal.Data.Array.Type as Array
+    ( Array(..), byteLength, splice )
 import qualified Streamly.Internal.Data.Parser.ParserD as Parser
-import qualified Streamly.Internal.Data.Fold as Fold
-import qualified Streamly.Internal.Data.Stream.StreamD as Stream
-import qualified Streamly.Internal.Data.Stream.IsStream as IsStream
-import qualified Streamly.Internal.Data.Array.Stream.Foreign as ArrayStream
-import qualified Streamly.Internal.Data.Array.Stream.Fold.Foreign as ArrayFold
+    ( Parser, ParseError(..), die, fromPure, satisfy, takeEQ )
+import qualified Streamly.Internal.Data.Stream as Stream
+    ( concatMapM, fromPure, fromStreamK, toStreamK )
+import qualified Streamly.Internal.Data.Stream.StreamD as StreamD
+    ( Step(..), Stream(..))
 
 import Streamly.Internal.LZ4.Config
@@ -230,7 +241,7 @@ compressChunk ::
     -> Array.Array Word8
     -> IO (Array.Array Word8)
 compressChunk cfg speed ctx arr = do
-    Array.asPtrUnsafe (Array.unsafeCast arr)
+    Array.asPtrUnsafe (Array.castUnsafe arr)
         $ \src -> do
               let uncompLen = Array.byteLength arr
                   speedC = unsafeIntToCInt speed
@@ -247,8 +258,10 @@ compressChunk cfg speed ctx arr = do
                   $ error
                       $ "compressChunk: compressed length <= 0."
                       ++ " maxCompLenC: " ++ show maxCompLenC
                       ++ " uncompLenC: " ++ show uncompLenC
-              (MArray.Array cont dstBegin_ dstBegin dstMax) <-
-                  MArray.newArray (maxCompLen + metaSize_)
+              newarr@(MArray.MutArray cont arrStart arrEnd arrBound) <-
+                  MArray.newPinned @IO @Word8 (maxCompLen + metaSize_)
+              dstBegin_ <- MArray.asPtrUnsafe newarr return
+              let dstBegin = dstBegin_ `plusPtr` arrEnd
               let hdrCompLen = dstBegin `plusPtr` compSizeOffset_
                   compData = dstBegin `plusPtr` dataOffset_
               compLenC <-
@@ -261,8 +274,8 @@ compressChunk cfg speed ctx arr = do
               setUncompSize_ dstBegin (cIntToI32 uncompLenC)
               poke hdrCompLen (toLittleEndian (cIntToI32 compLenC))
               let compLen = cIntToInt compLenC
-                  dstEnd = dstBegin `plusPtr` (compLen + metaSize_)
-                  compArr = MArray.Array cont dstBegin_ dstEnd dstMax
+                  dstEnd = arrStart + compLen + metaSize_
+                  compArr = MArray.MutArray cont arrStart dstEnd arrBound
               -- It is safe to shrink here as we need to hold the last 64KB of
               -- the previous uncompressed array and not the compressed one.
               Array.unsafeFreeze <$> MArray.rightSize compArr
@@ -294,7 +307,7 @@ decompressChunk ::
     -> Array.Array Word8
     -> IO (Array.Array Word8)
 decompressChunk cfg ctx arr = do
-    Array.asPtrUnsafe (Array.unsafeCast arr)
+    Array.asPtrUnsafe (Array.castUnsafe arr)
         $ \src -> do
               let hdrCompLen :: Ptr Int32 = src `plusPtr` compSizeOffset cfg
                   compData = src `plusPtr` dataOffset cfg
@@ -317,8 +330,10 @@ decompressChunk cfg ctx arr = do
                   error
                       $ "decompressChunk: compressed data length is more "
                       ++ "than the max limit: " ++ show maxCompLenC
-              (MArray.Array cont dstBegin_ dstBegin dstMax)
-                  <- MArray.newArray uncompLen
+              newarr@(MArray.MutArray cont arrStart arrEnd arrBound)
+                  <- MArray.newPinned @IO @Word8 uncompLen
+              dstBegin_ <- MArray.asPtrUnsafe newarr return
+              let dstBegin = dstBegin_ `plusPtr` arrEnd
               decompLenC <-
                   c_decompressSafeContinue
                       ctx compData dstBegin compLenC uncompLenC
@@ -329,8 +344,8 @@ decompressChunk cfg ctx arr = do
                       ++ "\nuncompLenC = " ++ show uncompLenC
                       ++ "\ndecompLenC = " ++ show decompLenC
               let decompLen = cIntToInt decompLenC
-                  dstEnd = dstBegin `plusPtr` decompLen
-                  decompArr = MArray.Array cont dstBegin_ dstEnd dstMax
+                  dstEnd = arrStart + decompLen
+                  decompArr = MArray.MutArray cont arrStart dstEnd arrBound
               -- We cannot shrink the array here, because that would reallocate
               -- the array invalidating the cached dictionary.
               return $ Array.unsafeFreeze decompArr
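The two chunk codecs above replace MArray.newArray plus raw Array construction with a pinned allocation that the C primitives can write into: allocate with newPinned, leak the base pointer out of asPtrUnsafe by passing return as the continuation, then rebuild the result array from byte indices (arrStart/arrEnd/arrBound). A condensed sketch of that pattern, using the same streamly-core 0.1 internal names as this diff (illustrative only, not a supported public API):

```haskell
{-# LANGUAGE TypeApplications #-}
-- Sketch of the pinned destination-buffer pattern used by
-- compressChunk/decompressChunk above (internal streamly-core 0.1 API).
import Data.Word (Word8)
import Foreign.Ptr (Ptr, plusPtr)
import qualified Streamly.Internal.Data.Array.Mut.Type as MArray
    ( MutArray(..), asPtrUnsafe, newPinned )

-- Allocate n pinned bytes; return the array and a raw pointer to its
-- first writable byte (base address plus the arrEnd byte offset).
newDestination :: Int -> IO (MArray.MutArray Word8, Ptr Word8)
newDestination n = do
    marr@(MArray.MutArray _ _ arrEnd _) <- MArray.newPinned @IO @Word8 n
    -- Passing 'return' as the continuation lets the pointer escape the
    -- callback; it remains valid only while 'marr' is kept reachable.
    base <- MArray.asPtrUnsafe marr return
    return (marr, base `plusPtr` arrEnd)
```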
@@ -354,10 +369,10 @@ compressChunksD ::
        MonadIO m
     => BlockConfig
     -> Int
-    -> Stream.Stream m (Array.Array Word8)
-    -> Stream.Stream m (Array.Array Word8)
-compressChunksD cfg speed0 (Stream.Stream step0 state0) =
-    Stream.Stream step (CompressInit state0)
+    -> StreamD.Stream m (Array.Array Word8)
+    -> StreamD.Stream m (Array.Array Word8)
+compressChunksD cfg speed0 (StreamD.Stream step0 state0) =
+    StreamD.Stream step (CompressInit state0)
 
     where
 
@@ -373,11 +388,11 @@ compressChunksD cfg speed0 (Stream.Stream step0 state0) =
             -- if the chunk size is bigger we would be holding a lot more
             -- data than required. Also, the perf advantage does not seem
             -- much.
-            return $ Stream.Skip $ CompressDo st ctx Nothing
+            return $ StreamD.Skip $ CompressDo st ctx Nothing
     step gst (CompressDo st ctx prev) = do
         r <- step0 gst st
         case r of
-            Stream.Yield arr st1 ->
+            StreamD.Yield arr st1 ->
                 -- The compression primitives use 32-bit signed int (CInt) to
                 -- represent the length of the array. The maximum value of a
                 -- 32-bit signed int is 2GB.
@@ -386,12 +401,12 @@ compressChunksD cfg speed0 (Stream.Stream step0 state0) =
                 else do
                     arr1 <- liftIO $ compressChunk cfg speed ctx arr
                     -- XXX touch the "prev" array to keep it alive?
-                    return $ Stream.Yield arr1 (CompressDo st1 ctx (Just arr))
-            Stream.Skip st1 ->
-                return $ Stream.Skip $ CompressDo st1 ctx prev
-            Stream.Stop -> return $ Stream.Skip $ CompressDone ctx
+                    return $ StreamD.Yield arr1 (CompressDo st1 ctx (Just arr))
+            StreamD.Skip st1 ->
+                return $ StreamD.Skip $ CompressDo st1 ctx prev
+            StreamD.Stop -> return $ StreamD.Skip $ CompressDone ctx
     step _ (CompressDone ctx) =
-        liftIO $ c_freeStream ctx >> return Stream.Stop
+        liftIO $ c_freeStream ctx >> return StreamD.Stop
 
 --------------------------------------------------------------------------------
 -- Stream decompression
 --------------------------------------------------------------------------------
@@ -433,10 +448,10 @@ resizeChunksD ::
        MonadIO m
     => BlockConfig
     -> FrameConfig
-    -> Stream.Stream m (Array.Array Word8)
-    -> Stream.Stream m (Array.Array Word8)
-resizeChunksD cfg conf (Stream.Stream step0 state0) =
-    Stream.Stream step (RInit state0)
+    -> StreamD.Stream m (Array.Array Word8)
+    -> StreamD.Stream m (Array.Array Word8)
+resizeChunksD cfg conf (StreamD.Stream step0 state0) =
+    StreamD.Stream step (RInit state0)
 
     where
 
@@ -449,9 +464,9 @@ resizeChunksD cfg conf (Stream.Stream step0 state0) =
 
     -- Unsafe function
     {-# INLINE isEndMark #-}
-    isEndMark src
+    isEndMark arrContents i
         | hasEndMark_ = do
-              em <- peek (castPtr src :: Ptr Int32)
+              em <- Unbox.peekWith arrContents i
               return $ em == endMark
         | otherwise = return False
@@ -459,50 +474,53 @@ resizeChunksD cfg conf (Stream.Stream step0 state0) =
     process st arr@(Array.Array cont b e) = do
         let len = Array.byteLength arr
         if len < 4
-        then return $ Stream.Skip $ RAccumulate st arr
+        then return $ StreamD.Skip $ RAccumulate st arr
         else do
-            res <- isEndMark b
+            res <- isEndMark (MArray.arrContents $ Array.unsafeThaw arr) b
             if res
-            then return $ Stream.Skip $ RFooter st arr
+            then return $ StreamD.Skip $ RFooter st arr
             else do
                 if len <= metaSize_
-                then return $ Stream.Skip $ RAccumulate st arr
+                then return $ StreamD.Skip $ RAccumulate st arr
                 else do
-                    let compLenPtr = castPtr (b `plusPtr` compSizeOffset_)
+                    let compLenPtr = b + compSizeOffset_
                     compressedSize <-
-                        i32ToInt . fromLittleEndian <$> peek compLenPtr
+                        i32ToInt . fromLittleEndian <$>
+                            Unbox.peekWith
+                                (MArray.arrContents $ Array.unsafeThaw arr)
+                                compLenPtr
                     let required = compressedSize + metaSize_
                     if len == required
-                    then return $ Stream.Skip $ RYield arr $ RInit st
+                    then return $ StreamD.Skip $ RYield arr $ RInit st
                     else if len < required
-                    then return $ Stream.Skip $ RAccumulate st arr
+                    then return $ StreamD.Skip $ RAccumulate st arr
                     else do
-                        let arr1E = b `plusPtr` required
+                        let arr1E = b + required
                             arr1 = Array.Array cont b arr1E
                             arr2 = Array.Array cont arr1E e
                         MArray.touch cont
-                        return $ Stream.Skip $ RYield arr1 $ RProcess st arr2
+                        return $ StreamD.Skip $ RYield arr1 $ RProcess st arr2
 
     {-# INLINE_LATE step #-}
-    step _ (RYield r next) = return $ Stream.Yield r next
+    step _ (RYield r next) = return $ StreamD.Yield r next
     step gst (RInit st) = do
         r <- step0 gst st
         case r of
-            Stream.Yield arr st1 -> liftIO $ process st1 arr
-            Stream.Skip st1 -> return $ Stream.Skip $ RInit st1
-            Stream.Stop ->
+            StreamD.Yield arr st1 -> liftIO $ process st1 arr
+            StreamD.Skip st1 -> return $ StreamD.Skip $ RInit st1
+            StreamD.Stop ->
                 if hasEndMark_
                 then error "resizeChunksD: No end mark found"
-                else return Stream.Stop
+                else return StreamD.Stop
     step _ (RProcess st arr) = liftIO $ process st arr
     step gst (RAccumulate st buf) = do
         r <- step0 gst st
         case r of
-            Stream.Yield arr st1 -> do
+            StreamD.Yield arr st1 -> do
                 arr1 <- Array.splice buf arr
                 liftIO $ process st1 arr1
-            Stream.Skip st1 -> return $ Stream.Skip $ RAccumulate st1 buf
-            Stream.Stop -> error "resizeChunksD: Incomplete block"
+            StreamD.Skip st1 -> return $ StreamD.Skip $ RAccumulate st1 buf
+            StreamD.Stop -> error "resizeChunksD: Incomplete block"
     step gst (RFooter st buf) = do
         -- Warn if len > footerSize
         let len = Array.byteLength buf
@@ -510,17 +528,17 @@ resizeChunksD cfg conf (Stream.Stream step0 state0) =
         then do
             r <- step0 gst st
             case r of
-                Stream.Yield arr st1 -> do
+                StreamD.Yield arr st1 -> do
                     arr1 <- Array.splice buf arr
-                    return $ Stream.Skip $ RFooter st1 arr1
-                Stream.Skip st1 -> return $ Stream.Skip $ RFooter st1 buf
-                Stream.Stop -> error "resizeChunksD: Incomplete footer"
+                    return $ StreamD.Skip $ RFooter st1 arr1
+                StreamD.Skip st1 -> return $ StreamD.Skip $ RFooter st1 buf
+                StreamD.Stop -> error "resizeChunksD: Incomplete footer"
         else do
             res <- liftIO $ validateFooter_ buf
             if res
-            then return Stream.Stop
+            then return StreamD.Stop
            else error "resizeChunksD: Invalid footer"
-    step _ RDone = return Stream.Stop
+    step _ RDone = return StreamD.Stop
 
 {-# ANN type DecompressState Fuse #-}
 data DecompressState st ctx prev
@@ -539,10 +557,10 @@ data DecompressState st ctx prev
 decompressChunksRawD ::
        MonadIO m
     => BlockConfig
-    -> Stream.Stream m (Array.Array Word8)
-    -> Stream.Stream m (Array.Array Word8)
-decompressChunksRawD cfg (Stream.Stream step0 state0) =
-    Stream.Stream step (DecompressInit state0)
+    -> StreamD.Stream m (Array.Array Word8)
+    -> StreamD.Stream m (Array.Array Word8)
+decompressChunksRawD cfg (StreamD.Stream step0 state0) =
+    StreamD.Stream step (DecompressInit state0)
 
     where
 
@@ -551,30 +569,42 @@ decompressChunksRawD cfg (Stream.Stream step0 state0) =
         liftIO
             $ do
                 lz4Ctx <- c_createStreamDecode
-                return $ Stream.Skip $ DecompressDo st lz4Ctx Nothing
+                return $ StreamD.Skip $ DecompressDo st lz4Ctx Nothing
     step _ (DecompressDone lz4Ctx) =
-        liftIO $ c_freeStreamDecode lz4Ctx >> return Stream.Stop
+        liftIO $ c_freeStreamDecode lz4Ctx >> return StreamD.Stop
     step gst (DecompressDo st lz4Ctx prev) = do
         r <- step0 gst st
         case r of
-            Stream.Yield arr st1 -> do
+            StreamD.Yield arr st1 -> do
                 arr1 <- liftIO $ decompressChunk cfg lz4Ctx arr
                 -- Instead of the input array chunk we need to hold the output
                 -- array chunk here.
-                return $ Stream.Yield arr1 (DecompressDo st1 lz4Ctx (Just arr1))
-            Stream.Skip st1 ->
-                return $ Stream.Skip $ DecompressDo st1 lz4Ctx prev
-            Stream.Stop -> return $ Stream.Skip $ DecompressDone lz4Ctx
+                return $ StreamD.Yield arr1 (DecompressDo st1 lz4Ctx (Just arr1))
+            StreamD.Skip st1 ->
+                return $ StreamD.Skip $ DecompressDo st1 lz4Ctx prev
+            StreamD.Stop -> return $ StreamD.Skip $ DecompressDone lz4Ctx
 
 decompressChunksWithD ::
        (MonadThrow m, MonadIO m)
-    => Parser.Parser m Word8 (BlockConfig, FrameConfig)
-    -> Stream.Stream m (Array.Array Word8)
-    -> Stream.Stream m (Array.Array Word8)
-decompressChunksWithD p s = do
-    ((cfg, config), next) <- Stream.fromEffect $ second IsStream.toStreamD
-        <$> ArrayStream.foldArr_ (ArrayFold.fromParserD p) (IsStream.fromStreamD s)
-    decompressChunksRawD cfg (resizeChunksD cfg config next)
+    => Parser.Parser Word8 m (BlockConfig, FrameConfig)
+    -> StreamD.Stream m (Array.Array Word8)
+    -> StreamD.Stream m (Array.Array Word8)
+decompressChunksWithD p s =
+    Stream.concatMapM (\() -> generator) (Stream.fromPure ())
+
+    where
+
+    generator = do
+        (res, next) <-
+            ArrayStream.runArrayFoldBreak
+                (ArrayFold.fromParserD p)
+                (Stream.toStreamK s)
+        case res of
+            Left (Parser.ParseError err) -> error $ "parser error: " ++ err
+            Right (cfg, config) -> return $
+                decompressChunksRawD
+                    cfg
+                    (resizeChunksD cfg config (Stream.fromStreamK next))
 
 -- XXX Merge this with BlockConfig?
 data FLG =
@@ -589,7 +619,7 @@ data FLG =
 -- XXX Support Skippable frames
 simpleFrameParserD ::
        (Monad m, MonadThrow m)
-    => Parser.Parser m Word8 (BlockConfig, FrameConfig)
+    => Parser.Parser Word8 m (BlockConfig, FrameConfig)
 simpleFrameParserD = do
     _ <- assertMagic
     _flg <- parseFLG
diff --git a/src/Streamly/LZ4.hs b/src/Streamly/LZ4.hs
index 9fba797..5cd4e54 100644
--- a/src/Streamly/LZ4.hs
+++ b/src/Streamly/LZ4.hs
@@ -54,10 +54,8 @@ where
 
 import Control.Monad.IO.Class (MonadIO)
 import Data.Word (Word8)
-import Streamly.Internal.Data.Array.Foreign (Array)
-import Streamly.Prelude (SerialT)
-import Streamly.Internal.Data.Stream.IsStream.Type (fromStreamD, toStreamD)
-
+import Streamly.Data.Array (Array)
+import Streamly.Data.Stream (Stream)
 import Streamly.Internal.LZ4.Config
 import Streamly.Internal.LZ4
 
@@ -95,9 +93,9 @@ compressChunks ::
        MonadIO m
     => BlockConfig
     -> Int
-    -> SerialT m (Array Word8)
-    -> SerialT m (Array Word8)
-compressChunks cfg i m = fromStreamD (compressChunksD cfg i (toStreamD m))
+    -> Stream m (Array Word8)
+    -> Stream m (Array Word8)
+compressChunks cfg i m = compressChunksD cfg i m
 
 --------------------------------------------------------------------------------
 -- Decompression
 --------------------------------------------------------------------------------
@@ -114,9 +112,8 @@
 decompressChunks ::
        MonadIO m
     => BlockConfig
-    -> SerialT m (Array Word8)
-    -> SerialT m (Array Word8)
+    -> Stream m (Array Word8)
+    -> Stream m (Array Word8)
 decompressChunks bf =
-    fromStreamD
-    . decompressChunksRawD bf
-    . resizeChunksD bf defaultFrameConfig . toStreamD
+    decompressChunksRawD bf
+    . resizeChunksD bf defaultFrameConfig
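With SerialT gone, the public API is now ordinary Stream IO transformations that compose by plain function application. A round-trip sketch against the new signatures (it assumes compressChunks and decompressChunks are exported from Streamly.LZ4 as defined above and defaultBlockConfig from the config module; acceleration 1 is an arbitrary example value):

```haskell
-- Round-trip sketch for the streamly-0.9 based public API.
import Data.Word (Word8)
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import Streamly.Internal.LZ4.Config (defaultBlockConfig)
import Streamly.LZ4 (compressChunks, decompressChunks)

-- Compress then decompress a list of chunks; True if the data survives.
roundTrip :: [[Word8]] -> IO Bool
roundTrip chunks = do
    let inp = map Array.fromList chunks
    out <-
        Stream.fold Fold.toList
            $ decompressChunks defaultBlockConfig
            $ compressChunks defaultBlockConfig 1
            $ Stream.fromList inp
    return (out == inp)
```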
diff --git a/stack.yaml b/stack.yaml
index 31575be..5940041 100644
--- a/stack.yaml
+++ b/stack.yaml
@@ -3,7 +3,15 @@ packages:
 resolver: lts-18.18
 
 extra-deps:
 - unicode-data-0.3.0
-- streamly-0.8.2
+- lockfree-queue-0.2.4
+
+- github: composewell/streamly
+  commit: master
+
+- github: composewell/streamly
+  commit: master
+  subdirs:
+  - core
 
 # Look at https://stackoverflow.com/questions/70045586/could-not-find-module-system-console-mintty-win32-when-compiling-test-framework
 flags:
diff --git a/streamly-lz4.cabal b/streamly-lz4.cabal
index e4d76e4..be93eed 100644
--- a/streamly-lz4.cabal
+++ b/streamly-lz4.cabal
@@ -1,6 +1,6 @@
 cabal-version:      2.2
 name:               streamly-lz4
-version:            0.1.2
+version:            0.2.0
 synopsis:           Streamly combinators for LZ4 compression
 description:
     Compress and decompress streams of data using LZ4 compression. See
@@ -14,13 +14,12 @@ maintainer: streamly@composewell.com
 copyright:          2020 Composewell Technologies
 category:           Codec, Compression, Streamly
 build-type:         Simple
-tested-with:        GHC==8.2.2
-                  , GHC==8.4.4
-                  , GHC==8.6.5
+tested-with:        GHC==8.6.5
                   , GHC==8.8.4
                   , GHC==8.10.7
                   , GHC==9.0.1
                   , GHC==9.2.1
+                  , GHC==9.4.2
 extra-source-files: CHANGELOG.md
                   , NOTICE
                   , README.md
@@ -59,7 +58,8 @@ library
     build-depends: base >= 4 && < 5
                  , fusion-plugin-types >= 0.1 && < 0.2
                  , exceptions >= 0.8 && < 0.11
-                 , streamly == 0.8.2.*
+                 , streamly == 0.9.0.*
+                 , streamly-core == 0.1.0.*
     ghc-options: -Wall
     default-language: Haskell2010
     default-extensions:
@@ -77,10 +77,11 @@ test-suite test-lz4
     hs-source-dirs: test
     main-is: Main.hs
     build-depends: streamly-lz4
-                 , streamly == 0.8.2.*
+                 , streamly == 0.9.0.*
+                 , streamly-core == 0.1.0.*
                  , base >= 4 && < 5
                  , QuickCheck >= 2.13.1 && < 2.15
-                 , hspec >= 2.7 && < 2.9
+                 , hspec >= 2.7 && < 2.11
                  , temporary >= 1.3 && < 1.4
     ghc-options: -fno-ignore-asserts -Wall -rtsopts +RTS -M200M -RTS
     default-language: Haskell2010
@@ -92,10 +93,11 @@ benchmark bench-lz4
     hs-source-dirs: benchmark
     main-is: Main.hs
     build-depends: streamly-lz4
-                 , streamly == 0.8.2.*
+                 , streamly == 0.9.0.*
+                 , streamly-core == 0.1.0.*
                  , base >= 4 && < 5
                  , gauge >= 0.2.5 && < 0.2.6
-                 , directory >= 1.3.0 && < 1.3.8
+                 , directory >= 1.3 && < 1.4
     ghc-options: -Wall
     default-language: Haskell2010
     default-extensions: TypeInType
diff --git a/test/Main.hs b/test/Main.hs
index 06f7261..cbed65c 100644
--- a/test/Main.hs
+++ b/test/Main.hs
@@ -12,7 +12,6 @@ import Control.Monad (forM_)
 import Control.Monad.IO.Class (MonadIO(..))
 import Data.Word (Word8)
 import Data.Function ((&))
-import Streamly.Internal.Data.Stream.IsStream.Type (fromStreamD, toStreamD)
 import System.IO (IOMode(..), openFile, hClose)
 import System.IO.Temp (withSystemTempFile)
 import Test.Hspec (describe, hspec, it, shouldBe)
@@ -21,9 +20,12 @@ import Test.QuickCheck.Gen
     ( Gen, choose, elements, frequency, generate, listOf, vectorOf )
 import Test.QuickCheck.Monadic (monadicIO)
 
-import qualified Streamly.Internal.Data.Array.Foreign.Type as Array
-import qualified Streamly.Internal.Data.Stream.IsStream as Stream
-import qualified Streamly.Internal.FileSystem.Handle as Handle
+import qualified Streamly.Data.Array as Array
+import qualified Streamly.Data.Fold as Fold
+import qualified Streamly.Data.Stream as Stream
+import qualified Streamly.FileSystem.Handle as Handle
+import qualified Streamly.Internal.Data.Stream as Stream (parseD)
+import qualified Streamly.Internal.FileSystem.Handle as Handle (putChunks)
 
 import Streamly.Internal.LZ4.Config
 import Streamly.Internal.LZ4
@@ -81,12 +83,12 @@ decompressResizedcompress :: BlockConfig -> Int -> [Array.Array Word8] -> IO ()
 decompressResizedcompress conf i lst =
     let strm = Stream.fromList lst
      in do
         lst1 <-
-            Stream.toList $ decompressChunksRaw $ compressChunks conf i strm
+            Stream.fold Fold.toList $ decompressChunksRaw $ compressChunks conf i strm
         lst `shouldBe` lst1
 
     where
 
-    decompressChunksRaw = fromStreamD . decompressChunksRawD conf . toStreamD
+    decompressChunksRaw = decompressChunksRawD conf
 
 decompressCompress :: BlockConfig -> Int -> Int -> [Array.Array Word8] -> IO ()
 decompressCompress conf bufsize i lst = do
@@ -95,10 +97,10 @@ decompressCompress conf bufsize i lst = do
         compressChunks conf i strm & Handle.putChunks tmpH
         hClose tmpH
         lst1 <-
-            Stream.toList
-                $ Stream.bracket_ (openFile tmp ReadMode) hClose
+            Stream.fold Fold.toList
+                $ Stream.bracketIO (openFile tmp ReadMode) hClose
                 $ \h ->
-                      Stream.unfold Handle.readChunksWithBufferOf (bufsize, h)
+                      Stream.unfold Handle.chunkReaderWith (bufsize, h)
                           & decompressChunks conf
         lst1 `shouldBe` lst
 
@@ -114,29 +116,29 @@ decompressCompressFrame bf ff bufsize i lst = do
         compressChunksFrame strm & Handle.putChunks tmpH
         hClose tmpH
         lst1 <-
-            Stream.toList
-                $ Stream.bracket_ (openFile tmp ReadMode) hClose
+            Stream.fold Fold.toList
+                $ Stream.bracketIO (openFile tmp ReadMode) hClose
                 $ \h ->
-                      Stream.unfold Handle.readChunksWithBufferOf (bufsize, h)
+                      Stream.unfold Handle.chunkReaderWith (bufsize, h)
                           & decompressChunksFrame
         lst1 `shouldBe` lst
 
     where
 
     decompressChunksFrame
-        :: Stream.SerialT IO (Array.Array Word8)
-        -> Stream.SerialT IO (Array.Array Word8)
+        :: Stream.Stream IO (Array.Array Word8)
+        -> Stream.Stream IO (Array.Array Word8)
     decompressChunksFrame =
-        fromStreamD . decompressChunksRawD bf . resizeChunksD bf ff . toStreamD
+        decompressChunksRawD bf . resizeChunksD bf ff
 
     compressChunksFrame
-        :: Stream.SerialT IO (Array.Array Word8)
-        -> Stream.SerialT IO (Array.Array Word8)
+        :: Stream.Stream IO (Array.Array Word8)
+        -> Stream.Stream IO (Array.Array Word8)
     compressChunksFrame strm =
         if hasEndMark ff
-        then (fromStreamD . compressChunksD bf i . toStreamD) strm
+        then compressChunksD bf i strm
                  `Stream.append` Stream.fromPure endMarkArr
-        else (fromStreamD . compressChunksD bf i . toStreamD) strm
+        else compressChunksD bf i strm
 
 decompressWithCompress :: Int -> Int -> [Array.Array Word8] -> IO ()
 decompressWithCompress bufsize i lst = do
@@ -151,20 +153,23 @@ decompressWithCompress bufsize i lst = do
         headerList = magicLE ++ [flg, bd, headerChk]
         header = Stream.fromList headerList
     headerArr <- Stream.fold (Array.writeN (length headerList)) header
-    (bf, ff) <- Stream.parseD simpleFrameParserD header
+    x0 <- Stream.parseD simpleFrameParserD header
     let strm = Stream.fromList lst
-    withSystemTempFile "LZ4" $ \tmp tmpH -> do
-        compressChunksFrame bf ff i strm
-            & Stream.cons headerArr
-            & Handle.putChunks tmpH
-        hClose tmpH
-        lst1 <-
-            Stream.toList
-                $ Stream.bracket_ (openFile tmp ReadMode) hClose
-                $ \h ->
-                      Stream.unfold Handle.readChunksWithBufferOf (bufsize, h)
-                          & decompressWith_
-        lst1 `shouldBe` lst
+    case x0 of
+        Right (bf, ff) ->
+            withSystemTempFile "LZ4" $ \tmp tmpH -> do
+                compressChunksFrame bf ff i strm
+                    & Stream.cons headerArr
+                    & Handle.putChunks tmpH
+                hClose tmpH
+                lst1 <-
+                    Stream.fold Fold.toList
+                        $ Stream.bracketIO (openFile tmp ReadMode) hClose
+                        $ \h ->
+                              Stream.unfold Handle.chunkReaderWith (bufsize, h)
+                                  & decompressWith_
+                lst1 `shouldBe` lst
+        Left _ -> return ()
 
     where
 
@@ -172,33 +177,33 @@ decompressWithCompress bufsize i lst = do
     compressChunksFrame ::
           BlockConfig
        -> FrameConfig
        -> Int
-        -> Stream.SerialT IO (Array.Array Word8)
-        -> Stream.SerialT IO (Array.Array Word8)
+        -> Stream.Stream IO (Array.Array Word8)
+        -> Stream.Stream IO (Array.Array Word8)
     compressChunksFrame bf ff i_ strm =
         if hasEndMark ff
-        then (fromStreamD . compressChunksD bf i_ . toStreamD) strm
+        then compressChunksD bf i_ strm
                  `Stream.append` Stream.fromPure endMarkArr
-        else (fromStreamD . compressChunksD bf i_ . toStreamD) strm
+        else compressChunksD bf i_ strm
 
     decompressWith_ ::
-           Stream.SerialT IO (Array.Array Word8)
-        -> Stream.SerialT IO (Array.Array Word8)
+           Stream.Stream IO (Array.Array Word8)
+        -> Stream.Stream IO (Array.Array Word8)
     decompressWith_ =
-        fromStreamD . decompressChunksWithD simpleFrameParserD . toStreamD
+        decompressChunksWithD simpleFrameParserD
 
 resizeIdempotence :: BlockConfig -> Property
 resizeIdempotence conf =
     forAll ((,) <$> genAcceleration <*> genArrayW8List)
         $ \(acc, w8List) -> do
              let strm = compressChunks conf acc $ Stream.fromList w8List
-             f1 <- Stream.toList $ resizeChunks strm
-             f2 <- Stream.toList $ foldr ($) strm $ replicate acc resizeChunks
+             f1 <- Stream.fold Fold.toList $ resizeChunks strm
+             f2 <- Stream.fold Fold.toList $ foldr ($) strm $ replicate acc resizeChunks
              f1 `shouldBe` f2
 
     where
 
     resizeChunks =
-        fromStreamD . resizeChunksD conf defaultFrameConfig . toStreamD
+        resizeChunksD conf defaultFrameConfig
 
 main :: IO ()
 main = do
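The test changes follow one pattern throughout: Stream.toList becomes Stream.fold Fold.toList, bracket_ becomes bracketIO, and readChunksWithBufferOf becomes chunkReaderWith. For reference, the new file-reading idiom in isolation (same public modules as test/Main.hs above; readChunks is a hypothetical helper name):

```haskell
-- Read a file in bufsize-byte chunks with guaranteed handle cleanup,
-- using the streamly-0.9 names adopted by the tests above.
import Data.Word (Word8)
import System.IO (IOMode(..), openFile, hClose)
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.FileSystem.Handle as Handle

readChunks :: Int -> FilePath -> IO [Array.Array Word8]
readChunks bufsize path =
    Stream.fold Fold.toList
        $ Stream.bracketIO (openFile path ReadMode) hClose
        $ \h -> Stream.unfold Handle.chunkReaderWith (bufsize, h)
```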