杭州政府网站建设关键词排名优化工具
要使用Rxjava首先要导入两个包,其中rxandroid是rxjava在android中的扩展
implementation 'io.reactivex:rxandroid:1.2.1'implementation 'io.reactivex:rxjava:1.2.0'
observer 是一个观察者接口,泛型T为观察者观察数据的类型,里面只有三个方法,其中onError()和onCompleted()最后只能调用其中一个,调用了此二方法后onNext()将不会在调用
onNext()方法可以调用0到多次,观察到的数据处理在此实现。
/*** Provides a mechanism for receiving push-based notifications.* <p>* After an Observer calls an {@link Observable}'s {@link Observable#subscribe subscribe} method, the* {@code Observable} calls the Observer's {@link #onNext} method to provide notifications. A well-behaved* {@code Observable} will call an Observer's {@link #onCompleted} method exactly once or the Observer's* {@link #onError} method exactly once.** @see <a href="http://reactivex.io/documentation/observable.html">ReactiveX documentation: Observable</a>* @param <T>* the type of item the Observer expects to observe*/
public interface Observer<T> {/*** Notifies the Observer that the {@link Observable} has finished sending push-based notifications.* <p>* The {@link Observable} will not call this method if it calls {@link #onError}.*/void onCompleted();/*** Notifies the Observer that the {@link Observable} has experienced an error condition.* <p>* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or* {@link #onCompleted}.** @param e* the exception encountered by the Observable*/void onError(Throwable e);/*** Provides the Observer with a new item to observe.* <p>* The {@link Observable} may call this method 0 or more times.* <p>* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or* {@link #onError}.** @param t* the item emitted by the Observable*/void onNext(T t);}
使用Observer接口创建一个观察者
Observer<String> observer = new Observer<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("observer onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("observer onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug("observer onNext is come in s= "+s);}};
Subscriber 为Observer的一个抽象实现类
public abstract class Subscriber<T> implements Observer<T>, Subscription{}
使用Subscriber 创建一个观察者
Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onCompleted() {ILog.LogDebug("subscriber onCompleted is come in");}@Overridepublic void onError(Throwable e) {ILog.LogDebug("subscriber onError is come in");}@Overridepublic void onNext(String s) {ILog.LogDebug("subscriber onNext is come in s = "+s);}@Overridepublic void onStart() {super.onStart();}};
其中onStart()方法,它会在事件还未发送之前被调用,可以用于做一些准备工作。例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。
Observable被观察者也叫事件发生源,它决定什么时候触发事件以及触发怎样的事件。
public class Observable<T> {
}
Observable 可以使用Observable.create()方法创建
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("observable call onNext0");subscriber.onStart();subscriber.onNext("observable call onNext");subscriber.onCompleted();subscriber.onNext("observable call onNext1");}});
也可以使用just和from方法创建
Observable<String> observable = Observable.just("observable call onNext","observable call onNext1");String[] array = {"observable call onNext","observable call onNext1"};
Observable<String> observable2 = Observable.from(array);
那么just,from之间的区别是什么呢,通过查看Observable源码,just方法内部也是调用的from方法。
public class Observable<T> {....public static <T> Observable<T> just(T t1, T t2) {return from((T[])new Object[] { t1, t2 });}....
}
观察者和被观察者之间的绑定
observable.subscribe(subscriber);
Rxjava的不完整回调Action
Action后的数字代表回调的参数类型数量
Action1<String> onNextAction = new Action1<String>() {@Overridepublic void call(String s) {ILog.LogDebug("onNextAction onNext s = "+s);}};Action2<String,String> action2 = new Action2<String, String>() {@Overridepublic void call(String s, String s2) {}};Action1<Throwable> throwableAction1 = new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {ILog.LogDebug("throwableAction1 call");}};Action0 onCompleteAction = new Action0() {@Overridepublic void call() {ILog.LogDebug("onCompleteAction is come in");}};//调用方法observable.subscribe(onNextAction);observable.subscribe(onNextAction,throwableAction1);observable.subscribe(onNextAction,throwableAction1,onCompleteAction);
那么Rxjava的内部是怎么使用action的呢
通过源码可以看到
public class Observable<T> {......public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {if (onNext == null) {throw new IllegalArgumentException("onNext can not be null");}if (onError == null) {throw new IllegalArgumentException("onError can not be null");}Action0 onCompleted = Actions.empty();return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));}......public final Subscription subscribe(Subscriber<? super T> subscriber) {return Observable.subscribe(subscriber, this);}......
}
最后调用了 subscribe()方法 而 subscribe()方法参数是Subscriber,通过查看ActionSubscriber源码,可知还是将action作为参数 最后转成了Subscriber对象
public final class ActionSubscriber<T> extends Subscriber<T> {final Action1<? super T> onNext;final Action1<Throwable> onError;final Action0 onCompleted;public ActionSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) {this.onNext = onNext;this.onError = onError;this.onCompleted = onCompleted;}@Overridepublic void onNext(T t) {onNext.call(t);}@Overridepublic void onError(Throwable e) {onError.call(e);}@Overridepublic void onCompleted() {onCompleted.call();}
}
同理当我们使用Observer接口生成匿名类时,然后再调用 observable.subscribe(observer);
进行绑定也是将Observer转换成Subscriber对象,源代码如下:
public class Observable<T> {......public final Subscription subscribe(final Observer<? super T> observer) {if (observer instanceof Subscriber) {return subscribe((Subscriber<? super T>)observer);}if (observer == null) {throw new NullPointerException("observer is null");}return subscribe(new ObserverSubscriber<T>(observer));}......}
public final class ObserverSubscriber<T> extends Subscriber<T> {final Observer<? super T> observer;public ObserverSubscriber(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {observer.onNext(t);}@Overridepublic void onError(Throwable e) {observer.onError(e);}@Overridepublic void onCompleted() {observer.onCompleted();}
}