Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alternative since StreamController.broadcast({sync:true}) is not reentrant #22240

Open
Hixie opened this issue Feb 2, 2015 · 14 comments
Open
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. core-2 library-async P2 A bug or feature request we're likely to work on type-enhancement A request for a change that isn't a bug

Comments

@Hixie
Copy link
Contributor

Hixie commented Feb 2, 2015

// TEST CODE:

import 'dart:async';

void main() {
  var controller = new StreamController.broadcast(sync:true);
  var stream = controller.stream;
  stream.listen((data) {
    print("$data");
    if (data == 'test4')
      controller.add('test5');
  });
  stream.listen((data) { print("(second listener: $data)"); });
  print("test1");
  controller.add('test2');
  print("test3");
  controller.add('test4');
  print("test6");
  controller.add('test7');
  print("test8");
}
// EXPECTED RESULTS:
test1
test2
(second listener: test2)
test3
test4
test5
(second listener: test5)
(second listener: test4)
test6
test7
(second listener: test7)
test8

// ACTUAL RESULTS
test1
test2
(second listener: test2)
test3
test4
(second listener: test4)
test6
test7
(second listener: test7)
test8
Unhandled exception:
Uncaught Error: Bad state: Cannot fire new event. Controller is already firing an event
Stack Trace:
# ­0      _BroadcastStreamController._forEachListener (dart:async/broadcast_stream_controller.dart:293)
# ­1      _SyncBroadcastStreamController._sendData (dart:async/broadcast_stream_controller.dart:358)
# ­2      _BroadcastStreamController.add (dart:async/broadcast_stream_controller.dart:237)
# ­3      main.<anonymous closure> (file:///usr/local/google/home/ianh/dev/scratch/test.dart:9:21)
# ­4      _RootZone.runUnaryGuarded (dart:async/zone.dart:1093)
# ­5      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:341)
# ­6      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:270)
# ­7      _SyncBroadcastStreamController._sendData.<anonymous closure> (dart:async/broadcast_stream_controller.dart:359)
# ­8      _BroadcastStreamController._forEachListener (dart:async/broadcast_stream_controller.dart:312)
# ­9      _SyncBroadcastStreamController._sendData (dart:async/broadcast_stream_controller.dart:358)
# ­10     _BroadcastStreamController.add (dart:async/broadcast_stream_controller.dart:237)
# ­11     main (file:///usr/local/google/home/ianh/dev/scratch/test.dart:15:17)
# ­12     _startIsolate (dart:isolate-patch/isolate_patch.dart:239)
# ­13     _startMainIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:192)
# ­14     _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:130)
# ­0      _rootHandleUncaughtError.<anonymous closure> (dart:async/zone.dart:886)
# ­1      _asyncRunCallbackLoop (dart:async/schedule_microtask.dart:41)
# ­2      _asyncRunCallback (dart:async/schedule_microtask.dart:48)
# ­3      _runPendingImmediateCallback (dart:isolate-patch/isolate_patch.dart:84)
# ­4      _startIsolate (dart:isolate-patch/isolate_patch.dart:244)
# ­5      _startMainIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:192)
# ­6      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:130)
@Hixie
Copy link
Contributor Author

Hixie commented Feb 2, 2015

If you comment out the second listener line, the result is instead out of order callbacks, with no exception:

test1
test2
test3
test4
test6
test8
test5
test7

@floitschG
Copy link
Contributor

cc @lrhn.
Added Area-Library, Library-Async, Triaged labels.

@sethladd
Copy link
Contributor

sethladd commented Feb 2, 2015

Removed Priority-Unassigned label.
Added Priority-Medium label.

@nex3
Copy link
Member

nex3 commented Feb 2, 2015

According to floitsch and lrn in issue #21076, it's invalid to use synchronous streams/futures for anything other than forwarding already-asynchronous events.

I would also prefer that this be allowed, but it sounds like it's not in the cards.

@Hixie
Copy link
Contributor Author

Hixie commented Feb 2, 2015

Why not?

@nex3
Copy link
Member

nex3 commented Feb 2, 2015

You'd have to ask them, I'm afraid.

@Hixie
Copy link
Contributor Author

Hixie commented Feb 2, 2015

That's what I was doing. :-)

@lrhn
Copy link
Member

lrhn commented Feb 3, 2015

Synchronous stream controllers are not reentrant - you may not call add, addError or close in the callback of another event. I admit that this isn't clearly documented, and I'll update the documentation.

The reason is simple: It is not possible to both satisfy the requirement that events are delivered in the correct order, and that they are delivered synchronously (before the "add" call returns), if you can fire new events during an event callback.

Example: A controller has two listeners, A and B. An event is added, and immediately it is delivered to first A and then B. However, in A's event callback it now adds another event. The new event should be delivered immediately to both A and B. If it is delivered synchronously, then B receives the second event before the first. If not, well, then it's not delivered synchronously.

