Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream transform not respecting isBroadcast parameter #36965

Closed
frankpepermans opened this issue May 14, 2019 · 9 comments
Closed

Stream transform not respecting isBroadcast parameter #36965

frankpepermans opened this issue May 14, 2019 · 9 comments
Assignees
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries.

Comments

@frankpepermans
Copy link

frankpepermans commented May 14, 2019

(dart 2.2.0)

Below is a test which transforms a broadcast Stream using 2 different methods, one using StreamTransformer.fromHandlers, the other using an extended StreamTransformer class:

      // dummy transformer, using custom class
      final transformer = TestStreamTransformer<int>();
      // dummy transformers, using fromHandlers
      final fromHandlers = StreamTransformer<int, int>.fromHandlers();
      // base broadcast Stream
      final broadcastStream =
          Stream.fromIterable(const [1]).asBroadcastStream();

      final transformedStreamA = broadcastStream.transform(fromHandlers);
      final transformedStreamB = broadcastStream.transform(transformer);

      expect(broadcastStream.isBroadcast, isTrue); // passes
      expect(transformedStreamA.isBroadcast, isTrue); // passes
      expect(transformedStreamB.isBroadcast, isTrue); // fails



class TestStreamTransformer<T> extends StreamTransformerBase<T, T> {
  final StreamTransformer<T, T> transformer;

  TestStreamTransformer() : transformer = _buildTransformer();

  @override
  Stream<T> bind(Stream<T> stream) => transformer.bind(stream);

  static StreamTransformer<T, T> _buildTransformer<T>() {
    return StreamTransformer<T, T>((Stream<T> input, bool cancelOnError) {
      StreamController<T> controller;

      if (input.isBroadcast) {
        controller = StreamController<T>.broadcast(sync: true, onListen: () {});
      } else {
        controller = StreamController<T>(sync: true, onListen: () {});
      }

      return controller.stream.listen(null);
    });
  }
}

I would expect transform to always return a new Stream which is the same as the original Stream regarding broadcast, or not.

@frankpepermans
Copy link
Author

The problem is with the bind Function call, when using fromHandlers, bind returns a _BoundSinkStream
https://github.com/dart-lang/sdk/blob/master/sdk/lib/async/stream_transformers.dart#L177
where isBroadcast is properly handled,
however when extending StreamTransformer, bind will return a _BoundSubscriptionStream where this is not the case:
https://github.com/dart-lang/sdk/blob/master/sdk/lib/async/stream_transformers.dart#L323

@lrhn lrhn self-assigned this May 14, 2019
@mit-mit mit-mit added the area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries. label May 14, 2019
@lrhn
Copy link
Member

lrhn commented May 15, 2019

Correct, the _BoundSubscriptionStream should probably emit the same isBroadcast result as the wrapped stream. I'll fix that.

This doesn't change whether you can listen more than once or not. In practice, the onListen callback is called for each listen attempt. If that function starts out by calling stream.listen on the original stream, then you can listen multiple times when the original stream allows it.
The Boolean isBroadcast doesn't actually do anything, it's just there as a hint that the stream has a particular behavior.

@frankpepermans
Copy link
Author

Correct at it being just a hint, except at our rxdart lib, we sometimes treat incoming Streams differently based off this parameter, causing unexpected behavior.

@frankpepermans
Copy link
Author

Any updates regarding this issue?

This is causing some annoying side effects with RxDart currently

@lrhn
Copy link
Member

lrhn commented Aug 3, 2019

The _BoundSubscriptionStream class has been updated (82c8c78) so it returns the isBroadcast of the wrapped stream.

There are no other changes planned.

@shinayser
Copy link

The _BoundSubscriptionStream class has been updated (82c8c78) so it returns the isBroadcast of the wrapped stream.

There are no other changes planned.

Sorry if this is a dumb question, buf if it has been fixed 3 months ago (may/19) why it still causing problems on current stream transformers?

@shinayser
Copy link

The _BoundSubscriptionStream class has been updated (82c8c78) so it returns the isBroadcast of the wrapped stream.

There are no other changes planned.

@lrhn do you know when this fix is being released?

@rullyalves
Copy link

I'm also having problems with this, I performed a transformation and then an external API request in a stream, and the request was redone for each stream listener.

@lrhn lrhn closed this as completed Oct 2, 2020
@lrhn
Copy link
Member

lrhn commented Oct 2, 2020

It should definitely have been released by now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area-core-library SDK core library issues (core, async, ...); use area-vm or area-web for platform specific libraries.
Projects
None yet
Development

No branches or pull requests

5 participants