Multiplex akka streams source into two copies



























I'm trying to cache a streaming source to disk while I'm also sending it out as an HttpResponse, i.e. I have a Source[ByteString,_] that I want to hand to HttpEntity, but I also want to run the same data into a FileIO.toPath sink.



                       |-> FileIO.toPath
Source[ByteString,_] ->|
                       |-> HttpEntity(contentType, Source[ByteString,_])


Broadcast seems to be the right tool for fan-out, but according to its description it writes to two sinks. FileIO.toPath is a sink, but HttpEntity expects a Source.

There's also Source.fromGraph, which looks like it would create a source from a graph containing a stage such as Broadcast, but I can't quite figure out how I would get the FileIO sink in there.
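For illustration, the wiring described above could be sketched roughly as follows. This is only a sketch under assumptions: `BroadcastSketch` and `teeToFile` are hypothetical names, and the file path is supplied by the caller. One Broadcast output is connected to the file sink inside the graph, and the other output is left open so that it becomes the outlet of the new Source.

```scala
import java.nio.file.Path

import akka.NotUsed
import akka.stream.SourceShape
import akka.stream.scaladsl.{Broadcast, FileIO, GraphDSL, Source}
import akka.util.ByteString

object BroadcastSketch {
  // Hypothetical helper: wraps `upstream` so that every chunk is written to
  // `path` and is also emitted by the returned Source.
  def teeToFile(upstream: Source[ByteString, _], path: Path): Source[ByteString, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Two outputs: one for the file, one for whoever consumes the Source.
      // With the default eagerCancel = false, the file branch keeps running
      // even if the other consumer cancels.
      val bcast = builder.add(Broadcast[ByteString](2))

      upstream ~> bcast.in
      bcast.out(0) ~> FileIO.toPath(path)

      // The unconnected output is exposed as the outlet of the new Source.
      SourceShape(bcast.out(1))
    })
}
```

The result is still a plain Source[ByteString, _], so it can be handed to HttpEntity, and the upstream is run only once when that Source is materialized.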










scala akka-stream akka-http






asked Nov 22 '18 at 4:07









Arne Claassen













  • Can you just have one source -> sink for FileIO.toPath and pass the source to HttpEntity directly?

    – tomcy
    Nov 22 '18 at 6:45











  • From my experiments, using the source twice runs it twice, i.e. I'm downloading the data twice rather than multiplexing a single download.

    – Arne Claassen
    Nov 22 '18 at 18:40











  • I don't think it's achievable, because when HttpEntity takes a Source as a parameter, it will eventually call run(), and at that point your source will be triggered a second time.

    – tomcy
    Nov 22 '18 at 22:48



















1 Answer














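You can use alsoTo, which attaches a Sink to the stream so that every element is sent to that Sink and also continues downstream: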



val originalSource: Source[ByteString, _] = ???
val cachedSource: Source[ByteString, _] = originalSource.alsoTo(FileIO.toPath(/*...*/))
val entity = HttpEntity(contentType, cachedSource)
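A slightly fuller sketch of the same idea, with the imports spelled out. The names `CachingEntity` and `cachingEntity` are hypothetical, and the content type and cache path are assumed to be supplied by the caller:

```scala
import java.nio.file.Path

import akka.http.scaladsl.model.{ContentType, HttpEntity, ResponseEntity}
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

object CachingEntity {
  // Hypothetical helper: builds a response entity whose body is `upstream`,
  // while each chunk is also written to `cachePath`.
  def cachingEntity(
      contentType: ContentType,
      upstream: Source[ByteString, _],
      cachePath: Path): ResponseEntity = {
    // alsoTo attaches the file sink as a side branch of the stream.
    val cachedSource: Source[ByteString, _] =
      upstream.alsoTo(FileIO.toPath(cachePath))

    // Nothing runs here: the source is only materialized when the entity
    // is streamed, so the upstream is consumed once.
    HttpEntity(contentType, cachedSource)
  }
}
```

Note that alsoTo discards the file sink's materialized value, so this sketch gives no signal for when the file write has finished.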





answered Nov 23 '18 at 2:34









Jeffrey Chung













  • I started going down that road, and it certainly answers my question. Unfortunately, it raised more questions: stackoverflow.com/questions/53437098/…

    – Arne Claassen
    Nov 23 '18 at 6:43


















