Skip to content

Commit 198755a

Browse files
committed
commit 集成Kotlin Flow响应式请求支持
1 parent 006f740 commit 198755a

12 files changed

Lines changed: 303 additions & 26 deletions

File tree

JetpackMvvm/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ android {
1111
minSdk 21
1212
targetSdk 36
1313
versionCode 23
14-
versionName "2.0.0"
14+
versionName "2.0.1"
1515

1616
consumerProguardFiles "consumer-rules.pro"
1717
}
@@ -35,7 +35,7 @@ afterEvaluate {
3535
from components.release
3636
groupId = 'com.github.hegaojian'
3737
artifactId = 'JetpackMvvm'
38-
version = '2.0.0'
38+
version = '2.0.1'
3939
}
4040
}
4141
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package me.hgj.jetpackmvvm.core.data
2+
3+
import androidx.lifecycle.viewModelScope
4+
import kotlinx.coroutines.CancellationException
5+
import kotlinx.coroutines.CoroutineScope
6+
import kotlinx.coroutines.Dispatchers
7+
import kotlinx.coroutines.flow.Flow
8+
import kotlinx.coroutines.flow.MutableSharedFlow
9+
import kotlinx.coroutines.flow.SharedFlow
10+
import kotlinx.coroutines.flow.asSharedFlow
11+
import kotlinx.coroutines.flow.catch
12+
import kotlinx.coroutines.flow.onCompletion
13+
import kotlinx.coroutines.launch
14+
import kotlinx.coroutines.supervisorScope
15+
import me.hgj.jetpackmvvm.R
16+
import me.hgj.jetpackmvvm.base.vm.BaseViewModel
17+
import me.hgj.jetpackmvvm.core.net.LoadStatusEntity
18+
import me.hgj.jetpackmvvm.core.net.LoadingEntity
19+
import me.hgj.jetpackmvvm.core.net.LoadingType
20+
import me.hgj.jetpackmvvm.ext.util.code
21+
import me.hgj.jetpackmvvm.ext.util.getStringExt
22+
import me.hgj.jetpackmvvm.ext.util.logE
23+
import me.hgj.jetpackmvvm.ext.util.msg
24+
25+
/**
26+
* 基于 Flow 的网络或本地请求 DSL
27+
* 支持在 onRequest 中自由组合 Flow 操作符,支持多次数据发射
28+
*
29+
* 使用示例:
30+
* ```
31+
* ViewModel中:
32+
* fun observeUserData() = requestFlow {
33+
* onRequest {
34+
* // 可以自由组合多个 Flow
35+
* userRepository.getUserFlow()
36+
* .combine(settingsRepository.getSettingsFlow()) { user, settings ->
37+
* UserWithSettings(user, settings)
38+
* }
39+
* .map { it.toUiModel() }
40+
* }
41+
* loadingType = LoadingType.LOADING_DIALOG
42+
* loadingMessage = "正在加载用户数据..."
43+
* }
44+
*
45+
* UI层中:
46+
* mViewModel.observeUserData().obs(this) {
47+
* onSuccess { userData ->
48+
* // 支持多次触发,每次数据更新都会调用
49+
* updateUI(userData)
50+
* }
51+
* onError { status ->
52+
* showError(status.msg)
53+
* }
54+
* }
55+
* ```
56+
*/
57+
fun <T> BaseViewModel.requestFlow(
58+
requestParameterDslClass: FlowRequestParameterDsl<T>.() -> Unit
59+
): SharedFlow<ApiResult<T>> {
60+
val dsl = FlowRequestParameterDsl<T>().apply(requestParameterDslClass)
61+
return executeFlowRequestWithResult(dsl)
62+
}
63+
64+
/**
65+
* 执行 Flow 版本的请求,支持多次数据发射
66+
*/
67+
private fun <T> BaseViewModel.executeFlowRequestWithResult(
68+
requestParameterDsl: FlowRequestParameterDsl<T>
69+
): SharedFlow<ApiResult<T>> {
70+
val resultFlow = MutableSharedFlow<ApiResult<T>>(
71+
replay = requestParameterDsl.replay,
72+
extraBufferCapacity = requestParameterDsl.extraBufferCapacity
73+
)
74+
viewModelScope.launch(Dispatchers.IO) {
75+
supervisorScope {
76+
try {
77+
// 显示 loading(只在开始时显示一次)
78+
if (requestParameterDsl.loadingType != LoadingType.LOADING_NULL) {
79+
loadingChange.loading.postValue = LoadingEntity(
80+
loadingType = requestParameterDsl.loadingType,
81+
loadingMessage = requestParameterDsl.loadingMessage,
82+
isShow = true,
83+
coroutineScope = this
84+
)
85+
}
86+
// 执行 Flow 并收集结果
87+
requestParameterDsl.onRequest.invoke(this)
88+
.onCompletion { cause ->
89+
// 请求完成时隐藏 loading
90+
if (requestParameterDsl.loadingType != LoadingType.LOADING_NULL) {
91+
loadingChange.loading.postValue = LoadingEntity(
92+
loadingType = requestParameterDsl.loadingType,
93+
loadingMessage = requestParameterDsl.loadingMessage,
94+
isShow = false
95+
)
96+
}
97+
// 成功完成
98+
if (cause == null && requestParameterDsl.loadingType == LoadingType.LOADING_XML) {
99+
loadingChange.showSuccess.postValue = true
100+
}
101+
}
102+
.catch { e ->
103+
// Flow 内部的异常处理
104+
if (e is CancellationException) return@catch
105+
e.printStackTrace()
106+
"抱歉!出错了----> : ${e.message}".logE()
107+
val loadStatus = LoadStatusEntity(
108+
code = e.code,
109+
msg = e.msg,
110+
throwable = e,
111+
loadingType = requestParameterDsl.loadingType
112+
)
113+
if (loadStatus.loadingType == LoadingType.LOADING_XML) {
114+
loadingChange.showError.postValue = loadStatus
115+
}
116+
resultFlow.emit(ApiResult.Error(loadStatus))
117+
}
118+
.collect { data ->
119+
// 每次发射数据都发送成功结果
120+
resultFlow.emit(ApiResult.Success(data))
121+
}
122+
123+
} catch (e: CancellationException) {
124+
// 请求被取消
125+
return@supervisorScope
126+
} catch (e: Exception) {
127+
// 外层异常处理
128+
e.printStackTrace()
129+
"抱歉!出错了----> : ${e.message}".logE()
130+
if (requestParameterDsl.loadingType != LoadingType.LOADING_NULL) {
131+
loadingChange.loading.postValue = LoadingEntity(
132+
loadingType = requestParameterDsl.loadingType,
133+
loadingMessage = requestParameterDsl.loadingMessage,
134+
isShow = false
135+
)
136+
}
137+
val loadStatus = LoadStatusEntity(
138+
code = e.code,
139+
msg = e.msg,
140+
throwable = e,
141+
loadingType = requestParameterDsl.loadingType
142+
)
143+
if (loadStatus.loadingType == LoadingType.LOADING_XML) {
144+
loadingChange.showError.postValue = loadStatus
145+
}
146+
resultFlow.emit(ApiResult.Error(loadStatus))
147+
}
148+
}
149+
}
150+
return resultFlow.asSharedFlow()
151+
}
152+
153+
/**
154+
* 增强版 Flow 请求参数封装类
155+
*/
156+
class FlowRequestParameterDsl<T> {
157+
/**
158+
* 协程请求方法体,返回一个 Flow<T>,支持多次数据发射
159+
* 可以自由使用 Flow 的各种操作符组合
160+
*/
161+
private var _onRequest: (suspend CoroutineScope.() -> Flow<T>)? = null
162+
var onRequest: suspend CoroutineScope.() -> Flow<T>
163+
get() = _onRequest ?: { throw IllegalStateException("onRequest 必须实现哦") }
164+
set(value) {
165+
_onRequest = value
166+
}
167+
168+
/**
169+
* 执行请求封装
170+
*/
171+
fun onRequest(block: suspend CoroutineScope.() -> Flow<T>) {
172+
_onRequest = block
173+
}
174+
175+
/** 加载提示内容 */
176+
var loadingMessage: String = getStringExt(R.string.helper_loading_tip)
177+
178+
/** 请求时loading类型 默认请求时不显示loading,
179+
* 注意:如果有多次发送值的应用场景建议传LoadingType.LOADING_NULL ,比如轮询请求,实时数据流等
180+
* */
181+
@LoadingType
182+
var loadingType = LoadingType.LOADING_NULL
183+
184+
/** SharedFlow 的重放数量
185+
* ```
186+
*1.这个参数指定了将最近发射过的多少个值重放给新的订阅者。例如,如果设置为1,那么每个新的订阅者都会立即收到最近发射的一个值。如果设置为0,那么新的订阅者不会收到任何之前发射的值,只会收到订阅后新发射的值。
187+
*
188+
*2.应用场景:当你希望新订阅者能够立即获得最新状态时,可以设置replay>0。比如,在状态管理中,你希望UI组件在配置变化后重建时能够立即获得最新的状态,就可以设置replay=1。
189+
* ```
190+
* */
191+
var replay: Int = 0
192+
193+
/** SharedFlow 的额外缓冲容量
194+
* ```
195+
* 1.这个参数用于配置除了replay之外还可以缓冲多少个数。SharedFlow 的总缓冲容量 = replay + extraBufferCapacity。当有发射值而订阅者尚未消费时,这些值会被缓冲起来,直到达到缓冲容量上限。默认情况下,SharedFlow 的发射操作会挂起,直到有缓冲空间可用(除非使用tryEmit)。
196+
*
197+
* 2.应用场景:当你希望处理背压(backpressure)时,可以通过调整缓冲容量来控制。较大的缓冲容量可以允许发射者在不挂起的情况下发射更多值,但可能会消耗更多内存。注意,即使设置了额外的缓冲容量,新的订阅者仍然只能收到replay个最近的值。
198+
* ```
199+
* */
200+
var extraBufferCapacity: Int = 0
201+
}

