Dispose inner subscription of merge












!!warning: Rx newbie!!



We have multiple price feeds. The requirement is to subscribe to all of these feeds and output only the latest tick once per second (throttling).



    public static class FeedHandler
    {
        private static IObservable<PriceTick> _combinedPriceFeed = null;

        private static double _throttleFrequency = 1000;

        public static void AddToCombinedFeed(IObservable<PriceTick> feed)
        {
            _combinedPriceFeed = _combinedPriceFeed != null ? _combinedPriceFeed.Merge(feed) : feed;
            AddFeed(_combinedPriceFeed);
        }

        private static IDisposable _subscriber;

        private static void AddFeed(IObservable<PriceTick> feed)
        {
            _subscriber?.Dispose();
            _subscriber = feed
                .Buffer(TimeSpan.FromMilliseconds(_throttleFrequency))
                .Subscribe(buffer => buffer
                    .GroupBy(x => x.InstrumentId, (key, result) => result.First())
                    .ToObservable()
                    .Subscribe(NotifyClient));
        }

        public static void NotifyClient(PriceTick tick)
        {
            // Do some action
        }
    }


The code has multiple issues. To start with, if I call AddToCombinedFeed with the same feed multiple times, the streams get duplicated. For example:



    IObservable<PriceTick> feed1;

    FeedHandler.AddToCombinedFeed(feed1); // 1 stream
    FeedHandler.AddToCombinedFeed(feed1); // 2 streams (though GroupBy and First() keep the duplication from propagating further)


This brings me to the question. If I want to remove one price stream from the merged stream, how can I do that?










      c# reactive-programming system.reactive






asked Nov 14 '18 at 14:59
Jimmy

          1 Answer
1. Eliminate the need to resubscribe by using Switch(). Your _combinedPriceFeed simply switches to the next observable supplied by _combinedPriceFeedChange.

2. Keep a list to manage your multiple feeds. Create a new merged observable whenever the list changes and publish it via _combinedPriceFeedChange.

3. A corresponding remove method follows the same logic.

4. If you want a more elegant solution that gets rid of the subject, I think this is possible with DynamicData (MIT license) by RolandPheasant, available on NuGet. I have never used it, but I would like to hear from you if you arrive at a more elegant solution with it.


Code:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public class FeedHandler
    {
        private readonly IDisposable _subscriber;
        private readonly IObservable<PriceTick> _combinedPriceFeed;
        private readonly List<IObservable<PriceTick>> _feeds = new List<IObservable<PriceTick>>();

        // Carries the current merged feed; starts with a feed that never emits.
        private readonly BehaviorSubject<IObservable<PriceTick>> _combinedPriceFeedChange =
            new BehaviorSubject<IObservable<PriceTick>>(Observable.Never<PriceTick>());
        private readonly double _throttleFrequency = 1000;

        public FeedHandler()
        {
            // Switch() always follows the most recently published merged feed.
            _combinedPriceFeed = _combinedPriceFeedChange
                .Switch()
                .Buffer(TimeSpan.FromMilliseconds(_throttleFrequency))
                .SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
            _subscriber = _combinedPriceFeed.Subscribe(NotifyClient);
        }

        public void AddFeed(IObservable<PriceTick> feed)
        {
            _feeds.Add(feed);
            // Publish a new merge of all feeds; Switch() moves over to it.
            _combinedPriceFeedChange.OnNext(_feeds.Merge());
        }

        public void NotifyClient(PriceTick tick)
        {
            // Do some action
        }
    }
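Point 3 leaves the remove method to the reader. A minimal sketch of how it might look is below, under a few assumptions that are not in the original answer: `PriceTick` is a hypothetical stand-in with just `InstrumentId` and `Price`, the buffer window and the client callback are passed in through the constructor, a `HashSet` (reference equality by default) rejects a feed that was already added, and `Last()` replaces `First()` because the question asks for the latest tick in each window.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for the PriceTick type used in the question.
public sealed class PriceTick
{
    public string InstrumentId { get; set; }
    public decimal Price { get; set; }
}

public sealed class FeedHandler : IDisposable
{
    private readonly object _gate = new object();

    // A set, so adding the same feed instance twice has no effect.
    private readonly HashSet<IObservable<PriceTick>> _feeds =
        new HashSet<IObservable<PriceTick>>();

    private readonly BehaviorSubject<IObservable<PriceTick>> _feedChange =
        new BehaviorSubject<IObservable<PriceTick>>(Observable.Never<PriceTick>());

    private readonly IDisposable _subscription;

    // window and notifyClient are assumptions made for this sketch.
    public FeedHandler(TimeSpan window, Action<PriceTick> notifyClient)
    {
        _subscription = _feedChange
            .Switch()
            .Buffer(window)
            // Last() keeps the most recent tick per instrument in each window.
            .SelectMany(buffer => buffer.GroupBy(t => t.InstrumentId, (key, ticks) => ticks.Last()))
            .Subscribe(notifyClient);
    }

    // Returns false if the feed was already part of the merge.
    public bool AddFeed(IObservable<PriceTick> feed)
    {
        lock (_gate)
        {
            if (!_feeds.Add(feed)) return false;
            Publish();
            return true;
        }
    }

    // Returns false if the feed was not part of the merge.
    public bool RemoveFeed(IObservable<PriceTick> feed)
    {
        lock (_gate)
        {
            if (!_feeds.Remove(feed)) return false;
            Publish();
            return true;
        }
    }

    // Publishes a merge over a snapshot of the current feeds.
    private void Publish()
    {
        _feedChange.OnNext(_feeds.Count == 0
            ? Observable.Never<PriceTick>()
            : _feeds.ToList().Merge());
    }

    public void Dispose()
    {
        _subscription.Dispose();
        _feedChange.Dispose();
    }
}
```

One thing to keep in mind with this approach: every change makes Switch() drop its subscription to the old merge and subscribe to the new one, so all remaining feeds are re-subscribed. That is harmless for hot feeds, but a cold feed would start over from its beginning.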





• Is there any practical advantage to preferring Switch over re-subscription? Isn't Switch doing the same thing behind the scenes?

  – Jimmy
  Nov 16 '18 at 9:19

• That no resubscription is needed is exactly the advantage. It's a declarative way to handle the disposal/resubscription. The _combinedPriceFeed observable is always valid. In my code, most of the time the IObservable is a public property or is generated by a method, and I don't want to break existing subscriptions from the providing side other than through Error/Completion.

  – Felix Keil
  Nov 16 '18 at 10:53
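To make the exchange above concrete, here is a small sketch of what Switch() does with its inner subscriptions while the outer subscription stays in place. The names `SwitchDemo`, `sources`, `a` and `b` are made up for this illustration, and plain `int` values stand in for price ticks.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    public static List<int> Run()
    {
        var sources = new Subject<IObservable<int>>();
        var a = new Subject<int>();
        var b = new Subject<int>();
        var seen = new List<int>();

        // One subscription, held for the whole run.
        using (sources.Switch().Subscribe(seen.Add))
        {
            sources.OnNext(a);
            a.OnNext(1);        // forwarded: a is the current inner source
            sources.OnNext(b);  // Switch disposes its subscription to a
            a.OnNext(2);        // dropped: nobody is subscribed to a any more
            b.OnNext(3);        // forwarded: b is the current inner source
        }

        return seen;            // holds 1 and 3
    }
}
```

So Switch does dispose and resubscribe behind the scenes, but only on the inner sources; the consumer's own subscription is never touched.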













edited Nov 15 '18 at 13:44
answered Nov 15 '18 at 11:27
Felix Keil
