Close a Coroutine channel when consumer's Job is cancelled
I have a simple producer and a consumer that use Coroutine channels. Here is a dumbed down version:
    class Producer {
        suspend fun start(): ReceiveChannel<String> {
            val channel = Channel<String>(Channel.UNLIMITED)
            // Asynchronous channel.send(it) from an object callback
            channel.invokeOnClose {
                // Channel is closed...
            }
            return channel
        }
    }

    class Consumer : CoroutineScope {
        private val producer = Producer()
        private val job = Job()
        override val coroutineContext = job + Dispatchers.Default

        fun start() {
            launch {
                val channel = producer.start()
                for (currentValue in channel) {
                    // use currentValue
                }
            }
        }

        fun stop() {
            job.cancel()
        }
    }
The Producer creates a channel, then fills it with values from an async job. The Consumer iterates over it and uses the values.
My expectation was that when I call job.cancel() from the consumer, the channel iterator would throw and the channel would be closed. However, the invokeOnClose callback is never called.
I could maintain a reference to the channel in the Consumer and call channel.close(). I want to know if there is a more clever solution to this. Maybe another way to iterate over the channel's values? Thanks!
Edit
It looks like using
    launch {
        val channel = producer.start()
        channel.consumeEach { currentValue ->
            // use currentValue
        }
    }
would do the trick. However, consumeEach() is marked as Obsolete.
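A minimal sketch of one alternative to consumeEach(), assuming the only goal is to have the channel cancelled whenever the consuming coroutine exits: keep the for loop and cancel the channel in a finally block. The function name closedOnCancel and the two CompletableDeferred signals are illustrative and not part of the original code; invokeOnClose is an experimental API in kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: reports whether the invokeOnClose handler ran
// after the consuming coroutine was cancelled.
fun closedOnCancel(): Boolean = runBlocking {
    val closed = CompletableDeferred<Unit>()
    val received = CompletableDeferred<String>()
    val channel = Channel<String>(Channel.UNLIMITED)

    // The handler is invoked when the channel is closed or its receiving side is cancelled.
    channel.invokeOnClose { closed.complete(Unit) }

    val consumer = launch {
        try {
            for (value in channel) {
                received.complete(value) // stands in for "use currentValue"
            }
        } finally {
            // Runs when the loop ends normally and when the coroutine is cancelled.
            channel.cancel()
        }
    }

    channel.send("A")
    received.await()          // make sure the consumer is actually inside the loop
    consumer.cancelAndJoin()  // plays the role of Consumer.stop()
    closed.isCompleted
}
```

Calling closedOnCancel() should return true, since cancelling the consumer makes the suspended iteration throw and the finally block then cancels the channel.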
kotlin kotlinx.coroutines
edited Nov 22 '18 at 6:51
asked Nov 21 '18 at 14:27
Jonas
1 Answer
You expect that job.cancel() will propagate to your producer, but Producer is actually not related to anything. Marking a function as suspend doesn't make it a coroutine.
Here's one way to fix this with structured concurrency:
    class Producer : CoroutineScope {
        override val coroutineContext: CoroutineContext
            get() = Job() + Dispatchers.Default

        suspend fun start() = produce<String> {
            channel.send("A")
            channel.invokeOnClose {
                println("Closed")
            }
        }
    }
Now your Producer is aware of CoroutineScope.
And since we're using produce, you don't need to initialize your channel, as you did before.
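One detail of the snippet above is worth illustrating: a property getter written as get() = Job() + Dispatchers.Default evaluates Job() on every access, so each read of coroutineContext yields a different Job. The sketch below contrasts that with a Job stored in a field; the class names PerAccessScope and SharedJobScope and the helper sameJob are hypothetical and exist only for this comparison.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job

// Mirrors the getter from the snippet above: a new Job is built on each access.
class PerAccessScope : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Job() + Dispatchers.Default
}

// Keeps a single Job, so cancelling it reaches every coroutine launched in the scope.
class SharedJobScope : CoroutineScope {
    private val job = Job()
    override val coroutineContext: CoroutineContext = job + Dispatchers.Default
}

// True when two consecutive reads of coroutineContext return the same Job instance.
fun sameJob(scope: CoroutineScope): Boolean =
    scope.coroutineContext[Job] === scope.coroutineContext[Job]
```

With these definitions, sameJob(PerAccessScope()) is false and sameJob(SharedJobScope()) is true, which is why a scope meant to be cancelled as a whole usually holds its Job in a field.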
answered Nov 21 '18 at 23:02
Alexey Soshin
With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() calls are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
– Jonas Nov 22 '18 at 6:51
Do you want to create a new Job on each access of coroutineContext?
– Alexey Romanov Nov 22 '18 at 7:10
@Jonas The channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over an incoming channel or a callback listener.
– Alexey Soshin Nov 22 '18 at 10:01
channel.send() is indeed called from a callback listener of the Android Fused Location Provider. But as soon as the code returns from produce(), the channel is closed and the callback never has a chance to send values.
– Jonas Nov 23 '18 at 17:01
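The situation in the last comment (values arrive from a callback, but the channel closes once the produce block returns) can be sketched as follows. This is only one possible approach, under stated assumptions: LocationSource is a hypothetical stand-in for the callback-based object the asker does not own, locations and demo are illustrative names, and awaitClose and trySend require a reasonably recent kotlinx.coroutines version. The idea is to suspend inside produce with awaitClose so the block does not return, and to launch produce in the consumer's scope so that cancelling the consumer also cancels the producer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.produce

// Hypothetical callback-based source; not a real Android or library API.
class LocationSource {
    private var listener: ((String) -> Unit)? = null
    val hasListener: Boolean get() = listener != null
    fun register(l: (String) -> Unit) { listener = l }
    fun unregister() { listener = null }
    fun emit(value: String) { listener?.invoke(value) }
}

// produce is started in the caller's scope, so it becomes a child of the caller's Job.
fun CoroutineScope.locations(source: LocationSource): ReceiveChannel<String> =
    produce<String>(capacity = Channel.UNLIMITED) {
        // Forward each callback value into the channel without suspending.
        source.register { value -> trySend(value) }
        // Keeps the block alive until the channel is closed or cancelled, then cleans up.
        awaitClose { source.unregister() }
    }

// Illustrative driver: returns the received values and whether the listener was removed.
fun demo(): Pair<List<String>, Boolean> = runBlocking {
    val source = LocationSource()
    val received = mutableListOf<String>()
    val gotFirst = CompletableDeferred<Unit>()

    val consumer = launch {
        for (value in locations(source)) {
            received += value
            gotFirst.complete(Unit)
        }
    }

    while (!source.hasListener) yield() // wait until the producer has registered
    source.emit("A")
    gotFirst.await()
    consumer.cancelAndJoin()            // plays the role of Consumer.stop()
    received.toList() to !source.hasListener
}
```

In this sketch, cancelling the consumer cancels the produce coroutine as its child, and the awaitClose block is the place to unregister the listener, much like the invokeOnClose handler in the question.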