JetpackMvvm/src/main/java/me/hgj/jetpackmvvm/core/data/ResultState.kt

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package me.hgj.jetpackmvvm.core.data
22

3+
import androidx.lifecycle.Lifecycle
34
import androidx.lifecycle.LifecycleOwner
45
import androidx.lifecycle.LiveData
56
import androidx.lifecycle.MutableLiveData
7+
import androidx.lifecycle.lifecycleScope
8+
import androidx.lifecycle.repeatOnLifecycle
69
import com.kunminx.architecture.domain.message.MutableResult
7-
import me.hgj.jetpackmvvm.base.ui.BaseVmActivity
10+
import kotlinx.coroutines.flow.SharedFlow
11+
import kotlinx.coroutines.launch
812
import me.hgj.jetpackmvvm.core.net.LoadStatusEntity
913
import me.hgj.jetpackmvvm.core.net.LoadingType
1014
import me.hgj.jetpackmvvm.ext.util.toast
@@ -87,6 +91,37 @@ fun <T> LiveData<ApiResult<T>>.obs(
8791
}
8892
}
8993

94+
/**
95+
* 为 SharedFlow<ApiResult<T>> 添加 obs 扩展函数
96+
* 支持多次触发监听,每次数据更新都会回调
97+
*/
98+
fun <T> SharedFlow<ApiResult<T>>.obs(
99+
lifecycleOwner: LifecycleOwner,
100+
observerBuilder: ApiResultObserver<T>.() -> Unit
101+
) {
102+
val observer = ApiResultObserver<T>().apply(observerBuilder)
103+
lifecycleOwner.lifecycleScope.launch {
104+
lifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
105+
this@obs.collect { result ->
106+
when (result) {
107+
is ApiResult.Success -> observer.onSuccess(result.data)
108+
is ApiResult.Error -> {
109+
if (observer.onError != null) {
110+
observer.onError?.invoke(result.loadStatus)
111+
} else {
112+
// 不传 onError 时,走默认错误流程
113+
if (result.loadStatus.loadingType != LoadingType.LOADING_NULL) {
114+
result.loadStatus.msg.toast()
115+
}
116+
}
117+
}
118+
}
119+
}
120+
}
121+
}
122+
}
123+
124+
90125
/**
91126
* 让 MutableLiveData 支持类似属性赋值的写法:
92127
* ```kotlin
@@ -136,4 +171,3 @@ sealed class ApiResult<out T> {
136171
*/
137172
data class Error(val loadStatus: LoadStatusEntity) : ApiResult<Nothing>()
138173
}
139-

