Add support for streaming NAR output (#1)
Before this change the NAR contents would be buffered in memory before supplying them to the client. Now each chunk is instantly flushed to the client when it's available. This requires reworking things so that we use a `StreamingBody`, which in turn requires passing a Haskell callback to the C code to invoke.
This commit is contained in:
committed by
GitHub
parent
319e70637e
commit
0242d8bebb
@@ -159,20 +159,40 @@ void signString
|
|||||||
copyString(signature, output);
|
copyString(signature, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dumpPath(char const * const hashPart, struct string * const output) {
|
bool dumpPath
|
||||||
|
( char const * const hashPart
|
||||||
|
, bool (* const callback)(struct string const * const)
|
||||||
|
)
|
||||||
|
{
|
||||||
ref<Store> store = getStore();
|
ref<Store> store = getStore();
|
||||||
|
|
||||||
std::optional<StorePath> storePath =
|
std::optional<StorePath> storePath =
|
||||||
store->queryPathFromHashPart(hashPart);
|
store->queryPathFromHashPart(hashPart);
|
||||||
|
|
||||||
if (storePath.has_value()) {
|
if (storePath.has_value()) {
|
||||||
StringSink sink;
|
LambdaSink sink([=](std::string_view v) {
|
||||||
|
struct string s = { .data = v.data(), .size = v.size() };
|
||||||
|
|
||||||
store->narFromPath(storePath.value(), sink);
|
bool succeeded = (*callback)(&s);
|
||||||
|
|
||||||
copyString(sink.s, output);
|
if (!succeeded) {
|
||||||
|
// We don't really care about the error message. The only
|
||||||
|
// reason for throwing an exception here is that this is the
|
||||||
|
// only way that a Nix sink can exit early.
|
||||||
|
throw std::runtime_error("");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
store->narFromPath(storePath.value(), sink);
|
||||||
|
} catch (const std::runtime_error & e) {
|
||||||
|
// Intentionally do nothing. We're only using the exception as a
|
||||||
|
// short-circuiting mechanism.
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
*output = emptyString;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
28
src/Main.hs
28
src/Main.hs
@@ -12,10 +12,11 @@ import Data.CharSet.ByteSet (ByteSet(..))
|
|||||||
import Data.Function ((&))
|
import Data.Function ((&))
|
||||||
import Network.Socket (SockAddr(..))
|
import Network.Socket (SockAddr(..))
|
||||||
import Network.Wai (Application)
|
import Network.Wai (Application)
|
||||||
import Nix (PathInfo(..))
|
import Nix (NoSuchPath(..), PathInfo(..))
|
||||||
import Numeric.Natural (Natural)
|
import Numeric.Natural (Natural)
|
||||||
import Options (Options(..), Socket(..), SSL(..), Verbosity(..))
|
import Options (Options(..), Socket(..), SSL(..), Verbosity(..))
|
||||||
|
|
||||||
|
import qualified Control.Exception as Exception
|
||||||
import qualified Control.Monad as Monad
|
import qualified Control.Monad as Monad
|
||||||
import qualified Control.Monad.Except as Except
|
import qualified Control.Monad.Except as Except
|
||||||
import qualified Data.ByteString as ByteString
|
import qualified Data.ByteString as ByteString
|
||||||
@@ -112,8 +113,8 @@ makeApplication ApplicationOptions{..} request respond = do
|
|||||||
maybeStorePath <- liftIO (Nix.queryPathFromHashPart hashPart)
|
maybeStorePath <- liftIO (Nix.queryPathFromHashPart hashPart)
|
||||||
|
|
||||||
storePath <- case maybeStorePath of
|
storePath <- case maybeStorePath of
|
||||||
Nothing -> noSuchPath
|
Left NoSuchPath -> noSuchPath
|
||||||
Just storePath -> return storePath
|
Right storePath -> return storePath
|
||||||
|
|
||||||
pathInfo@PathInfo{..} <- liftIO (Nix.queryPathInfo storePath)
|
pathInfo@PathInfo{..} <- liftIO (Nix.queryPathInfo storePath)
|
||||||
|
|
||||||
@@ -230,8 +231,8 @@ makeApplication ApplicationOptions{..} request respond = do
|
|||||||
maybeStorePath <- liftIO (Nix.queryPathFromHashPart hashPart)
|
maybeStorePath <- liftIO (Nix.queryPathFromHashPart hashPart)
|
||||||
|
|
||||||
storePath <- case maybeStorePath of
|
storePath <- case maybeStorePath of
|
||||||
Nothing -> noSuchPath
|
Left NoSuchPath-> noSuchPath
|
||||||
Just storePath -> return storePath
|
Right storePath -> return storePath
|
||||||
|
|
||||||
PathInfo{ narHash } <- liftIO (Nix.queryPathInfo storePath)
|
PathInfo{ narHash } <- liftIO (Nix.queryPathInfo storePath)
|
||||||
|
|
||||||
@@ -249,18 +250,21 @@ makeApplication ApplicationOptions{..} request respond = do
|
|||||||
|
|
||||||
done response
|
done response
|
||||||
|
|
||||||
maybeBytes <- liftIO (Nix.dumpPath hashPart)
|
let streamingBody write flush = do
|
||||||
|
result <- Nix.dumpPath hashPart callback
|
||||||
|
|
||||||
bytes <- case maybeBytes of
|
case result of
|
||||||
Nothing -> noSuchPath
|
Left exception -> Exception.throwIO exception
|
||||||
Just bytes -> return bytes
|
Right x -> return x
|
||||||
|
where
|
||||||
let lazyBytes = ByteString.Lazy.fromStrict bytes
|
callback builder = do
|
||||||
|
() <- write builder
|
||||||
|
flush
|
||||||
|
|
||||||
let headers = [ ("Content-Type", "text/plain") ]
|
let headers = [ ("Content-Type", "text/plain") ]
|
||||||
|
|
||||||
let response =
|
let response =
|
||||||
Wai.responseLBS Types.status200 headers lazyBytes
|
Wai.responseStream Types.status200 headers streamingBody
|
||||||
|
|
||||||
done response
|
done response
|
||||||
|
|
||||||
|
|||||||
61
src/Nix.hsc
61
src/Nix.hsc
@@ -1,4 +1,6 @@
|
|||||||
{-# LANGUAGE BlockArguments #-}
|
{-# LANGUAGE BlockArguments #-}
|
||||||
|
{-# LANGUAGE DeriveAnyClass #-}
|
||||||
|
{-# LANGUAGE DerivingStrategies #-}
|
||||||
{-# LANGUAGE DuplicateRecordFields #-}
|
{-# LANGUAGE DuplicateRecordFields #-}
|
||||||
{-# LANGUAGE ForeignFunctionInterface #-}
|
{-# LANGUAGE ForeignFunctionInterface #-}
|
||||||
{-# LANGUAGE MultiWayIf #-}
|
{-# LANGUAGE MultiWayIf #-}
|
||||||
@@ -9,20 +11,23 @@
|
|||||||
module Nix where
|
module Nix where
|
||||||
|
|
||||||
import Control.Applicative (empty)
|
import Control.Applicative (empty)
|
||||||
|
import Control.Exception (Exception, SomeException)
|
||||||
import Control.Monad.Managed (Managed)
|
import Control.Monad.Managed (Managed)
|
||||||
import Data.ByteString (ByteString)
|
import Data.ByteString (ByteString)
|
||||||
import Data.ByteString.Builder (Builder)
|
import Data.ByteString.Builder (Builder)
|
||||||
import Data.Vector (Vector)
|
import Data.Vector (Vector)
|
||||||
import Data.Word (Word64)
|
import Data.Word (Word64)
|
||||||
import Foreign (Ptr, Storable(..))
|
import Foreign (FunPtr, Ptr, Storable(..))
|
||||||
import Foreign.C (CChar, CLong, CSize, CString)
|
import Foreign.C (CChar, CLong, CSize, CString)
|
||||||
|
|
||||||
import qualified Control.Exception as Exception
|
import qualified Control.Exception as Exception
|
||||||
|
import qualified Control.Monad as Monad
|
||||||
import qualified Control.Monad.Managed as Managed
|
import qualified Control.Monad.Managed as Managed
|
||||||
import qualified Data.ByteString as ByteString
|
import qualified Data.ByteString as ByteString
|
||||||
import qualified Data.ByteString.Base16 as Base16
|
import qualified Data.ByteString.Base16 as Base16
|
||||||
import qualified Data.ByteString.Base32 as Base32
|
import qualified Data.ByteString.Base32 as Base32
|
||||||
import qualified Data.ByteString.Builder as Builder
|
import qualified Data.ByteString.Builder as Builder
|
||||||
|
import qualified Data.IORef as IORef
|
||||||
import qualified Data.Vector as Vector
|
import qualified Data.Vector as Vector
|
||||||
import qualified Data.Vector.Storable as Vector.Storable
|
import qualified Data.Vector.Storable as Vector.Storable
|
||||||
import qualified Foreign
|
import qualified Foreign
|
||||||
@@ -153,7 +158,7 @@ data PathInfo = PathInfo
|
|||||||
, narSize :: Word64
|
, narSize :: Word64
|
||||||
, references :: Vector ByteString
|
, references :: Vector ByteString
|
||||||
, sigs :: Vector ByteString
|
, sigs :: Vector ByteString
|
||||||
} deriving (Show)
|
} deriving stock (Show)
|
||||||
|
|
||||||
fromCPathInfo :: CPathInfo -> IO PathInfo
|
fromCPathInfo :: CPathInfo -> IO PathInfo
|
||||||
fromCPathInfo CPathInfo{ deriver, narHash, narSize, references, sigs } = do
|
fromCPathInfo CPathInfo{ deriver, narHash, narSize, references, sigs } = do
|
||||||
@@ -187,10 +192,14 @@ getStoreDir =
|
|||||||
string_ <- peek output
|
string_ <- peek output
|
||||||
fromString_ string_
|
fromString_ string_
|
||||||
|
|
||||||
|
data NoSuchPath = NoSuchPath
|
||||||
|
deriving anyclass (Exception)
|
||||||
|
deriving stock (Show)
|
||||||
|
|
||||||
foreign import ccall "queryPathFromHashPart" queryPathFromHashPart_
|
foreign import ccall "queryPathFromHashPart" queryPathFromHashPart_
|
||||||
:: CString -> Ptr String_ -> IO ()
|
:: CString -> Ptr String_ -> IO ()
|
||||||
|
|
||||||
queryPathFromHashPart :: ByteString -> IO (Maybe ByteString)
|
queryPathFromHashPart :: ByteString -> IO (Either NoSuchPath ByteString)
|
||||||
queryPathFromHashPart hashPart = do
|
queryPathFromHashPart hashPart = do
|
||||||
ByteString.useAsCString hashPart \cHashPart -> do
|
ByteString.useAsCString hashPart \cHashPart -> do
|
||||||
Foreign.alloca \output -> do
|
Foreign.alloca \output -> do
|
||||||
@@ -199,8 +208,8 @@ queryPathFromHashPart hashPart = do
|
|||||||
Exception.bracket_ open close do
|
Exception.bracket_ open close do
|
||||||
string_@String_{ data_} <- peek output
|
string_@String_{ data_} <- peek output
|
||||||
if data_ == Foreign.nullPtr
|
if data_ == Foreign.nullPtr
|
||||||
then return Nothing
|
then return (Left NoSuchPath)
|
||||||
else fmap Just (fromString_ string_)
|
else fmap Right (fromString_ string_)
|
||||||
|
|
||||||
foreign import ccall "queryPathInfo" queryPathInfo_
|
foreign import ccall "queryPathInfo" queryPathInfo_
|
||||||
:: CString -> Ptr CPathInfo -> IO ()
|
:: CString -> Ptr CPathInfo -> IO ()
|
||||||
@@ -262,19 +271,36 @@ signString secretKey fingerprint =
|
|||||||
fromString_ string_
|
fromString_ string_
|
||||||
|
|
||||||
foreign import ccall "dumpPath" dumpPath_
|
foreign import ccall "dumpPath" dumpPath_
|
||||||
:: CString -> Ptr String_ -> IO ()
|
:: CString -> FunPtr (Ptr String_ -> IO Bool) -> IO Bool
|
||||||
|
|
||||||
|
dumpPath :: ByteString -> (Builder -> IO ()) -> IO (Either SomeException ())
|
||||||
|
dumpPath hashPart builderCallback = do
|
||||||
|
result <- IORef.newIORef (Right ())
|
||||||
|
|
||||||
|
let cCallback :: Ptr String_ -> IO Bool
|
||||||
|
cCallback pointer = do
|
||||||
|
string_ <- Foreign.peek pointer
|
||||||
|
|
||||||
|
byteString <- fromString_ string_
|
||||||
|
|
||||||
|
let handler :: SomeException -> IO Bool
|
||||||
|
handler exception = do
|
||||||
|
IORef.writeIORef result (Left exception)
|
||||||
|
return False
|
||||||
|
|
||||||
|
Exception.handle handler do
|
||||||
|
builderCallback (Builder.byteString byteString)
|
||||||
|
return True
|
||||||
|
|
||||||
|
wrappedCCallback <- wrapCallback cCallback
|
||||||
|
|
||||||
dumpPath :: ByteString -> IO (Maybe ByteString)
|
|
||||||
dumpPath hashPart = do
|
|
||||||
ByteString.useAsCString hashPart \cHashPart -> do
|
ByteString.useAsCString hashPart \cHashPart -> do
|
||||||
Foreign.alloca \output -> do
|
success <- dumpPath_ cHashPart wrappedCCallback
|
||||||
let open = dumpPath_ cHashPart output
|
|
||||||
let close = freeString output
|
Monad.when (not success) do
|
||||||
Exception.bracket_ open close do
|
IORef.writeIORef result (Left (Exception.toException NoSuchPath))
|
||||||
string_@String_{ data_} <- peek output
|
|
||||||
if data_ == Foreign.nullPtr
|
IORef.readIORef result
|
||||||
then return Nothing
|
|
||||||
else fmap Just (fromString_ string_)
|
|
||||||
|
|
||||||
foreign import ccall "dumpLog" dumpLog_
|
foreign import ccall "dumpLog" dumpLog_
|
||||||
:: CString -> Ptr String_ -> IO ()
|
:: CString -> Ptr String_ -> IO ()
|
||||||
@@ -290,3 +316,6 @@ dumpLog baseName = do
|
|||||||
if data_ == Foreign.nullPtr
|
if data_ == Foreign.nullPtr
|
||||||
then return Nothing
|
then return Nothing
|
||||||
else fmap Just (fromString_ string_)
|
else fmap Just (fromString_ string_)
|
||||||
|
|
||||||
|
foreign import ccall "wrapper" wrapCallback
|
||||||
|
:: (Ptr String_ -> IO Bool) -> IO (FunPtr (Ptr String_ -> IO Bool))
|
||||||
|
|||||||
Reference in New Issue
Block a user