So, to prevent incorrect event ordering, adding an event during another event's dispatch is an error, and the controller throws if you try. It doesn't throw if there is only one listener, and that's probably a bug (or really: a misguided attempt to be lenient, since it can't get out-of-order delivery with only one listener, but it's still bad and breaks when you get a second listener). Why it gets the "test5" event after printing the "test8" is really weird, and I'll investigate.

We could probably do some workaround where the first event for B is delivered first, during the second call to "add" (effectively firing all pending events on any event being added, before firing the new one), but that very quickly gets very complicated (it also has to handle subscriptions being added and only later events being delivered to them, and listeners being cancelled).

Since the purpose of a synchronous stream controller is only to allow a slightly more efficient way to implement event forwarding, it is only intended for adding events as the last action of handling another event. Adding a complicated event buffering system in order to make it reentrant was not considered worth the effort, and would just add overhead to what is intended as a pure performance optimization (and used very carefully).

The implementation of a synchronous stream controller is actually pretty simple, much simpler than the normal controller, because it doesn't have to do event buffering.

If I were to design the library now, I would probably make the synchronous controller a separate class, so that it could be documented independently. A simple "sync" parameter doesn't really show how big a difference there is between normal and synchronous controllers.

In your case, I recommend just using a normal controller. It is never an error to use a normal controller, but using a synchronous controller incorrectly can lead to a stream implementation that breaks the Stream contract, which may cause some listeners to crash an burn. You have to be very careful when you add events to a synchronous stream. For example, never add events in the onListen callback!

The error is reported late because it happens in an event callback, and because you are using a sync controller in a non-tail position. Each event callback assumes that it runs from top-level event loop or microtask loop. Any uncaught errors in the callback are reported as uncaught at top leve. When calling a callback with a sync controller, it pretends that the synchronous call to the callback happens at top level, and the uncaught error thrown by calling "add" reentrantly is caught, and scheduled for the next microtask. Then the "add" returns, and because it keeps doing stuff synchronously, you get more prints before the error is reported. That's one of the reasons why you should only add events to a sync controller as the very last action of another event, because what comes after that "add" will be executed before any errors in the callbacks are reported.


Added Accepted label.

@Hixie
Copy link
Contributor Author

Hixie commented Feb 3, 2015

The semantic I'm looking for is the following:

• When an event is dispatched, all the handlers are called before the dispatch returns
• When an event is dispatched while a handler is processing the event, the entire dispatch mechanism happens nested inside the other one (so secondary handlers might get the new event before the "earlier" event)
• I want listeners to be able to use all the Stream methods to filter these events, without affecting the order of delivery
• I want minimal overhead (little to no malloc()ing for dispatch, no buffering, O(1) subscribing and unsubscribing, etc)
• Events fired with no listener present should be silently dropped — no difference in semantics based on the number of listeners, adding a no-op listener should itself be a no-op.
• The listeners should be called synchronously, in a defined order (e.g. the order in which they were added, though the precise order isn't as important as there being a defined order)

Are these requirements requirements that can be met by the Stream API in some way? It would be unfortunate to have to make a new API, given how close the Stream API appears at first glance to be to these semantics.

@lrhn
Copy link
Member

lrhn commented Feb 5, 2015

That semantics is definitely not going to work with streams.
If anything, it could be some class that has a common superclass with Stream, so both inherit the API that you want, but streams must deliver events in order, and must be asynchronous (act as if each callback is a top-level event).

@Hixie
Copy link
Contributor Author

Hixie commented Feb 5, 2015

Florian suggested a "Dispatcher" API that had similar methods as Stream but with the semantics from comment 9, which would solve this problem for me perfectly. (Should we file a bug for that?)

There should probably also be a bug for the difference in behaviour between one listener and two (see comment 1 vs the original comment here).

@lrhn
Copy link
Member

lrhn commented Mar 5, 2015

The one/two listener discrepancy has been fixed. It now throws all the time. I fixed a bug in take/skip at the same time (they are not widely used on broadcast streams for good reason).

For the "dispatcher" (do not like name!) class it should be possible to do something that's efficient and simple.
It gets more difficult to be efficient when you have to handle callbacks throwing (try/catch isn't free, especially since it causes the compilers to be inefficient when used).

@lrhn
Copy link
Member

lrhn commented Mar 18, 2015

Change to be an enhancement request. The Stream behavior is not going to change.


Removed Type-Defect label.
Added Type-Enhancement label.
Changed the title to: "Add alternative since StreamController.broadcast({sync:true}) is not reentrant".

@Hixie Hixie added Type-Enhancement area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. library-async labels Mar 18, 2015
@kevmoo kevmoo added P2 A bug or feature request we're likely to work on type-enhancement A request for a change that isn't a bug and removed accepted labels Feb 29, 2016
@floitschG floitschG added core-2 and removed core-m labels Sep 11, 2017
@TobiasHeidingsfeld
Copy link

Is this implemented or will it ever be?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. core-2 library-async P2 A bug or feature request we're likely to work on type-enhancement A request for a change that isn't a bug
Projects
None yet
Development

No branches or pull requests

7 participants