app/src/main/java/me/hgj/jetpackmvvm/demo/app/core/net/parses/ResponseParser.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@ open class ResponseParser<T> : TypeParser<T> {
4040
if (data.errorCode == NetUrl.EXPIRED_CODE) {
4141
// 登录过期,清除cookie 和本地 用户信息缓存
4242
UserManager.clearUser()
43-
val iCookieJar = RxHttpPlugins.getOkHttpClient().cookieJar as ICookieJar
44-
iCookieJar.removeAllCookie()
4543
"登录信息已经过期,请重新登录".toast()
4644
throw AppException(data.errorCode.toString(), "登录信息已经过期,请重新登录")
4745
}

app/src/main/java/me/hgj/jetpackmvvm/demo/app/core/util/UserManager.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import androidx.lifecycle.MutableLiveData
44
import me.hgj.jetpackmvvm.core.data.postValue
55
import me.hgj.jetpackmvvm.demo.data.model.entity.UserInfo
66
import me.hgj.jetpackmvvm.ext.util.cacheNullable
7+
import rxhttp.RxHttpPlugins
8+
import rxhttp.wrapper.cookie.ICookieJar
79

810
/**
911
* 作者 :hegaojian
@@ -48,6 +50,8 @@ object UserManager {
4850
/** 清空用户信息 */
4951
fun clearUser() {
5052
user = null
53+
val iCookieJar = RxHttpPlugins.getOkHttpClient().cookieJar as ICookieJar
54+
iCookieJar.removeAllCookie()
5155
userLiveData.postValue = user
5256
}
5357

app/src/main/java/me/hgj/jetpackmvvm/demo/data/repository/request/SearchRepository.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import me.hgj.jetpackmvvm.demo.data.model.entity.ArticleResponse
66
import me.hgj.jetpackmvvm.demo.data.model.entity.SearchResponse
77
import me.hgj.jetpackmvvm.demo.data.model.entity.ShareResponse
88
import rxhttp.wrapper.coroutines.Await
9+
import rxhttp.wrapper.coroutines.CallFlow
910
import rxhttp.wrapper.param.RxHttp
1011
import rxhttp.wrapper.param.toAwaitResponse
12+
import rxhttp.wrapper.param.toFlowResponse
1113

1214
/**
1315
* 作者 :hegaojian
@@ -37,8 +39,8 @@ object SearchRepository {
3739
/**
3840
* 查看他人的信息
3941
*/
40-
fun getShareUserData(id: String,pageIndex: Int): Await<ShareResponse> {
42+
fun getShareUserData(id: String,pageIndex: Int): CallFlow<ShareResponse> {
4143
return RxHttp.get(NetUrl.Search.SHARE_USER_DATA,id,pageIndex)
42-
.toAwaitResponse()
44+
.toFlowResponse()
4345
}
4446
}

app/src/main/java/me/hgj/jetpackmvvm/demo/data/repository/request/UserRepository.kt

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package me.hgj.jetpackmvvm.demo.data.repository.request
22

3+
import kotlinx.coroutines.delay
4+
import kotlinx.coroutines.flow.flow
35
import me.hgj.jetpackmvvm.demo.app.core.net.NetUrl
46
import me.hgj.jetpackmvvm.demo.data.model.entity.UserInfo
7+
import rxhttp.tryAwait
58
import rxhttp.wrapper.coroutines.Await
9+
import rxhttp.wrapper.coroutines.CallFlow
610
import rxhttp.wrapper.param.RxHttp
711
import rxhttp.wrapper.param.toAwaitResponse
12+
import rxhttp.wrapper.param.toFlowResponse
813

914
/**
1015
* 作者 : hegaojian
@@ -14,17 +19,42 @@ import rxhttp.wrapper.param.toAwaitResponse
1419
object UserRepository {
1520

1621
/**
17-
* 登录
22+
* 登录 livedata版本
1823
*/
19-
fun login(userName: String, password: String): Await<UserInfo> {
24+
fun loginLivedata(userName: String, password: String): Await<UserInfo> {
2025
return RxHttp.postForm(NetUrl.User.LOGIN)
2126
.add("username", userName)
2227
.add("password", password)
2328
.toAwaitResponse()
2429
}
2530

2631
/**
27-
* 登录
32+
* 登录 flow 版本
33+
*/
34+
fun loginFlow(userName: String, password: String): CallFlow<UserInfo> {
35+
return RxHttp.postForm(NetUrl.User.LOGIN)
36+
.add("username", userName)
37+
.add("password", password)
38+
.toFlowResponse<UserInfo>()
39+
}
40+
41+
fun loginFlow2(userName: String, password: String) = flow {
42+
//简单测试一个轮询,切记:多结果发送,一定要try catch,不然会将整个flow流都中断了
43+
while (true) {
44+
val data = RxHttp.postForm(NetUrl.User.LOGIN)
45+
.add("username", userName)
46+
.add("password", password)
47+
.toAwaitResponse<UserInfo>().tryAwait() //rxhttp提供的错误处理,请求失败就返回null
48+
if (data != null) {
49+
emit(data)
50+
}
51+
delay(5000)
52+
}
53+
}
54+
55+
56+
/**
57+
* 注册
2858
*/
2959
fun register(userName: String, password: String): Await<Any> {
3060
return RxHttp.postForm(NetUrl.User.REGISTER)

0 commit comments

Comments
 (0)