[email protected]
[Top] [All Lists]

[Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files

Subject: [Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files
From: Maciej Piechotka
Date: Sun, 03 Jan 2010 17:34:52 +0100
I have following problem: I'd like to operate on big files so I'd
prefere to operate on 'stream' instead of whole file at a time to avoid
keeping too much in memory. I need to calculate MD5 and compress file. 

I tried to use something like that but I'm afraid that I'd need to patch
zlib package as it results in deadlock:

> {-# LANGUAGE GADTs #-}
> import Codec.Compression.GZip
> import Control.Applicative
> import Control.Concurrent.CHP
> import qualified Control.Concurrent.CHP.Common as CHP
> import Control.Concurrent.CHP.Enroll
> import Control.Concurrent.CHP.Utils
> import Control.Monad.State.Strict
> import Data.Digest.Pure.MD5
> import Data.Maybe
> import qualified Data.ByteString.Char8 as BS
> import qualified Data.ByteString.Lazy.Char8 as LBS
> import qualified Data.ByteString.Lazy.Internal as LBS
> import System.Environment
> import System.IO
> import System.IO.Unsafe
> 
> 
> calculateMD5 :: (ReadableChannel r,
>                  Poisonable (r (Maybe BS.ByteString)),
>                  WriteableChannel w,
>                  Poisonable (w MD5Digest))
>              => r (Maybe BS.ByteString)
>              -> w MD5Digest
>              -> CHP ()
> calculateMD5 in_ out = evalStateT (forever loop) md5InitialContext
>                        `onPoisonRethrow` (poison in_ >> poison out)
>                        where loop = liftCHP (readChannel in_) >>= 
>                                     calc'
>                              calc' Nothing  = gets md5Finalize >>=
>                                               liftCHP .
>                                               writeChannel out >>
>                                               put md5InitialContext
>                              calc' (Just b) = modify (flip md5Update 
>                                                 $ LBS.fromChunks [b])

Calculate MD5 hash of input stream. Nothing indicates EOF.

> unsafeInterleaveCHP :: CHP a -> CHP a
> unsafeInterleaveCHP = fromJust <.> liftIO <=<
>                       unsafeInterleaveIO <.> embedCHP

Helper function. It is suppose to move the execution in time - just as
unsafeInterleaveIO. I belive that the main problem lives here.

Especially that Maybe.fromJust: Nothing is the error.

> chan2List :: (ReadableChannel r, Poisonable (r a))
>           => r a -> CHP [a]
> chan2List in_ = unsafeInterleaveCHP ((liftM2 (:) (readChannel in_)
>                                                  (chan2List in_))
>                                      `onPoisonTrap` return [])

Changes channel to lazy read list.

> chanMaybe2List :: (ReadableChannel r,
>                    Poisonable (r (Maybe a)))
>                => r (Maybe a)
>                -> CHP [[a]]
> chanMaybe2List in_ = splitByMaybe <$> chan2List
>                      where splitByMaybe [] = []
>                            splitByMaybe (Nothing:xs) =
>                              []:splitByMaybe xs
>                            splitByMaybe (Just v :[]) = [[v]]
>                            splitByMaybe (Just v :xs) =
>                              let (y:ys) = splitByMaybe xs
>                              in (v:y):ys

Reads lazyly from channel o list of list

> compressCHP :: (ReadableChannel r,
>                 Poisonable (r (Maybe BS.ByteString)),
>                 WriteableChannel w,
>                 Poisonable (w (Maybe BS.ByteString)))
>             => r (Maybe BS.ByteString)
>             -> w (Maybe BS.ByteString)
>             -> CHP ()
> compressCHP in_ out = toOut >>= mapM_ sendBS
>                       where in_' :: CHP [LBS.ByteString]
>                             in_' = fmap LBS.fromChunks <$> 
>                                    chanMaybe2List in_
>                             toOut :: CHP [LBS.ByteString]
>                             toOut = fmap compress <$> in_'
>                             sendBS :: LBS.ByteString -> CHP ()
>                             sendBS LBS.Empty       = writeChannel out
>                                                               Nothing
>                             sendBS (LBS.Chunk c r) = writeChannel out
>                                                              (Just c) 
>                                                      >> sendBS r

Compress process 

> readFromFile :: (ReadableChannel r,
>                  Poisonable (r String),
>                  WriteableChannel w,
>                  Poisonable (w (Maybe BS.ByteString)))
>              => r String
>              -> w (Maybe BS.ByteString)
>              -> CHP ()
> readFromFile file data_ =
>   forever (do path <- readChannel file
>               hnd <- liftIO $ openFile path ReadMode
>               let copy = liftIO (BS.hGet hnd LBS.defaultChunkSize) >>=
>                          writeChannel data_ . Just
>               copy `onPoisonRethrow` liftIO (hClose hnd)
>               writeChannel data_ Nothing
>               liftIO $ hClose hnd)
>   `onPoisonRethrow` (poison file >> poison data_)

Process reading from file

> writeToFile :: (ReadableChannel r,
>                 Poisonable (r String),
>                 ReadableChannel r',
>                 Poisonable (r' (Maybe BS.ByteString)))
>             => r String
>             -> r' (Maybe BS.ByteString)
>             -> CHP ()
> writeToFile file data_ =
>   forever (do path <- readChannel file
>               hnd <- liftIO $ openFile path WriteMode
>               let writeUntilNothing = readChannel data_ >>= 
>                                       writeUntilNothing'
>                   writeUntilNothing' Nothing  = return ()
>                   writeUntilNothing' (Just v) = liftIO (BS.hPutStr
>                                                             hnd v) >>
>                                                 writeUntilNothing
>               writeUntilNothing `onPoisonFinally` liftIO (hClose hnd))
>   `onPoisonRethrow` (poison file >> poison data_)

Process writing to file
 
> getFiles :: (WriteableChannel w, Poisonable (w String))
>          => w String -> CHP ()
> getFiles out = mapM_ (writeChannel out) ["test1", "test2"] >>
>                poison (out)

Sample files. Each contains "Test1\n"

> pipeline1 :: CHP ()
> pipeline1 = do md5sum <- oneToOneChannel' $ chanLabel "MD5"
>                runParallel_ [(getFiles ->|^
>                               ("File", readFromFile) ->|^
>                               ("Data", calculateMD5))
>                              (writer md5sum),
>                              forever $ readChannel (reader md5sum) >>=
>                                        liftIO . print]

First pipeline. Output:
fa029a7f2a3ca5a03fe682d3b77c7f0d
fa029a7f2a3ca5a03fe682d3b77c7f0d
< File."test1", Data.Just "Test1\n", Data.Nothing,
MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File."test2", Data.Just "Test1\n",
Data.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d >

> pipeline2 :: CHP ()
> pipeline2 = enrolling $ do
>   file <- oneToManyChannel' $ chanLabel "File"
>   fileMD5 <- oneToOneChannel' $ chanLabel "File MD5"
>   data_ <- oneToOneChannel' $ chanLabel "Data"
>   md5 <- oneToOneChannel' $ chanLabel "MD5"
>   md5BS <- oneToOneChannel' $ chanLabel "MD5 ByteString"
>   fileMD5' <- Enroll (reader file)
>   fileData <- Enroll (reader file)
>   liftCHP $ runParallel_ [getFiles (writer file),
>                           (forever $ readChannel fileMD5' >>=
>                                      writeChannel (writer fileMD5) . 
>                                      (++".md5"))
>                           `onPoisonRethrow`
>                           (poison fileMD5' >>
>                            poison (writer fileMD5)),
>                           readFromFile fileData (writer data_),
>                           calculateMD5 (reader data_) (writer md5),
>                           (forever $ do v <- readChannel (reader md5)
>                                         let v' = Just $ BS.pack $ show
v
>                                         writeChannel (writer md5BS) v'
>                                         writeChannel (writer md5BS) 
>                                                               Nothing)
>                           `onPoisonRethrow`
>                           (poison (writer md5BS) >>
>                            poison (reader md5)),
>                           writeToFile (reader fileMD5) (reader md5BS)]

Correct pipeline (testing EnrollingT):
< _b4, File MD5."test1.md5", Data.Just "Test1\n", Data.Nothing,
MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, _b4, MD5 ByteString.Just
"fa029a7f2a3ca5a03fe682d3b77c7f0d", Data.Just "Test1\n", Data.Nothing,
MD5 ByteString.Nothing, MD5.fa029a7f2a3ca5a03fe682d3b77c7f0d, File
MD5."test2.md5", MD5 ByteString.Just "fa029a7f2a3ca5a03fe682d3b77c7f0d",
MD5 ByteString.Nothing >
% cat test1.md5 
fa029a7f2a3ca5a03fe682d3b77c7f0d%


> pipeline3 :: CHP ()
> pipeline3 = enrolling $ do
>   file <- oneToManyChannel' $ chanLabel "File"
>   fileGZ <- oneToOneChannel' $ chanLabel "File GZ"
>   data_ <- oneToManyChannel' $ chanLabel "Data"
>   compressed <- oneToManyChannel' $ chanLabel "Data Compressed"
>   md5 <- oneToOneChannel' $ chanLabel "MD5"
>   md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed"
>   fileGZ' <- Enroll (reader file)
>   fileData <- Enroll (reader file)
>   dataMD5 <- Enroll (reader data_)
>   dataCompress <- Enroll (reader data_)
>   compressedFile <- Enroll (reader compressed)
>   compressedMD5 <- Enroll (reader compressed)
>   liftCHP $ runParallel_ [getFiles (writer file),
>                           (forever $ readChannel fileGZ' >>=
>                                      writeChannel (writer fileGZ) . 
>                                      (++".gz"))
>                           `onPoisonRethrow`
>                           (poison fileGZ' >> poison (writer fileGZ)),
>                           readFromFile fileData (writer data_),
>                           calculateMD5 dataMD5 (writer md5),
>                           compressCHP dataCompress
>                                       (writer compressed),
>                           writeToFile (reader fileGZ) compressedFile,
>                           calculateMD5 compressedMD5
>                                        (writer md5Compressed),
>                           forever $ readChannel dataMD5 >>=
>                                     liftIO . print >>
>                                     readChannel compressedMD5 >>= 
>                                     liftIO . print]

Problems:

(CHP) Thread terminated with: thread blocked indefinitely in an STM
transaction
< _b3, _b4, File GZ."test1.gz" >

> onPoisonFinally :: CHP a -> CHP () -> CHP a
> onPoisonFinally m b = (m `onPoisonRethrow` b) <* b
>

Utility function (used for closing handles)

> (<.>) :: Functor f => (b -> c) -> (a -> f b) -> a -> f c
> f <.> g = fmap f . g

<.> is for <$> as . to $.


> instance MonadCHP m => MonadCHP (StateT s m) where
>   liftCHP = lift . liftCHP

Missing instance for strict monad

> (->|^) :: Show b
>        => (Chanout b -> CHP ()) -> (String, Chanin b -> c -> CHP ())
>        -> (c -> CHP ())
> (->|^) p (l, q) x = do c <- oneToOneChannel' $ chanLabel l
>                        runParallel_ [p (writer c), q (reader c) x]

'Missing' helper function


> data EnrollingT a where
>    Lift :: CHP a -> EnrollingT a
>    Enroll :: (Enrollable b z) => b z -> EnrollingT (Enrolled b z)
> 
> enrolling :: EnrollingT a -> CHP a
> enrolling (Lift v)   = v
> enrolling (Enroll b) = enroll b return
>
> instance Monad EnrollingT where
>    (Lift m)   >>= f = Lift $ m >>= enrolling . f
>    (Enroll b) >>= f = Lift $ enroll b (enrolling . f)
>    return           = Lift . return
> instance MonadIO EnrollingT where
>    liftIO           = Lift . liftIO
> instance MonadCHP EnrollingT where
>    liftCHP          = Lift

Helper monad for enrolling (I know T should stand for transforming but
then I realize problems).

Thanks in advance 


_______________________________________________
Haskell-Cafe mailing list
[email protected]
http://www.haskell.org/mailman/listinfo/haskell-cafe

<Prev in Thread] Current Thread [Next in Thread>