Skip to content

Commit 7b2754d

Browse files
committed
Add event broadcaster for player extension, allowing extensions to run in sequence.
Adapt test case to new mediamp architecture. Only launch play progress saver and episode switcher after media is loaded.
1 parent 0a95163 commit 7b2754d

10 files changed

Lines changed: 260 additions & 139 deletions

app/shared/app-data/src/commonMain/kotlin/domain/episode/EpisodeFetchSelectPlayState.kt

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2024-2025 OpenAni and contributors.
2+
* Copyright (C) 2024-2026 OpenAni and contributors.
33
*
44
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
55
* Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link.
@@ -43,6 +43,7 @@ import me.him188.ani.app.domain.player.PlayerExtensionManager
4343
import me.him188.ani.app.domain.player.extension.EpisodePlayerExtensionFactory
4444
import me.him188.ani.app.domain.player.extension.ExtensionBackgroundTaskScope
4545
import me.him188.ani.app.domain.player.extension.PlayerExtension
46+
import me.him188.ani.app.domain.player.extension.PlayerExtensionEvent
4647
import me.him188.ani.app.domain.usecase.GlobalKoin
4748
import me.him188.ani.utils.analytics.Analytics
4849
import me.him188.ani.utils.analytics.AnalyticsEvent.Companion.EpisodeSwitch
@@ -108,7 +109,11 @@ class EpisodeFetchSelectPlayState(
108109

109110
private val extensionManager by lazy {
110111
val intrinsicExtensions = listOf(
111-
EpisodePlayerExtensionFactory { _, _ -> LoadMediaOnSelectExtension() },
112+
EpisodePlayerExtensionFactory { context, _ ->
113+
LoadMediaOnSelectExtension { episodeId ->
114+
backgroundScope.launch { context.broadcast(MediaLoadedEvent(episodeId)) }
115+
}
116+
},
112117
)
113118

114119
PlayerExtensionManager(
@@ -278,7 +283,9 @@ class EpisodeFetchSelectPlayState(
278283
*
279284
* This extension calls [PlayerSession.loadMedia] when a new media is selected.
280285
*/
281-
private inner class LoadMediaOnSelectExtension : PlayerExtension("LoadMediaOnSelect") {
286+
private inner class LoadMediaOnSelectExtension(
287+
private val onMediaLoaded: (episodeId: Int) -> Unit = { }
288+
) : PlayerExtension("LoadMediaOnSelect") {
282289
override fun onStart(episodeSession: EpisodeSession, backgroundTaskScope: ExtensionBackgroundTaskScope) {
283290
backgroundTaskScope.launch("LoadMediaOnSelect") {
284291
episodeSessionFlow.collectLatest { episodeSession ->
@@ -287,13 +294,13 @@ class EpisodeFetchSelectPlayState(
287294

288295
// `filterNotNull()` is needed. Even when media is unselect, we should not stop the player.
289296
fetchSelect.mediaSelector.selected.filterNotNull().collectLatest { media ->
290-
playerSession.loadMedia(
291-
media,
292-
episodeSession.infoBundleFlow
293-
.filterNotNull()
294-
.first()
295-
.episodeInfo.toEpisodeMetadata(),
296-
)
297+
val episodeInfo = episodeSession.infoBundleFlow
298+
.filterNotNull()
299+
.first()
300+
.episodeInfo
301+
302+
playerSession.loadMedia(media, episodeInfo.toEpisodeMetadata())
303+
onMediaLoaded(episodeInfo.episodeId)
297304
}
298305
}
299306
}
@@ -304,6 +311,11 @@ class EpisodeFetchSelectPlayState(
304311
private companion object {
305312
private val logger = logger<EpisodeFetchSelectPlayState>()
306313
}
314+
315+
/**
316+
* Event of intrinsic [LoadMediaOnSelectExtension] to indicate that the current media is loaded into player.
317+
*/
318+
class MediaLoadedEvent(val episodeId: Int) : PlayerExtensionEvent
307319
}
308320

309321
/**
Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2024 OpenAni and contributors.
2+
* Copyright (C) 2024-2026 OpenAni and contributors.
33
*
44
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
55
* Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link.
@@ -9,7 +9,9 @@
99

1010
package me.him188.ani.app.domain.player
1111

12+
import kotlinx.coroutines.channels.BufferOverflow
1213
import kotlinx.coroutines.flow.Flow
14+
import kotlinx.coroutines.flow.MutableSharedFlow
1315
import me.him188.ani.app.domain.episode.EpisodeFetchSelectPlayState
1416
import me.him188.ani.app.domain.episode.EpisodeSession
1517
import me.him188.ani.app.domain.episode.UnsafeEpisodeSessionApi
@@ -18,34 +20,17 @@ import me.him188.ani.app.domain.episode.player
1820
import me.him188.ani.app.domain.player.extension.EpisodePlayerExtensionFactory
1921
import me.him188.ani.app.domain.player.extension.PlayerExtension
2022
import me.him188.ani.app.domain.player.extension.PlayerExtensionContext
23+
import me.him188.ani.app.domain.player.extension.PlayerExtensionEvent
2124
import org.koin.core.Koin
2225
import org.openani.mediamp.MediampPlayer
2326
import kotlin.coroutines.cancellation.CancellationException
2427

2528
class PlayerExtensionManager(
26-
val extensions: List<PlayerExtension>,
27-
) {
28-
inline fun call(block: (PlayerExtension) -> Unit) {
29-
extensions.forEach {
30-
try {
31-
block(it)
32-
} catch (e: Throwable) {
33-
if (e is CancellationException) {
34-
throw e
35-
}
36-
37-
throw ExtensionException("Error calling extension ${it.name}, see cause", e)
38-
}
39-
}
40-
}
41-
}
42-
43-
fun PlayerExtensionManager(
44-
extensions: List<EpisodePlayerExtensionFactory<*>>,
29+
factories: List<EpisodePlayerExtensionFactory<*>>,
4530
state: EpisodeFetchSelectPlayState,
4631
koin: Koin,
47-
): PlayerExtensionManager {
48-
val context = object : PlayerExtensionContext {
32+
) {
33+
private val context = object : PlayerExtensionContext {
4934
override val subjectId: Int
5035
get() = state.subjectId
5136

@@ -56,6 +41,9 @@ fun PlayerExtensionManager(
5641
override val sessionFlow: Flow<EpisodeSession>
5742
get() = state.episodeSessionFlow
5843

44+
override val broadcastEvent: MutableSharedFlow<PlayerExtensionEvent> =
45+
MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST)
46+
5947
@UnsafeEpisodeSessionApi
6048
override suspend fun getCurrentEpisodeId(): Int {
6149
return state.getCurrentEpisodeId()
@@ -69,9 +57,29 @@ fun PlayerExtensionManager(
6957

7058
state.switchEpisode(newEpisodeId)
7159
}
60+
61+
override suspend fun broadcast(event: PlayerExtensionEvent) {
62+
broadcastEvent.emit(event)
63+
}
7264
}
7365

74-
return PlayerExtensionManager(extensions.map { it.create(context, koin) })
66+
val extensions: List<PlayerExtension> by lazy {
67+
factories.map { it.create(context, koin) }
68+
}
69+
70+
inline fun call(block: (PlayerExtension) -> Unit) {
71+
extensions.forEach {
72+
try {
73+
block(it)
74+
} catch (e: Throwable) {
75+
if (e is CancellationException) {
76+
throw e
77+
}
78+
79+
throw ExtensionException("Error calling extension ${it.name}, see cause", e)
80+
}
81+
}
82+
}
7583
}
7684

7785
class ExtensionException(message: String? = null, cause: Throwable? = null) : Exception(message, cause)

app/shared/app-data/src/commonMain/kotlin/domain/player/extension/PlayerExtension.kt

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2024-2025 OpenAni and contributors.
2+
* Copyright (C) 2024-2026 OpenAni and contributors.
33
*
44
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
55
* Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link.
@@ -12,6 +12,8 @@ package me.him188.ani.app.domain.player.extension
1212
import kotlinx.coroutines.CoroutineScope
1313
import kotlinx.coroutines.Job
1414
import kotlinx.coroutines.flow.Flow
15+
import kotlinx.coroutines.flow.SharedFlow
16+
import kotlinx.coroutines.flow.filterIsInstance
1517
import me.him188.ani.app.domain.episode.EpisodeFetchSelectPlayState
1618
import me.him188.ani.app.domain.episode.EpisodeSession
1719
import me.him188.ani.app.domain.episode.UnsafeEpisodeSessionApi
@@ -70,11 +72,24 @@ interface PlayerExtensionContext {
7072

7173
val sessionFlow: Flow<EpisodeSession>
7274

75+
/**
76+
* A shared event channel for all extensions.
77+
*/
78+
val broadcastEvent: SharedFlow<PlayerExtensionEvent>
79+
7380
@UnsafeEpisodeSessionApi
7481
suspend fun getCurrentEpisodeId(): Int
7582
suspend fun switchEpisode(newEpisodeId: Int)
83+
84+
suspend fun broadcast(event: PlayerExtensionEvent)
7685
}
7786

87+
inline fun <reified T : PlayerExtensionEvent> PlayerExtensionContext.subscribeEvents(): Flow<T> {
88+
return broadcastEvent.filterIsInstance<T>()
89+
}
90+
91+
interface PlayerExtensionEvent
92+
7893
fun interface EpisodePlayerExtensionFactory<T : PlayerExtension> {
7994
fun create(context: PlayerExtensionContext, koin: Koin): T
8095
}

app/shared/app-data/src/commonMain/kotlin/domain/player/extension/RememberPlayProgressExtension.kt

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@
99

1010
package me.him188.ani.app.domain.player.extension
1111

12+
import kotlinx.coroutines.CompletableDeferred
1213
import kotlinx.coroutines.Dispatchers
14+
import kotlinx.coroutines.NonCancellable
1315
import kotlinx.coroutines.flow.collectLatest
1416
import kotlinx.coroutines.flow.filter
1517
import kotlinx.coroutines.flow.firstOrNull
1618
import kotlinx.coroutines.withContext
1719
import me.him188.ani.app.data.repository.player.EpisodePlayHistoryRepository
20+
import me.him188.ani.app.domain.episode.EpisodeFetchSelectPlayState
1821
import me.him188.ani.app.domain.episode.EpisodeSession
1922
import me.him188.ani.app.domain.episode.UnsafeEpisodeSessionApi
2023
import me.him188.ani.utils.logging.info
@@ -37,23 +40,30 @@ class RememberPlayProgressExtension(
3740
private val playProgressRepository: EpisodePlayHistoryRepository by koin.inject()
3841

3942
override fun onStart(episodeSession: EpisodeSession, backgroundTaskScope: ExtensionBackgroundTaskScope) {
43+
val mediaLoaded = CompletableDeferred<Unit>()
44+
backgroundTaskScope.launch("MediaLoadedListener") {
45+
context.subscribeEvents<EpisodeFetchSelectPlayState.MediaLoadedEvent>().collectLatest { event ->
46+
if (event.episodeId == episodeSession.episodeId && mediaLoaded.isActive) {
47+
mediaLoaded.complete(Unit)
48+
}
49+
}
50+
}
51+
4052
backgroundTaskScope.launch("MediaSelectorListener") {
41-
context.sessionFlow.collectLatest { session ->
42-
session.fetchSelectFlow.collectLatest inner@{ fetchSelect ->
43-
if (fetchSelect == null) return@inner
53+
mediaLoaded.await() // 播放器开始播放了再跑这个 extension
54+
episodeSession.fetchSelectFlow.collectLatest inner@{ fetchSelect ->
55+
if (fetchSelect == null) return@inner
4456

45-
fetchSelect.mediaSelector.events.onBeforeSelect.collect {
46-
// 切换 数据源 前保存播放进度
47-
savePlayProgressOrRemove(session.episodeId)
48-
}
57+
fetchSelect.mediaSelector.events.onBeforeSelect.collect {
58+
// 切换 数据源 前保存播放进度
59+
savePlayProgressOrRemove(episodeSession.episodeId)
4960
}
50-
5161
}
5262
}
5363

5464
backgroundTaskScope.launch("PlaybackStateListener") {
5565
val player = context.player
56-
player.playbackState.collect { playbackState ->
66+
player.playbackState.collectLatest { playbackState ->
5767
when (playbackState) {
5868
// 加载播放进度
5969
PlaybackState.READY -> {
@@ -65,17 +75,19 @@ class RememberPlayProgressExtension(
6575
logger.info { "Loaded saved position: $positionMillis, waiting for video properties" }
6676
player.mediaProperties.filter { it != null && it.durationMillis > 0L }.firstOrNull()
6777
logger.info { "Loaded saved position: $positionMillis, video properties ready, seeking" }
68-
withContext(Dispatchers.Main) { // android must call in main thread
78+
withContext(Dispatchers.Main + NonCancellable) { // android must call in main thread
6979
player.seekTo(positionMillis)
7080
}
7181
}
7282
}
7383

7484
PlaybackState.PAUSED -> {
85+
mediaLoaded.await() // 播放器开始播放了一次之后再保存状态
7586
savePlayProgressOrRemove(episodeSession.episodeId)
7687
}
7788

7889
PlaybackState.FINISHED -> {
90+
mediaLoaded.await() // 播放器开始播放了一次之后再保存状态
7991
savePlayProgressOrRemove(episodeSession.episodeId)
8092
}
8193

@@ -147,4 +159,4 @@ class RememberPlayProgressExtension(
147159

148160
private val logger = logger<RememberPlayProgressExtension>()
149161
}
150-
}
162+
}

app/shared/app-data/src/commonMain/kotlin/domain/player/extension/SwitchNextEpisodeExtension.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2024-2025 OpenAni and contributors.
2+
* Copyright (C) 2024-2026 OpenAni and contributors.
33
*
44
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
55
* Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link.
@@ -9,9 +9,11 @@
99

1010
package me.him188.ani.app.domain.player.extension
1111

12+
import kotlinx.coroutines.CompletableDeferred
1213
import kotlinx.coroutines.flow.collectLatest
1314
import kotlinx.coroutines.flow.distinctUntilChanged
1415
import kotlinx.coroutines.flow.map
16+
import me.him188.ani.app.domain.episode.EpisodeFetchSelectPlayState
1517
import me.him188.ani.app.domain.episode.EpisodeSession
1618
import me.him188.ani.app.domain.settings.GetVideoScaffoldConfigUseCase
1719
import me.him188.ani.utils.logging.info
@@ -34,7 +36,15 @@ class SwitchNextEpisodeExtension(
3436
private val getVideoScaffoldConfigUseCase: GetVideoScaffoldConfigUseCase by koin.inject()
3537

3638
override fun onStart(episodeSession: EpisodeSession, backgroundTaskScope: ExtensionBackgroundTaskScope) {
39+
val mediaLoaded = CompletableDeferred<Unit>()
40+
backgroundTaskScope.launch("MediaLoadedListener") {
41+
context.subscribeEvents<EpisodeFetchSelectPlayState.MediaLoadedEvent>().collectLatest {
42+
if (mediaLoaded.isActive) mediaLoaded.complete(Unit)
43+
}
44+
}
45+
3746
backgroundTaskScope.launch("SwitchNextEpisode") {
47+
mediaLoaded.await() // 播放器开始播放了再启用自动下一集特性
3848
context.sessionFlow.collectLatest { session ->
3949
getVideoScaffoldConfigUseCase()
4050
.map { it.autoPlayNext }

app/shared/app-data/src/commonTest/kotlin/domain/episode/EpisodeFetchPlayStateSwitchEpisodeTest.kt

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import me.him188.ani.app.domain.media.resolver.TestUniversalMediaResolver
3030
import me.him188.ani.app.domain.player.extension.AbstractPlayerExtensionTest
3131
import me.him188.ani.app.domain.player.extension.RememberPlayProgressExtension
3232
import me.him188.ani.app.domain.player.extension.SwitchNextEpisodeExtension
33-
import me.him188.ani.app.domain.player.extension.loadMedia
3433
import me.him188.ani.app.domain.settings.GetVideoScaffoldConfigUseCase
3534
import me.him188.ani.utils.coroutines.childScope
3635
import org.openani.mediamp.PlaybackState
@@ -89,21 +88,14 @@ class EpisodeFetchPlayStateSwitchEpisodeTest : AbstractPlayerExtensionTest() {
8988
suite.setMediaDuration(100_000)
9089
advanceUntilIdle()
9190

92-
suite.player.currentPositionMillis.value = suite.player.mediaProperties.value!!.durationMillis
91+
suite.player.seekTo(suite.player.mediaProperties.value!!.durationMillis)
9392
suite.player.playbackState.value = PlaybackState.FINISHED
9493
advanceUntilIdle() // 自动切换到下一集数
9594

96-
// 前一集播放完毕了
97-
assertEquals(null, playHistory.getPositionMillisByEpisodeId(initialEpisodeId))
98-
99-
assertEquals(newEpisodeId, state.getCurrentEpisodeId())
100-
101-
// 加载新的视频
102-
suite.player.loadMedia(100_000)
103-
advanceUntilIdle() // 自动加载播放进度
95+
// 没有实际加载媒体源时,不会覆盖或删除旧进度
96+
assertEquals(3000, playHistory.getPositionMillisByEpisodeId(initialEpisodeId))
10497

105-
// should load the saved progress for new episode
106-
assertEquals(5000, suite.player.currentPositionMillis.value)
98+
assertEquals(initialEpisodeId, state.getCurrentEpisodeId())
10799

108100
testScope.cancel()
109101
}
@@ -119,14 +111,18 @@ class EpisodeFetchPlayStateSwitchEpisodeTest : AbstractPlayerExtensionTest() {
119111
playHistory.saveOrUpdate(newEpisodeId, 5000)
120112

121113
// 播放到一半
122-
123114
assertEquals(initialEpisodeId, state.getCurrentEpisodeId())
124115

125-
// 播到最尾部了
116+
val myMedia = TestMediaList[0]
117+
ms1.complete(listOf(myMedia))
118+
state.mediaSelectorFlow.filterNotNull().first().select(myMedia)
126119
suite.setMediaDuration(100_000)
127120
advanceUntilIdle()
128121

129-
suite.player.currentPositionMillis.value = suite.player.mediaProperties.value!!.durationMillis
122+
assertEquals(3000, suite.player.currentPositionMillis.value)
123+
124+
// 播到最尾部了
125+
suite.player.seekTo(suite.player.mediaProperties.value!!.durationMillis)
130126
suite.player.playbackState.value = PlaybackState.FINISHED
131127
advanceUntilIdle() // 自动切换到下一集数
132128

@@ -135,11 +131,9 @@ class EpisodeFetchPlayStateSwitchEpisodeTest : AbstractPlayerExtensionTest() {
135131

136132
assertEquals(newEpisodeId, state.getCurrentEpisodeId())
137133

138-
val myMedia = TestMediaList[0]
139-
ms1.complete(listOf(myMedia))
140-
state.mediaSelectorFlow.filterNotNull().first().select(myMedia)
141-
advanceUntilIdle() // 自动选择
142-
134+
val myMedia1 = TestMediaList[1]
135+
ms1.complete(listOf(myMedia1))
136+
state.mediaSelectorFlow.filterNotNull().first().select(myMedia1)
143137
suite.setMediaDuration(100_000)
144138
advanceUntilIdle() // 自动加载播放进度
145139

0 commit comments

Comments
 (0)