Sharing a dynamically started worker among several consumers

python-3.x python-asyncio

asked Nov 8 at 15:17 by spectras, edited Nov 8 at 15:26

I am building a worker class that connects to an external event stream using asyncio. It is a single stream, but several consumers may enable it. The goal is to maintain the connection only while one or more consumers require it.



My requirements are as follows:




  • The worker instance is created dynamically the first time a consumer requires it.

  • When other consumers then require it, they re-use the same worker instance.

  • When the last consumer closes the stream, the worker cleans up its resources.


This sounds easy enough. However, the startup sequence is causing me issues, because it is itself asynchronous. Thus, assuming this interface:



class Stream:
    async def start(self, *, timeout=DEFAULT_TIMEOUT):
        pass

    async def stop(self):
        pass
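
A consumer would then use the stream roughly like this (an illustrative sketch; process_events is a placeholder for whatever the consumer actually does):

async def consumer(stream):
    await stream.start()               # shared, possibly concurrent startup
    try:
        await process_events(stream)   # hypothetical consume step
    finally:
        await stream.stop()            # the last consumer triggers cleanup
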


I have the following scenarios:



Scenario 1 - exception at startup




  • Consumer 1 requests worker to start.

  • Worker startup sequence begins.

  • Consumer 2 requests worker to start.

  • Worker startup sequence raises an exception.

  • Both consumers should see the exception as the result of their call to start().


Scenario 2 - partial asynchronous cancellation




  • Consumer 1 requests worker to start.

  • Worker startup sequence begins.

  • Consumer 2 requests worker to start.

  • Consumer 1 gets cancelled.

  • Worker startup sequence completes.

  • Consumer 2 should see a successful start.


Scenario 3 - complete asynchronous cancellation




  • Consumer 1 requests worker to start.

  • Worker startup sequence begins.

  • Consumer 2 requests worker to start.

  • Consumer 1 gets cancelled.

  • Consumer 2 gets cancelled.

  • Worker startup sequence must be cancelled as a result.


I struggle to cover all scenarios without race conditions and a spaghetti mess of bare Future or Event objects.





Here is an attempt at writing start(). It relies on _worker() setting an asyncio.Event named self._worker_ready when it completes the startup sequence:



async def start(self, timeout=None):
    assert not self.closing
    if not self._task:
        self._task = asyncio.ensure_future(self._worker())

    # Wait until worker is ready, has failed, or timeout triggers
    try:
        self._waiting_start += 1
        wait_ready = asyncio.ensure_future(self._worker_ready.wait())

        done, pending = await asyncio.wait(
            [self._task, wait_ready],
            return_when=asyncio.FIRST_COMPLETED, timeout=timeout
        )
    except asyncio.CancelledError:
        wait_ready.cancel()
        if self._waiting_start == 1:
            self.closing = True
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task  # let worker shutdown
        raise
    finally:
        self._waiting_start -= 1

    # worker failed to start - either throwing or timeout triggering
    if not self._worker_ready.is_set():
        self.closing = True
        self._task.cancel()
        wait_ready.cancel()

        try:
            await self._task  # let worker shutdown
        except asyncio.CancelledError:
            raise FeedTimeoutError('stream failed to start within %ss' % timeout)
        else:
            assert False, 'worker must propagate the exception'
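
For reference, a stripped-down sketch of _worker() (the real one does more and has many await points; the helper names here are placeholders):

async def _worker(self):
    await self._open_connection()               # startup step, may raise
    self._worker_ready.set()                    # unblocks the waiters in start()
    try:
        while not self.closing:
            await self._dispatch_next_event()   # main event loop
    finally:
        await self._close_connection()          # cleanup on cancellation
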


This start() seems to work, but it is too complex and really hard to test: the worker has many await points, leading to a combinatorial explosion if I am to try all possible cancellation points and execution orders.



I need a better way. I am thus wondering:




  • Are my requirements reasonable?

  • Is there a common pattern to do this?

  • Does my question raise some code smell?










2 Answers

















Accepted answer (score 2), by user4815162342, answered Nov 10 at 20:22, edited Nov 11 at 18:26










          Your requirements sound reasonable. I would try to simplify start by replacing Event with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:



class Stream:
    async def start(self, *, timeout=DEFAULT_TIMEOUT):
        loop = asyncio.get_event_loop()
        if self._worker_startup_task is None:
            # first consumer: schedule the shared startup sequence
            self._worker_startup_task = loop.create_task(self._worker_startup())

        self._add_user()
        try:
            # shield prevents a consumer's cancellation from propagating
            # to the shared startup task
            await asyncio.shield(asyncio.wait_for(
                self._worker_startup_task, timeout))
        except:
            self._rm_user()
            raise

    async def _worker_startup(self):
        loop = asyncio.get_event_loop()
        await asyncio.sleep(1)  # ... actual startup sequence goes here
        self._worker_task = loop.create_task(self._worker())


          In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.



          Thus in case of consumer cancellation, await self._worker_startup_task will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.



          Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel() will cancel the startup sequence, as required by scenario 3.



          The rest of the code would look like this (untested):



def __init__(self):
    self._users = 0
    self._worker_startup_task = None
    self._worker_task = None

def _add_user(self):
    self._users += 1

def _rm_user(self):
    self._users -= 1
    if self._users:
        return
    # last consumer is gone: tear everything down
    self._worker_startup_task.cancel()
    self._worker_startup_task = None
    if self._worker_task is not None:
        self._worker_task.cancel()
        self._worker_task = None

async def stop(self):
    self._rm_user()

async def _worker(self):
    # actual worker...
    while True:
        await asyncio.sleep(1)



























          • Hmmm, cancelled coroutines need to be awaited as well, and the wait_for should asyncio.shield the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
            – spectras
            Nov 11 at 16:44










          • @spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
            – user4815162342
            Nov 11 at 18:27










          • If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
            – spectras
            Nov 11 at 22:54










          • I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
            – spectras
            Nov 11 at 23:00




















Answer (score 1), by spectras, answered Nov 11 at 22:59, edited Nov 12 at 12:41













Building on my previous tests and integrating suggestions from @user4815162342, I came up with a reusable solution:



          st = SharedTask(test())
          task1 = asyncio.ensure_future(st.wait())
          task2 = asyncio.ensure_future(st.wait(timeout=15))
          task3 = asyncio.ensure_future(st.wait())


          This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test() unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test() and wait for cancellation handling to complete.



If passed a coroutine, it is only scheduled when the first task starts waiting.



Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but the initial version did not).



import asyncio
from contextlib import suppress


class SharedTask:
    __slots__ = ('_clients', '_task')

    def __init__(self, task):
        if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
            raise TypeError('task must be either a Future or a coroutine object')
        self._clients = 0
        self._task = task

    @property
    def started(self):
        return asyncio.isfuture(self._task)

    async def wait(self, *, timeout=None):
        # schedule the coroutine on first use; no-op if already a future
        self._task = asyncio.ensure_future(self._task)

        self._clients += 1
        try:
            # shield the shared task from this client's own cancellation
            return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
        except:
            self._clients -= 1
            if self._clients == 0 and not self._task.done():
                # last waiting client: cancel the shared task and wait
                # for its cancellation handling to complete
                self._task.cancel()
                with suppress(asyncio.CancelledError):
                    await self._task
            raise

    def cancel(self):
        if asyncio.iscoroutine(self._task):
            self._task.close()
        elif asyncio.isfuture(self._task):
            self._task.cancel()
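
A small runnable demo of the behavior described above (test() is a hypothetical stand-in for the shared work):

async def test():
    try:
        await asyncio.sleep(30)
        return 'done'
    except asyncio.CancelledError:
        print('test(): cancellation handling')
        raise

async def main():
    st = SharedTask(test())
    task1 = asyncio.ensure_future(st.wait())
    task2 = asyncio.ensure_future(st.wait())
    await asyncio.sleep(1)
    task1.cancel()       # one client gone: test() keeps running
    await asyncio.sleep(1)
    task2.cancel()       # last client gone: test() gets cancelled
    await asyncio.gather(task1, task2, return_exceptions=True)

asyncio.get_event_loop().run_until_complete(main())
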




Re-raising exceptions that occur while the task handles its cancellation (mentioned in the comments) is intentional. It allows this pattern:



async def my_task():
    try:
        await do_stuff()
    except asyncio.CancelledError as exc:
        await flush_some_stuff()  # might raise an exception
        raise exc


The clients can cancel the shared task and handle an exception that might arise as a result; it will work the same whether my_task is wrapped in a SharedTask or not.
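
For instance, a client cancelling the shared task directly and handling the cleanup exception might look like this (a sketch using the names above; flush_some_stuff is assumed to possibly raise):

st = SharedTask(my_task())
waiter = asyncio.ensure_future(st.wait())
await asyncio.sleep(1)     # let my_task() reach do_stuff()
st.cancel()                # cancel the shared task itself
try:
    await waiter
except asyncio.CancelledError:
    pass                   # flush_some_stuff() completed cleanly
except Exception as exc:
    print('cleanup failed:', exc)   # raised by flush_some_stuff()
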





























          • suppress(asyncio.CancelledError) looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
            – user4815162342
            Nov 12 at 10:04










• @user4815162342 It is counter-intuitive, but it doesn't make it uncancellable. suppress does not prevent the exception from occurring and cancelling the task, it only prevents it from bubbling up.
            – spectras
            Nov 12 at 11:12










          • I get that; the part that worries me is that if a CancelledError doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressed CancelledError, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
            – user4815162342
            Nov 12 at 11:56








• This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if the task was not wrapped in SharedTask). Should one want another behavior, use suppress(Exception) instead (and add some logging mechanism; one definitely does not want exceptions to go unnoticed).
            – spectras
            Nov 12 at 12:32








• @user4815162342 yes. Well, there is a test for self._task.done() that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
            – spectras
            Nov 12 at 13:08











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














           

          draft saved


          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53210696%2fsharing-a-dynamically-started-worker-among-several-consumers%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          2 Answers
          2






          active

          oldest

          votes








          2 Answers
          2






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          2
          down vote



          accepted










          Your requirements sound reasonable. I would try to simplify start by replacing Event with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:



          class Stream:
          async def start(self, *, timeout=DEFAULT_TIMEOUT):
          loop = asyncio.get_event_loop()
          if self._worker_startup_task is None:
          self._worker_startup_task =
          loop.create_task(self._worker_startup())

          self._add_user()
          try:
          await asyncio.shield(asyncio.wait_for(
          self._worker_startup_task, timeout))
          except:
          self._rm_user()
          raise

          async def _worker_startup(self):
          loop = asyncio.get_event_loop()
          await asyncio.sleep(1) # ...
          self._worker_task = loop.create_task(self._worker())


          In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.



          Thus in case of consumer cancellation, await self._worker_startup_task will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.



          Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel() will cancel the startup sequence, as required by scenario 3.



          The rest of the code would look like this (untested):



              def __init__(self):
          self._users = 0
          self._worker_startup = None

          def _add_user(self):
          self._users += 1

          def _rm_user(self):
          self._users -= 1
          if self._users:
          return
          self._worker_startup_task.cancel()
          self._worker_startup_task = None
          if self._worker_task is not None:
          self._worker_task.cancel()
          self._worker_task = None

          async def stop(self):
          self._rm_user()

          async def _worker(self):
          # actual worker...
          while True:
          await asyncio.sleep(1)





          share|improve this answer























          • Hmmm, cancelled coroutines need to be awaited as well, and the wait_for should asyncio.shield the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
            – spectras
            Nov 11 at 16:44










          • @spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
            – user4815162342
            Nov 11 at 18:27










          • If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
            – spectras
            Nov 11 at 22:54










          • I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
            – spectras
            Nov 11 at 23:00

















          up vote
          2
          down vote



          accepted










          Your requirements sound reasonable. I would try to simplify start by replacing Event with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:



          class Stream:
          async def start(self, *, timeout=DEFAULT_TIMEOUT):
          loop = asyncio.get_event_loop()
          if self._worker_startup_task is None:
          self._worker_startup_task =
          loop.create_task(self._worker_startup())

          self._add_user()
          try:
          await asyncio.shield(asyncio.wait_for(
          self._worker_startup_task, timeout))
          except:
          self._rm_user()
          raise

          async def _worker_startup(self):
          loop = asyncio.get_event_loop()
          await asyncio.sleep(1) # ...
          self._worker_task = loop.create_task(self._worker())


          In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.



          Thus in case of consumer cancellation, await self._worker_startup_task will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.



          Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel() will cancel the startup sequence, as required by scenario 3.



          The rest of the code would look like this (untested):



              def __init__(self):
          self._users = 0
          self._worker_startup = None

          def _add_user(self):
          self._users += 1

          def _rm_user(self):
          self._users -= 1
          if self._users:
          return
          self._worker_startup_task.cancel()
          self._worker_startup_task = None
          if self._worker_task is not None:
          self._worker_task.cancel()
          self._worker_task = None

          async def stop(self):
          self._rm_user()

          async def _worker(self):
          # actual worker...
          while True:
          await asyncio.sleep(1)





          share|improve this answer























          • Hmmm, cancelled coroutines need to be awaited as well, and the wait_for should asyncio.shield the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
            – spectras
            Nov 11 at 16:44










          • @spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
            – user4815162342
            Nov 11 at 18:27










          • If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
            – spectras
            Nov 11 at 22:54










          • I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
            – spectras
            Nov 11 at 23:00















          up vote
          2
          down vote



          accepted







          up vote
          2
          down vote



          accepted






          Your requirements sound reasonable. I would try to simplify start by replacing Event with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:



          class Stream:
          async def start(self, *, timeout=DEFAULT_TIMEOUT):
          loop = asyncio.get_event_loop()
          if self._worker_startup_task is None:
          self._worker_startup_task =
          loop.create_task(self._worker_startup())

          self._add_user()
          try:
          await asyncio.shield(asyncio.wait_for(
          self._worker_startup_task, timeout))
          except:
          self._rm_user()
          raise

          async def _worker_startup(self):
          loop = asyncio.get_event_loop()
          await asyncio.sleep(1) # ...
          self._worker_task = loop.create_task(self._worker())


          In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.



          Thus in case of consumer cancellation, await self._worker_startup_task will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.



          Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel() will cancel the startup sequence, as required by scenario 3.



          The rest of the code would look like this (untested):



              def __init__(self):
          self._users = 0
          self._worker_startup = None

          def _add_user(self):
          self._users += 1

          def _rm_user(self):
          self._users -= 1
          if self._users:
          return
          self._worker_startup_task.cancel()
          self._worker_startup_task = None
          if self._worker_task is not None:
          self._worker_task.cancel()
          self._worker_task = None

          async def stop(self):
          self._rm_user()

          async def _worker(self):
          # actual worker...
          while True:
          await asyncio.sleep(1)





          share|improve this answer














          Your requirements sound reasonable. I would try to simplify start by replacing Event with a future (in this case a task), using it to both wait for the startup to finish and to propagate exceptions that occur during its course, if any. Something like:



          class Stream:
          async def start(self, *, timeout=DEFAULT_TIMEOUT):
          loop = asyncio.get_event_loop()
          if self._worker_startup_task is None:
          self._worker_startup_task =
          loop.create_task(self._worker_startup())

          self._add_user()
          try:
          await asyncio.shield(asyncio.wait_for(
          self._worker_startup_task, timeout))
          except:
          self._rm_user()
          raise

          async def _worker_startup(self):
          loop = asyncio.get_event_loop()
          await asyncio.sleep(1) # ...
          self._worker_task = loop.create_task(self._worker())


          In this code the worker startup is separated from the worker coroutine, and is also moved to a separate task. This separate task can be awaited and removes the need for a dedicated Event, but more importantly, it allows scenarios 1 and 2 to be handled by the same code. Even if someone cancels the first consumer, the worker startup task will not be canceled - the cancellation just means that there is one less consumer waiting for it.



          Thus in case of consumer cancellation, await self._worker_startup_task will work just fine for other consumers, whereas in case of an actual exception in worker startup, all other waiters will see the same exception because the task will have completed.



          Scenario 3 should work automatically because we always cancel the startup that can no longer be observed by a consumer, regardless of the reason. If the consumers are gone because the startup itself has failed, then self._worker_startup_task will have completed (with an exception) and its cancellation will be a no-op. If it is because all consumers have been themselves canceled while awaiting the startup, then self._worker_startup_task.cancel() will cancel the startup sequence, as required by scenario 3.



          The rest of the code would look like this (untested):



              def __init__(self):
          self._users = 0
          self._worker_startup = None

          def _add_user(self):
          self._users += 1

          def _rm_user(self):
          self._users -= 1
          if self._users:
          return
          self._worker_startup_task.cancel()
          self._worker_startup_task = None
          if self._worker_task is not None:
          self._worker_task.cancel()
          self._worker_task = None

          async def stop(self):
          self._rm_user()

          async def _worker(self):
          # actual worker...
          while True:
          await asyncio.sleep(1)






          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 11 at 18:26

























          answered Nov 10 at 20:22









          user4815162342

          58.8k488138




          58.8k488138












          • Hmmm, cancelled coroutines need to be awaited as well, and the wait_for should asyncio.shield the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
            – spectras
            Nov 11 at 16:44










          • @spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
            – user4815162342
            Nov 11 at 18:27










          • If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
            – spectras
            Nov 11 at 22:54










          • I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
            – spectras
            Nov 11 at 23:00




















          • Hmmm, cancelled coroutines need to be awaited as well, and the wait_for should asyncio.shield the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
            – spectras
            Nov 11 at 16:44










          • @spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
            – user4815162342
            Nov 11 at 18:27










          • If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
            – spectras
            Nov 11 at 22:54










          • I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
            – spectras
            Nov 11 at 23:00


















          Hmmm, cancelled coroutines need to be awaited as well, and the wait_for should asyncio.shield the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
          – spectras
          Nov 11 at 16:44




          Hmmm, cancelled coroutines need to be awaited as well, and the wait_for should asyncio.shield the task, else cancellation will be propagated. But I like the splitting of add/rm user into separate methods.
          – spectras
          Nov 11 at 16:44












          @spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
          – user4815162342
          Nov 11 at 18:27




          @spectras I've now added the shield, but I'm not sure why cancelled coroutines should be awaited. What is the adverse effect of not awaiting them?
          – user4815162342
          Nov 11 at 18:27












          If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
          – spectras
          Nov 11 at 22:54




          If stop is called as part of the shutdown process, not awaiting them can result in the loop closing before they are done. Also, if cancellation raises an exception, it will go unnoticed. In this specific example, the worker flushes some buffers and gracefully notifies the other end of willing termination upon cancellation.
          – spectras
          Nov 11 at 22:54












          I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
          – spectras
          Nov 11 at 23:00






          I answered my own question using your help, encapsulating the result in a reusable class, then trimming it down until I am happy with it. Thanks for the ideas :)
          – spectras
          Nov 11 at 23:00














          up vote
          1
          down vote













          With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:



          st = SharedTask(test())
          task1 = asyncio.ensure_future(st.wait())
          task2 = asyncio.ensure_future(st.wait(timeout=15))
          task3 = asyncio.ensure_future(st.wait())


          This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test() unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test() and wait for cancellation handling to complete.



          If passed a coroutine, it is only scheduled when first task starts waiting.



          Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).



          import asyncio
          from contextlib import suppress


          class SharedTask:
          __slots__ = ('_clients', '_task')

          def __init__(self, task):
          if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
          raise TypeError('task must be either a Future or a coroutine object')
          self._clients = 0
          self._task = task

          @property
          def started(self):
          return asyncio.isfuture(self._task)

          async def wait(self, *, timeout=None):
          self._task = asyncio.ensure_future(self._task)

          self._clients += 1
          try:
          return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
          except:
          self._clients -= 1
          if self._clients == 0 and not self._task.done():
          self._task.cancel()
          with suppress(asyncio.CancelledError):
          await self._task
          raise

          def cancel(self):
          if asyncio.iscoroutine(self._task):
          self._task.close()
          elif asyncio.isfuture(self._task):
          self._task.cancel()




          The re-raising of task exception cancellation (mentioned in comments) is intentional. It allows this pattern:



          async def my_task():
          try:
          await do_stuff()
          except asyncio.CancelledError as exc:
          await flush_some_stuff() # might raise an exception
          raise exc


          The clients can cancel the shared task and handle an exception that might arise as a result, it will work the same whether my_task is wrapped in a SharedTask or not.






          share|improve this answer























          • suppress(asyncio.CancelledError) looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
            – user4815162342
            Nov 12 at 10:04










          • @user4815162342 It is counter-intuitive, but it doesn't make it uncancellable. suppress does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.
            – spectras
            Nov 12 at 11:12










          • I get that; the part that worries me is that if a CancelledError doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressed CancelledError, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
            – user4815162342
            Nov 12 at 11:56








          • 1




            This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped in SharedTask). Should one want another behavior, use suppress(Exception) instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).
            – spectras
            Nov 12 at 12:32








          • 1




            @user4815162342 yes. Well there is a test for self._task.done() that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
            – spectras
            Nov 12 at 13:08















          up vote
          1
          down vote













          With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:



          st = SharedTask(test())
          task1 = asyncio.ensure_future(st.wait())
          task2 = asyncio.ensure_future(st.wait(timeout=15))
          task3 = asyncio.ensure_future(st.wait())


          This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test() unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test() and wait for cancellation handling to complete.



          If passed a coroutine, it is only scheduled when first task starts waiting.



          Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).



          import asyncio
          from contextlib import suppress


          class SharedTask:
          __slots__ = ('_clients', '_task')

          def __init__(self, task):
          if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
          raise TypeError('task must be either a Future or a coroutine object')
          self._clients = 0
          self._task = task

          @property
          def started(self):
          return asyncio.isfuture(self._task)

          async def wait(self, *, timeout=None):
          self._task = asyncio.ensure_future(self._task)

          self._clients += 1
          try:
          return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
          except:
          self._clients -= 1
          if self._clients == 0 and not self._task.done():
          self._task.cancel()
          with suppress(asyncio.CancelledError):
          await self._task
          raise

          def cancel(self):
          if asyncio.iscoroutine(self._task):
          self._task.close()
          elif asyncio.isfuture(self._task):
          self._task.cancel()




          The re-raising of task exception cancellation (mentioned in comments) is intentional. It allows this pattern:



          async def my_task():
          try:
          await do_stuff()
          except asyncio.CancelledError as exc:
          await flush_some_stuff() # might raise an exception
          raise exc


          The clients can cancel the shared task and handle an exception that might arise as a result, it will work the same whether my_task is wrapped in a SharedTask or not.






          share|improve this answer























          • suppress(asyncio.CancelledError) looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
            – user4815162342
            Nov 12 at 10:04










          • @user4815162342 It is counter-intuitive, but it doesn't make it uncancellable. suppress does not prevent the exception from occurring an cancelling the task, it only prevents it from bubbling up.
            – spectras
            Nov 12 at 11:12










          • I get that; the part that worries me is that if a CancelledError doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressed CancelledError, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
            – user4815162342
            Nov 12 at 11:56








          • 1




            This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if task was not wrapped in SharedTask). Should one want another behavior, use suppress(Exception) instead (and add some logging mechanism, one definitely does not want exceptions to go unnoticed).
            – spectras
            Nov 12 at 12:32








          • 1




            @user4815162342 yes. Well there is a test for self._task.done() that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
            – spectras
            Nov 12 at 13:08













          up vote
          1
          down vote










          up vote
          1
          down vote









          With my previous tests and integrating suggestions from @user4815162342 I came up with a re-usable solution:



          st = SharedTask(test())
          task1 = asyncio.ensure_future(st.wait())
          task2 = asyncio.ensure_future(st.wait(timeout=15))
          task3 = asyncio.ensure_future(st.wait())


          This does the right thing: task2 cancels itself after 15s. Cancelling tasks has no effect on test() unless they all get cancelled. In that case, the last task to get cancelled will manually cancel test() and wait for cancellation handling to complete.



          If passed a coroutine, it is only scheduled when first task starts waiting.



          Lastly, awaiting the shared task after it has completed simply yields its result immediately (seems obvious, but initial version did not).



          import asyncio
          from contextlib import suppress


          class SharedTask:
          __slots__ = ('_clients', '_task')

          def __init__(self, task):
          if not (asyncio.isfuture(task) or asyncio.iscoroutine(task)):
          raise TypeError('task must be either a Future or a coroutine object')
          self._clients = 0
          self._task = task

          @property
          def started(self):
          return asyncio.isfuture(self._task)

          async def wait(self, *, timeout=None):
          self._task = asyncio.ensure_future(self._task)

          self._clients += 1
          try:
          return await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
          except:
          self._clients -= 1
          if self._clients == 0 and not self._task.done():
          self._task.cancel()
          with suppress(asyncio.CancelledError):
          await self._task
          raise

          def cancel(self):
          if asyncio.iscoroutine(self._task):
          self._task.close()
          elif asyncio.isfuture(self._task):
          self._task.cancel()




The re-raising of the task's exception on cancellation (mentioned in the comments) is intentional. It allows this pattern:



async def my_task():
    try:
        await do_stuff()
    except asyncio.CancelledError as exc:
        await flush_some_stuff()    # might raise an exception
        raise exc


Clients can cancel the shared task and handle any exception that arises as a result; it works the same whether my_task is wrapped in a SharedTask or not.
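For instance, a client-side sketch of that behavior (do_stuff, flush_some_stuff and the ValueError are placeholders; it assumes the SharedTask class above):

async def do_stuff():
    await asyncio.sleep(10)

async def flush_some_stuff():
    raise ValueError('flush failed')    # simulated cleanup error

async def main():
    st = SharedTask(my_task())
    client = asyncio.ensure_future(st.wait())
    await asyncio.sleep(0.1)            # let the client and my_task start

    client.cancel()                     # last client: my_task gets cancelled too
    try:
        await client
    except ValueError as exc:
        print('client saw the cleanup error:', exc)

asyncio.get_event_loop().run_until_complete(main())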






edited Nov 12 at 12:41
answered Nov 11 at 22:59

spectras
• suppress(asyncio.CancelledError) looks like it will make the consumer uncancellable while the originally cancelled task is cleaning up. (The cancellation occurring at that time will just be swallowed.)
  – user4815162342
  Nov 12 at 10:04

• @user4815162342 It is counter-intuitive, but it doesn't make it uncancellable. suppress does not prevent the exception from occurring and cancelling the task, it only prevents it from bubbling up.
  – spectras
  Nov 12 at 11:12

• I get that; the part that worries me is that if a CancelledError doesn't bubble up from that point, the cancelled task will have some other exception as the result. Maybe it's not a problem in practice, but to me it's not obvious that that won't interfere with the cancellation mechanism. If I suppressed CancelledError, I'd be careful to re-raise it after doing whatever it is I need to do to handle it.
  – user4815162342
  Nov 12 at 11:56

• This is by design: if [shared task cancellation as a result of the last client being cancelled] raises an exception, then that last client gets the exception (the same it would have gotten if the task was not wrapped in SharedTask). Should one want another behavior, use suppress(Exception) instead (and add some logging mechanism; one definitely does not want exceptions to go unnoticed).
  – spectras
  Nov 12 at 12:32

• @user4815162342 yes. Well, there is a test for self._task.done() that prevents the third possibility, but the other two remain. Alas, as one can only raise one exception, a choice had to be made. I guess YMMV as to which is most relevant. It is one of those times exceptions show their limitations as an error handling mechanism.
  – spectras
  Nov 12 at 13:08