Close a Coroutine channel when consumer's Job is cancelled












2














I have a simple producer and a consumer that use Coroutine channels. Here is a dumbed down version:



class Producer {

suspend fun start(): ReceiveChannel<String> {

val channel = Channel<String>(Channel.UNLIMITED)

// Asynchronous channel.send(it) from an object callback

channel.invokeOnClose {
// Channel is closed...
}

return channel
}

}

class Consumer : CoroutineScope {

private val producer = Producer()

private val job = Job()
override val coroutineContext = job + Dispatchers.Default

fun start() {
launch {
val channel = producer.start()

for (currentValue in channel) {
// use currentValue
}
}
}

fun stop() {
job.cancel()
}

}


The Producer create a channel, then fills it with values from an async job. The Consumer iterates over it and uses the values.



My expectation was that when I call job.cancel() from the consumer, the channel iterator would throw and the channel would be closed. The invokeOnClose callback is never called.



I could maintain a reference to the channel in the Consumer and do channel.close(). I want to know if there is a more clever solution to this. Maybe another way to iterate over the channel's values? Thanks?



Edit



It looks like using



launch {
val channel = producer.start()

channel.consumeEach { currentValue ->
// use currentValue
}
}


Would do the trick. However consumeEach() is marked as Obsolete.










share|improve this question





























    2














    I have a simple producer and a consumer that use Coroutine channels. Here is a dumbed down version:



    class Producer {

    suspend fun start(): ReceiveChannel<String> {

    val channel = Channel<String>(Channel.UNLIMITED)

    // Asynchronous channel.send(it) from an object callback

    channel.invokeOnClose {
    // Channel is closed...
    }

    return channel
    }

    }

    class Consumer : CoroutineScope {

    private val producer = Producer()

    private val job = Job()
    override val coroutineContext = job + Dispatchers.Default

    fun start() {
    launch {
    val channel = producer.start()

    for (currentValue in channel) {
    // use currentValue
    }
    }
    }

    fun stop() {
    job.cancel()
    }

    }


    The Producer create a channel, then fills it with values from an async job. The Consumer iterates over it and uses the values.



    My expectation was that when I call job.cancel() from the consumer, the channel iterator would throw and the channel would be closed. The invokeOnClose callback is never called.



    I could maintain a reference to the channel in the Consumer and do channel.close(). I want to know if there is a more clever solution to this. Maybe another way to iterate over the channel's values? Thanks?



    Edit



    It looks like using



    launch {
    val channel = producer.start()

    channel.consumeEach { currentValue ->
    // use currentValue
    }
    }


    Would do the trick. However consumeEach() is marked as Obsolete.










    share|improve this question



























      2












      2








      2







      I have a simple producer and a consumer that use Coroutine channels. Here is a dumbed down version:



      class Producer {

      suspend fun start(): ReceiveChannel<String> {

      val channel = Channel<String>(Channel.UNLIMITED)

      // Asynchronous channel.send(it) from an object callback

      channel.invokeOnClose {
      // Channel is closed...
      }

      return channel
      }

      }

      class Consumer : CoroutineScope {

      private val producer = Producer()

      private val job = Job()
      override val coroutineContext = job + Dispatchers.Default

      fun start() {
      launch {
      val channel = producer.start()

      for (currentValue in channel) {
      // use currentValue
      }
      }
      }

      fun stop() {
      job.cancel()
      }

      }


      The Producer create a channel, then fills it with values from an async job. The Consumer iterates over it and uses the values.



      My expectation was that when I call job.cancel() from the consumer, the channel iterator would throw and the channel would be closed. The invokeOnClose callback is never called.



      I could maintain a reference to the channel in the Consumer and do channel.close(). I want to know if there is a more clever solution to this. Maybe another way to iterate over the channel's values? Thanks?



      Edit



      It looks like using



      launch {
      val channel = producer.start()

      channel.consumeEach { currentValue ->
      // use currentValue
      }
      }


      Would do the trick. However consumeEach() is marked as Obsolete.










      share|improve this question















      I have a simple producer and a consumer that use Coroutine channels. Here is a dumbed down version:



      class Producer {

      suspend fun start(): ReceiveChannel<String> {

      val channel = Channel<String>(Channel.UNLIMITED)

      // Asynchronous channel.send(it) from an object callback

      channel.invokeOnClose {
      // Channel is closed...
      }

      return channel
      }

      }

      class Consumer : CoroutineScope {

      private val producer = Producer()

      private val job = Job()
      override val coroutineContext = job + Dispatchers.Default

      fun start() {
      launch {
      val channel = producer.start()

      for (currentValue in channel) {
      // use currentValue
      }
      }
      }

      fun stop() {
      job.cancel()
      }

      }


      The Producer create a channel, then fills it with values from an async job. The Consumer iterates over it and uses the values.



      My expectation was that when I call job.cancel() from the consumer, the channel iterator would throw and the channel would be closed. The invokeOnClose callback is never called.



      I could maintain a reference to the channel in the Consumer and do channel.close(). I want to know if there is a more clever solution to this. Maybe another way to iterate over the channel's values? Thanks?



      Edit



      It looks like using



      launch {
      val channel = producer.start()

      channel.consumeEach { currentValue ->
      // use currentValue
      }
      }


      Would do the trick. However consumeEach() is marked as Obsolete.







      kotlin kotlinx.coroutines






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 22 '18 at 6:51







      Jonas

















      asked Nov 21 '18 at 14:27









      JonasJonas

      2,05722046




      2,05722046
























          1 Answer
          1






          active

          oldest

          votes


















          0














          You expect that job.cancel() will propagate to your producer, but Producer is actually not related to anything. Marking function as suspend doesn't make it a coroutine.



          Here's one way to fix this with structured concurrency:



          class Producer: CoroutineScope {
          override val coroutineContext: CoroutineContext
          get() = Job() + Dispatchers.Default

          suspend fun start() = produce<String> {
          channel.send("A")

          channel.invokeOnClose {
          println("Closed")
          }
          }
          }


          Now your Producer is aware of CoroutineScope.



          And since we're using produce, you don't need to initialize your channel, as you did before.






          share|improve this answer





















          • With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
            – Jonas
            Nov 22 '18 at 6:51












          • Do you want to create a new Job on each access of coroutineContext?
            – Alexey Romanov
            Nov 22 '18 at 7:10










          • @Jonas Channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over incoming channel or as a callback listener.
            – Alexey Soshin
            Nov 22 '18 at 10:01










          • channel.send() is indeed called from a callback listener of Android Fused Location Provider. But right when the code returns from produce(), the channel is closed and the callback never has a chance to send values.
            – Jonas
            Nov 23 '18 at 17:01











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53414253%2fclose-a-coroutine-channel-when-consumers-job-is-cancelled%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0














          You expect that job.cancel() will propagate to your producer, but Producer is actually not related to anything. Marking function as suspend doesn't make it a coroutine.



          Here's one way to fix this with structured concurrency:



          class Producer: CoroutineScope {
          override val coroutineContext: CoroutineContext
          get() = Job() + Dispatchers.Default

          suspend fun start() = produce<String> {
          channel.send("A")

          channel.invokeOnClose {
          println("Closed")
          }
          }
          }


          Now your Producer is aware of CoroutineScope.



          And since we're using produce, you don't need to initialize your channel, as you did before.






          share|improve this answer





















          • With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
            – Jonas
            Nov 22 '18 at 6:51












          • Do you want to create a new Job on each access of coroutineContext?
            – Alexey Romanov
            Nov 22 '18 at 7:10










          • @Jonas Channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over incoming channel or as a callback listener.
            – Alexey Soshin
            Nov 22 '18 at 10:01










          • channel.send() is indeed called from a callback listener of Android Fused Location Provider. But right when the code returns from produce(), the channel is closed and the callback never has a chance to send values.
            – Jonas
            Nov 23 '18 at 17:01
















          0














          You expect that job.cancel() will propagate to your producer, but Producer is actually not related to anything. Marking function as suspend doesn't make it a coroutine.



          Here's one way to fix this with structured concurrency:



          class Producer: CoroutineScope {
          override val coroutineContext: CoroutineContext
          get() = Job() + Dispatchers.Default

          suspend fun start() = produce<String> {
          channel.send("A")

          channel.invokeOnClose {
          println("Closed")
          }
          }
          }


          Now your Producer is aware of CoroutineScope.



          And since we're using produce, you don't need to initialize your channel, as you did before.






          share|improve this answer





















          • With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
            – Jonas
            Nov 22 '18 at 6:51












          • Do you want to create a new Job on each access of coroutineContext?
            – Alexey Romanov
            Nov 22 '18 at 7:10










          • @Jonas Channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over incoming channel or as a callback listener.
            – Alexey Soshin
            Nov 22 '18 at 10:01










          • channel.send() is indeed called from a callback listener of Android Fused Location Provider. But right when the code returns from produce(), the channel is closed and the callback never has a chance to send values.
            – Jonas
            Nov 23 '18 at 17:01














          0












          0








          0






          You expect that job.cancel() will propagate to your producer, but Producer is actually not related to anything. Marking function as suspend doesn't make it a coroutine.



          Here's one way to fix this with structured concurrency:



          class Producer: CoroutineScope {
          override val coroutineContext: CoroutineContext
          get() = Job() + Dispatchers.Default

          suspend fun start() = produce<String> {
          channel.send("A")

          channel.invokeOnClose {
          println("Closed")
          }
          }
          }


          Now your Producer is aware of CoroutineScope.



          And since we're using produce, you don't need to initialize your channel, as you did before.






          share|improve this answer












          You expect that job.cancel() will propagate to your producer, but Producer is actually not related to anything. Marking function as suspend doesn't make it a coroutine.



          Here's one way to fix this with structured concurrency:



          class Producer: CoroutineScope {
          override val coroutineContext: CoroutineContext
          get() = Job() + Dispatchers.Default

          suspend fun start() = produce<String> {
          channel.send("A")

          channel.invokeOnClose {
          println("Closed")
          }
          }
          }


          Now your Producer is aware of CoroutineScope.



          And since we're using produce, you don't need to initialize your channel, as you did before.







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 21 '18 at 23:02









          Alexey SoshinAlexey Soshin

          6,6131512




          6,6131512












          • With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
            – Jonas
            Nov 22 '18 at 6:51












          • Do you want to create a new Job on each access of coroutineContext?
            – Alexey Romanov
            Nov 22 '18 at 7:10










          • @Jonas Channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over incoming channel or as a callback listener.
            – Alexey Soshin
            Nov 22 '18 at 10:01










          • channel.send() is indeed called from a callback listener of Android Fused Location Provider. But right when the code returns from produce(), the channel is closed and the callback never has a chance to send values.
            – Jonas
            Nov 23 '18 at 17:01


















          • With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
            – Jonas
            Nov 22 '18 at 6:51












          • Do you want to create a new Job on each access of coroutineContext?
            – Alexey Romanov
            Nov 22 '18 at 7:10










          • @Jonas Channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over incoming channel or as a callback listener.
            – Alexey Soshin
            Nov 22 '18 at 10:01










          • channel.send() is indeed called from a callback listener of Android Fused Location Provider. But right when the code returns from produce(), the channel is closed and the callback never has a chance to send values.
            – Jonas
            Nov 23 '18 at 17:01
















          With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
          – Jonas
          Nov 22 '18 at 6:51






          With this method, the channel is closed as soon as I leave produce(). I want to keep it alive because the channel.send() are asynchronous. They come from a callback from an object that I don't own. I updated my sample to make it a bit clearer.
          – Jonas
          Nov 22 '18 at 6:51














          Do you want to create a new Job on each access of coroutineContext?
          – Alexey Romanov
          Nov 22 '18 at 7:10




          Do you want to create a new Job on each access of coroutineContext?
          – Alexey Romanov
          Nov 22 '18 at 7:10












          @Jonas Channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over incoming channel or as a callback listener.
          – Alexey Soshin
          Nov 22 '18 at 10:01




          @Jonas Channel is closed when you tell it to close. In your case, you probably want to wrap channel.send() in either a loop over incoming channel or as a callback listener.
          – Alexey Soshin
          Nov 22 '18 at 10:01












          channel.send() is indeed called from a callback listener of Android Fused Location Provider. But right when the code returns from produce(), the channel is closed and the callback never has a chance to send values.
          – Jonas
          Nov 23 '18 at 17:01




          channel.send() is indeed called from a callback listener of Android Fused Location Provider. But right when the code returns from produce(), the channel is closed and the callback never has a chance to send values.
          – Jonas
          Nov 23 '18 at 17:01


















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.





          Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


          Please pay close attention to the following guidance:


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53414253%2fclose-a-coroutine-channel-when-consumers-job-is-cancelled%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Wiesbaden

          Marschland

          Dieringhausen