RxJava在Android移动端开发中的实战应用之二
发布日期:2021-07-01 03:06:07 浏览次数:3 分类:技术文章

本文共 10654 字,大约阅读时间需要 35 分钟。

响应式编程是以异步和数据流来构建事务关系的编程模型,异步和数据流是以构建事务关系而存在,异步是为了区分无关的事务,数据流是为了联系有关的事务

版权声明:本文出自门心叼龙的博客,属于原创内容,转载请注明出处

文章目录

1.实现简单的网络请求

Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull ObservableEmitter
e) throws Exception { Builder builder = new Builder() .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function
() { @Override public MobileAddress apply(@NonNull Response response) throws Exception { Log.e(TAG, "map 线程:" + Thread.currentThread().getName() + "\n"); if (response.isSuccessful()) { ResponseBody body = response.body(); if (body != null) { Log.e(TAG, "map:转换前:" + response.body()); return new Gson().fromJson(body.string(), MobileAddress.class); } } return null; } }).observeOn(AndroidSchedulers.mainThread()).doOnNext(new Consumer
() { @Override public void accept(@NonNull MobileAddress s) throws Exception { Log.e(TAG, "doOnNext 线程:" + Thread.currentThread().getName() + "\n"); mRxOperatorsText.append("\ndoOnNext 线程:" + Thread.currentThread().getName() + "\n"); Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n"); mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n"); } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer
() { @Override public void accept(@NonNull MobileAddress data) throws Exception { Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n"); mRxOperatorsText.append("\nsubscribe 线程:" + Thread.currentThread().getName() + "\n"); Log.e(TAG, "成功:" + data.toString() + "\n"); mRxOperatorsText.append("成功:" + data.toString() + "\n"); } }, new Consumer
() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n"); mRxOperatorsText.append("\nsubscribe 线程:" + Thread.currentThread().getName() + "\n"); Log.e(TAG, "失败:" + throwable.getMessage() + "\n"); mRxOperatorsText.append("失败:" + throwable.getMessage() + "\n"); } });

2.先读取缓存,如果缓存没数据再通过网络请求获取数据后更新UI

Observable
cache = Observable.create(new ObservableOnSubscribe
() { @Override public void subscribe(@NonNull ObservableEmitter
e) throws Exception { Log.e(TAG, "create当前线程:"+Thread.currentThread().getName() ); FoodList data = CacheManager.getInstance().getFoodListData(); // 在操作符 concat 中,只有调用 onComplete 之后才会执行下一个 Observable if (data != null){ // 如果缓存数据不为空,则直接读取缓存数据,而不读取网络数据 isFromNet = false; Log.e(TAG, "\nsubscribe: 读取缓存数据:" ); runOnUiThread(new Runnable() { @Override public void run() { mRxOperatorsText.append("\nsubscribe: 读取缓存数据:\n"); } }); e.onNext(data); }else { isFromNet = true; runOnUiThread(new Runnable() { @Override public void run() { mRxOperatorsText.append("\nsubscribe: 读取网络数据:\n"); } }); Log.e(TAG, "\nsubscribe: 读取网络数据:" ); e.onComplete(); } } }); Observable
network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list") .addQueryParameter("rows",10+"") .build() .getObjectObservable(FoodList.class); // 两个 Observable 的泛型应当保持一致Observable.concat(cache,network).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer
() { @Override public void accept(@NonNull FoodList tngouBeen) throws Exception { Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() ); if (isFromNet){ mRxOperatorsText.append("accept : 网络获取数据设置缓存: \n"); Log.e(TAG, "accept : 网络获取数据设置缓存: \n"+tngouBeen.toString() ); CacheManager.getInstance().setFoodListData(tngouBeen); } mRxOperatorsText.append("accept: 读取数据成功:" + tngouBeen.toString()+"\n"); Log.e(TAG, "accept: 读取数据成功:" + tngouBeen.toString()); } }, new Consumer
() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "subscribe 失败:"+Thread.currentThread().getName() ); Log.e(TAG, "accept: 读取数据失败:"+throwable.getMessage() ); mRxOperatorsText.append("accept: 读取数据失败:"+throwable.getMessage()+"\n"); } });

3.多个网络请求依次依赖

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")                .addQueryParameter("rows", 1 + "")                .build()                .getObjectObservable(FoodList.class) // 发起获取食品列表的请求,并解析到FootList                .subscribeOn(Schedulers.io())        // 在io线程进行网络请求                .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理获取食品列表的请求结果                .doOnNext(new Consumer
() { @Override public void accept(@NonNull FoodList foodList) throws Exception { // 先根据获取食品列表的响应结果做一些操作 Log.e(TAG, "accept: doOnNext :" + foodList.toString()); mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n"); } }) .observeOn(Schedulers.io()) // 回到 io 线程去处理获取食品详情的请求 .flatMap(new Function
>() { @Override public ObservableSource
apply(@NonNull FoodList foodList) throws Exception { if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) { return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show") .addBodyParameter("id", foodList.getTngou().get(0).getId() + "") .build() .getObjectObservable(FoodDetail.class); } return null; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer
() { @Override public void accept(@NonNull FoodDetail foodDetail) throws Exception { Log.e(TAG, "accept: success :" + foodDetail.toString()); mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n"); } }, new Consumer
() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "accept: error :" + throwable.getMessage()); mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n"); } });

4.结合多个接口的数据更新UI

Observable
observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .build() .getObjectObservable(MobileAddress.class); Observable
observable2 = Network.getGankApi() .getCategoryData("Android",1,1); Observable.zip(observable1, observable2, new BiFunction
() { @Override public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception { return "合并后的数据为:手机归属地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who; } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer
() { @Override public void accept(@NonNull String s) throws Exception { Log.e(TAG, "accept: 成功:" + s+"\n"); } }, new Consumer
() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "accept: 失败:" + throwable+"\n"); } });

5.间隔任务实现心跳

private Disposable mDisposable;    @Override    protected void doSomething() {        mDisposable = Flowable.interval(1, TimeUnit.SECONDS)                .doOnNext(new Consumer
() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: doOnNext : "+aLong ); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer
() { @Override public void accept(@NonNull Long aLong) throws Exception { Log.e(TAG, "accept: 设置文本 :"+aLong ); mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n"); } }); } /** * 销毁时停止心跳 */ @Override protected void onDestroy() { super.onDestroy(); if (mDisposable != null){ mDisposable.dispose(); } }

转载地址:https://menxindiaolong.blog.csdn.net/article/details/89740000 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:RxJava线程切换之subscribeOn和observeOn详解
下一篇:RxJava在Android移动端开发中的实战应用之一

发表评论

最新留言

表示我来过!
[***.240.166.169]2024年04月10日 18时13分53秒