Dispose inner subscription of merge
!!warning: Rx newbie!!
We have multiple price feeds. The requirement is to subscribe to all of these feeds and output only the latest tick once per second (throttled).
    public static class FeedHandler
    {
        private static IObservable<PriceTick> _combinedPriceFeed = null;
        private static double _throttleFrequency = 1000;

        public static void AddToCombinedFeed(IObservable<PriceTick> feed)
        {
            _combinedPriceFeed = _combinedPriceFeed != null ? _combinedPriceFeed.Merge(feed) : feed;
            AddFeed(_combinedPriceFeed);
        }

        private static IDisposable _subscriber;

        private static void AddFeed(IObservable<PriceTick> feed)
        {
            _subscriber?.Dispose();
            _subscriber = feed
                .Buffer(TimeSpan.FromMilliseconds(_throttleFrequency))
                .Subscribe(buffer => buffer
                    .GroupBy(x => x.InstrumentId, (key, result) => result.First())
                    .ToObservable()
                    .Subscribe(NotifyClient));
        }

        public static void NotifyClient(PriceTick tick)
        {
            //Do some action
        }
    }
The code has multiple issues. To start with, if I call AddToCombinedFeed with the same feed multiple times, that stream gets merged in more than once. For example:
    IObservable<PriceTick> feed1;
    FeedHandler.AddToCombinedFeed(feed1); // 1 stream
    FeedHandler.AddToCombinedFeed(feed1); // 2 streams (even though GroupBy and First() prevent the duplication from propagating further)
This brings me to the question. If I want to remove one price stream from the merged stream, how can I do that?
c# reactive-programming system.reactive
asked Nov 14 '18 at 14:59
Jimmy
1 Answer
- Eliminate the need to resubscribe by using Switch(). Your _combinedPriceFeed simply switches to the next observable supplied by _combinedPriceFeedChange.
- Keep a list to manage your feeds. Whenever the list changes, create a new merged observable and publish it via _combinedPriceFeedChange.
- The corresponding remove method follows the same logic.
- If you want a more elegant solution that gets rid of the subject, I think this is possible with Dynamic-Data (MIT license) from RolandPheasant, available via NuGet. I have never used it myself, but I would like to hear from you if you arrive at a more elegant solution with it.
Code:
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public class FeedHandler
    {
        private readonly IDisposable _subscriber;
        private readonly IObservable<PriceTick> _combinedPriceFeed;
        private readonly List<IObservable<PriceTick>> _feeds = new List<IObservable<PriceTick>>();
        // Carries the current merged feed; starts with a feed that never emits.
        private readonly BehaviorSubject<IObservable<PriceTick>> _combinedPriceFeedChange =
            new BehaviorSubject<IObservable<PriceTick>>(Observable.Never<PriceTick>());
        private readonly double _throttleFrequency = 1000;

        public FeedHandler()
        {
            // Switch() always follows the most recently published merged feed.
            // Note: First() keeps the earliest tick per instrument in each buffer;
            // Last() would keep the most recent one.
            _combinedPriceFeed = _combinedPriceFeedChange
                .Switch()
                .Buffer(TimeSpan.FromMilliseconds(_throttleFrequency))
                .SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, result) => result.First()));
            _subscriber = _combinedPriceFeed.Subscribe(NotifyClient);
        }

        public void AddFeed(IObservable<PriceTick> feed)
        {
            _feeds.Add(feed);
            // Publish a new merge of all known feeds.
            _combinedPriceFeedChange.OnNext(_feeds.Merge());
        }

        public void NotifyClient(PriceTick tick)
        {
            //Do some action
        }
    }
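The remove method mentioned in the list above could look roughly like the following. This is only a sketch under a few assumptions that are not in the original post: the PriceTick shape, the RemoveFeed and Publish names, the lock, the duplicate check in AddFeed, and the use of Last() (on the reading that "latest tick" means the most recent one per instrument) are all illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical tick type; the original post does not show its definition.
public class PriceTick
{
    public string InstrumentId { get; set; }
    public decimal Price { get; set; }
}

public sealed class FeedHandler : IDisposable
{
    private readonly object _gate = new object();
    private readonly List<IObservable<PriceTick>> _feeds = new List<IObservable<PriceTick>>();
    private readonly BehaviorSubject<IObservable<PriceTick>> _combinedPriceFeedChange =
        new BehaviorSubject<IObservable<PriceTick>>(Observable.Never<PriceTick>());
    private readonly IDisposable _subscriber;

    public FeedHandler(Action<PriceTick> notifyClient)
    {
        _subscriber = _combinedPriceFeedChange
            .Switch()
            .Buffer(TimeSpan.FromSeconds(1))
            // Assumption: Last() instead of First(), to emit the most recent tick per instrument.
            .SelectMany(buffer => buffer.GroupBy(x => x.InstrumentId, (key, ticks) => ticks.Last()))
            .Subscribe(notifyClient);
    }

    public void AddFeed(IObservable<PriceTick> feed)
    {
        lock (_gate)
        {
            if (_feeds.Contains(feed)) return; // the same feed instance is merged only once
            _feeds.Add(feed);
            Publish();
        }
    }

    public void RemoveFeed(IObservable<PriceTick> feed)
    {
        lock (_gate)
        {
            if (_feeds.Remove(feed)) Publish(); // republish only if something changed
        }
    }

    // Publishes a merged snapshot of the current feeds; Switch() then
    // unsubscribes from the previous merge and subscribes to this one.
    private void Publish()
    {
        _combinedPriceFeedChange.OnNext(Observable.Merge(_feeds.ToArray()));
    }

    public void Dispose()
    {
        _subscriber.Dispose();
        _combinedPriceFeedChange.Dispose();
    }
}
```

One thing to keep in mind with this approach: every add or remove makes Switch() resubscribe to all remaining feeds, so cold observables would restart. For hot feeds such as live prices that is usually harmless.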
Is there any practical advantage to preferring Switch over re-subscription? Isn't Switch doing the same thing behind the scenes?
– Jimmy
Nov 16 '18 at 9:19
That a resubscription is not needed is exactly the advantage. It's a declarative way to handle the disposal/resubscription. The _combinedPriceFeed observable is always valid. In my code, most of the time the IObservable is a public property or is generated by a method, and I don't want to break existing subscriptions from the providing side other than through Error/Completion.
– Felix Keil
Nov 16 '18 at 10:53
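To illustrate the point about Switch handling disposal for you, here is a small stand-alone sketch. The SwitchDemo class and the subject names are made up for this example. The downstream subscription is created once and stays alive while the inner source is swapped.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    public static void Main()
    {
        var first = new Subject<int>();
        var second = new Subject<int>();
        var sources = new Subject<IObservable<int>>();

        // The consumer subscribes once and is never torn down while sources change.
        using (sources.Switch().Subscribe(x => Console.WriteLine(x)))
        {
            sources.OnNext(first);
            first.OnNext(1);                        // prints 1
            sources.OnNext(second);                 // Switch disposes its subscription to 'first'
            Console.WriteLine(first.HasObservers);  // prints False
            first.OnNext(2);                        // not forwarded
            second.OnNext(3);                       // prints 3
        }
    }
}
```

So Switch does perform a dispose and resubscribe internally, but only on the inner source; anything subscribed to the switched observable is unaffected.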
edited Nov 15 '18 at 13:44
answered Nov 15 '18 at 11:27
Felix Keil