Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to disable RxJavaAssemblyTracking in specific observable #104

Open
snorkel123 opened this issue Feb 9, 2021 · 9 comments
Open

Comments

@snorkel123
Copy link

To make RxJavaAssemblyTracking work everywhere except high-load places.

Smth like

Observable.
   ...
   .compose( removeHookHere() )
@akarnokd
Copy link
Owner

akarnokd commented Feb 9, 2021

I can't think of any good way of doing this. If you can make sure you create chains on a single thread, you can turn the tracking on and off.

@snorkel123
Copy link
Author

snorkel123 commented Feb 9, 2021

Correct me if I am wrong. Some rough approach.

RxJavaAssemblyTracking.enable() works via setOnObservableAssembly.

We can create custom rx operator that adds current Observable's hashCode to designated list.
If setOnObservableAssembly listener encounters Observable that is in that list, it unwraps ObservableOnAssembly and replaces hashCode with new one, so that next time it encounters this object, it does not wrap it into ObservableOnAssembly.

@akarnokd
Copy link
Owner

How would you remove from that list?

You could experiment with it by taking the hooks from RxJavaPlugins after enabling the tracking and replacing them with whatever scheme you like:

RxJavaAssemblyTracking.enable();

var set = new ConcurrentHashMap<Integer, Object>();
var observableHook = RxJavaPlugins.getOnObservableAssembly();

RxJavaPlugins.setOnObservableAssembly(o -> {
    if (set.containsKey(o.hashCode()) {
       return observableHook.apply(o);
    }
   return o;
});

@snorkel123
Copy link
Author

snorkel123 commented Feb 10, 2021

Thanks fot your snippet! I will depart from it.

How would you remove from that list?

When onSubscribe called, one can remove hashCode from list.

@snorkel123
Copy link
Author

snorkel123 commented Feb 12, 2021

This is what I came up with. I rely on observable's source.

Current blockers in RxJava3 ver 3.0.10:

  • compose does not report source, so chain is broken.
  • scalar flatMap does not store source (non-scalar does). See Observable::9123

I am glad for suggestions on where it may fail as well as general comments.

Working snippet:

Observable
                .just(1)
                .compose(AssemblyStop.stopAssemblyTrackingDownstream())
                // .flatMap (__ -> Observable.just(2) ) //<--- Breaks chain
                // .compose (__ -> Observable.just(1) ) //<--- Breaks chain
                .subscribeOn(Schedulers.io())
                .map (it -> it + 1)
                .observeOn(Schedulers.computation())
                .subscribe();

Implementation:

import java.util.HashSet;
import java.util.Set;

import hu.akarnokd.rxjava3.debug.RxJavaAssemblyTracking;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamObservableSource;
import io.reactivex.rxjava3.internal.observers.LambdaObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

// Java 8
// Uses two Rx hooks: setOnObservableAssembly and setOnObservableSubscribe
// Note: switching to different observable (compose, flatMap) will yield IllegalStateException in subscribe
public class AssemblyStop {

    private static final Set<String> registry = new HashSet<>();

    static <T> ObservableTransformer<T, T> stopAssemblyTrackingDownstream() {
        return observable -> {
            registry.add(observable.toString());
            return observable;
        };
    }

    @SuppressWarnings("rawtypes")
    static void init() {
        RxJavaAssemblyTracking.enable();

        Function observableHook = RxJavaPlugins.getOnObservableAssembly();

        RxJavaPlugins.setOnObservableAssembly(o -> {
            if(registry.contains(o.toString())) return o; // This is result of our compose operator

            if(!(o instanceof HasUpstreamObservableSource)) {
                // Operator w/o source like Just or Zip with multiple sources
                return (Observable) observableHook.apply(o);
            }

            Object source = ((HasUpstreamObservableSource) o).source();
            String sourceHash = source.toString();

            if(registry.contains(sourceHash)) {
                // Request to stop assembly tracking
                // We continue to track this object
                registry.remove(sourceHash);
                registry.add(o.toString());
                return o;
            } else return (Observable) observableHook.apply(o);

        });

        RxJavaPlugins.setOnObservableSubscribe( (observable, observer) -> {
            if(observer instanceof LambdaObserver) {
                // Check if our chain was broken by some compose
                String hash = observable.toString();
                if(!registry.contains(hash)) throw new IllegalStateException("AssemblyStop chain was broken. Did you switch observable (compose, flatMap) ?");

                registry.remove(observable.toString());
            }

            return observer;
        });
    }
}

@akarnokd
Copy link
Owner

compose

You have disconnected output, there is no way to link it to the original inside the operator.

scalar flatMap

You may have to prevent optimizations by using hide() on sources.

In general, what you try to achieve is highly contextual and as such would need a different architecture for RxJava.

Alternative would be an RxJava-backed custom factory for operators so you control the application of operators and thus can attach extra context to flow. Basically as if you'd write a MyObservable with mirrored operators.

@snorkel123
Copy link
Author

snorkel123 commented Feb 12, 2021

Thanks for thorough comment!

You have disconnected output, there is no way to link it to the original inside the operator.

Yeah, switching observables! What do you think about adding extra hook inside compose? Does not seem intrusive, looks like RxJavaPlugins.onAssembly.

You may have to prevent optimizations by using hide() on sources.

Thanks, I will research on hide.
As of scalar flat map, if you apply it from ScalarSupplier, it returns scalarXMap w/o source. Otherwise it uses ObservableFlatMap with source (Observable::9117-9125).
It can be fixed by adding extra source field to scalarXMap, also not something intrusive.

if (this instanceof ScalarSupplier) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarSupplier<T>)this).get();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize));

In general, what you try to achieve is highly contextual and as such would need a different architecture for RxJava.

I was inspired by following comment in RxJava issues. Quote: "RxJavaAssemblyTracking ... comes with performance drawbacks. It's a global on/off switch. ..."
I also faced need to stop RxJavaAssemblyTracking at too long or highly repetitive chains.

Could you give concrete examples when different architecture will be needed (i.e. relying on HasUpstreamObservableSource interface fails, except for switching observables of course) ?

My goal is to create approach that fits into current RxJava architecture or requires very minor additions (or achievable with simple bytecode manipulation on complile time via Gradle plugin).

Basically as if you'd write a MyObservable with mirrored operators.

I hope I will not need this :) Ok, it will be last resort. Gradle plugin that will replace rxjava imports with mine.

@akarnokd
Copy link
Owner

It can be fixed by adding extra source field

I'm not fond of keeping a reference to something otherwise not used.
You could use some ThreadLocal trickery to remember what the hook saw previously and match up scalars with scalarXMaps, assuming you don't use multiple threads to assemble a flow in the first place.

Could you give concrete examples

Anything that would otherwise cause multi-wrapping. Also anything relying on internal components.

@snorkel123
Copy link
Author

snorkel123 commented Feb 12, 2021

I decided to go with this approach:

Alternative would be an RxJava-backed custom factory for operators so you control the application of operators and thus can attach extra context to flow. Basically as if you'd write a MyObservable with mirrored operators.

Basically my operator will wrap into my proxy observable, that wraps all observable methods.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants