Skip to content

Commit 4cb4bd5

Browse files
committed
Show pending HTTP cache entries immediately, close #2903
1 parent 02a6932 commit 4cb4bd5

2 files changed

Lines changed: 279 additions & 6 deletions

File tree

app/shared/app-data/src/commonMain/kotlin/domain/media/cache/storage/HttpMediaCacheStorage.kt

Lines changed: 109 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@ package me.him188.ani.app.domain.media.cache.storage
1111

1212
import androidx.datastore.core.DataStore
1313
import kotlinx.coroutines.Dispatchers
14+
import kotlinx.coroutines.flow.Flow
15+
import kotlinx.coroutines.flow.MutableStateFlow
16+
import kotlinx.coroutines.flow.flatMapLatest
17+
import kotlinx.coroutines.flow.flowOf
1418
import kotlinx.coroutines.sync.Mutex
1519
import kotlinx.coroutines.sync.withLock
1620
import kotlinx.coroutines.withContext
1721
import me.him188.ani.app.data.persistent.database.dao.HttpCacheDownloadStateDao
1822
import me.him188.ani.app.domain.media.cache.MediaCache
19-
import me.him188.ani.app.domain.media.cache.engine.HttpMediaCacheEngine
23+
import me.him188.ani.app.domain.media.cache.MediaCacheState
24+
import me.him188.ani.app.domain.media.cache.engine.MediaCacheEngine
2025
import me.him188.ani.app.domain.media.resolver.EpisodeMetadata
2126
import me.him188.ani.datasources.api.Media
2227
import me.him188.ani.datasources.api.MediaCacheMetadata
@@ -29,7 +34,7 @@ class HttpMediaCacheStorage(
2934
override val mediaSourceId: String,
3035
private val store: DataStore<List<MediaCacheSave>>,
3136
private val dao: HttpCacheDownloadStateDao,
32-
private val httpEngine: HttpMediaCacheEngine,
37+
private val httpEngine: MediaCacheEngine,
3338
private val displayName: String,
3439
parentCoroutineContext: CoroutineContext = EmptyCoroutineContext,
3540
) : AbstractDataStoreMediaCacheStorage(mediaSourceId, store, httpEngine, displayName, parentCoroutineContext) {
@@ -69,12 +74,110 @@ class HttpMediaCacheStorage(
6974
resume: Boolean
7075
): MediaCache {
7176
return lock.withLock {
72-
super.cache(media, metadata, episodeMetadata, false)
73-
}.also {
74-
if (resume) {
75-
it.resume()
77+
listFlow.value.firstOrNull {
78+
isSameMediaAndEpisode(it, media, metadata)
79+
}?.let { return it }
80+
81+
if (!engine.supports(media)) {
82+
throw UnsupportedOperationException("Engine does not support media: $media")
7683
}
84+
85+
val pending = PendingHttpMediaCache(media, metadata)
86+
var createdForCleanup: MediaCache? = null
87+
listFlow.value += pending
88+
89+
try {
90+
val createdCache = httpEngine.createCache(
91+
media,
92+
metadata,
93+
episodeMetadata,
94+
scope.coroutineContext,
95+
)
96+
createdForCleanup = createdCache
97+
98+
withContext(Dispatchers.IO_) {
99+
store.updateData { list ->
100+
list + MediaCacheSave(createdCache.origin, createdCache.metadata, engine.engineKey)
101+
}
102+
}
103+
104+
pending.attach(createdCache, resume)
105+
pending
106+
} catch (e: Throwable) {
107+
listFlow.value = listFlow.value.filterNot { it === pending }
108+
createdForCleanup?.closeAndDeleteFiles()
109+
throw e
110+
}
111+
}
112+
}
113+
114+
override suspend fun deleteFirst(predicate: (MediaCache) -> Boolean): Boolean {
115+
return lock.withLock {
116+
super.deleteFirst(predicate)
77117
}
78118
}
79119
}
80120

121+
private class PendingHttpMediaCache(
122+
override val origin: Media,
123+
override val metadata: MediaCacheMetadata,
124+
) : MediaCache {
125+
private val delegate = MutableStateFlow<MediaCache?>(null)
126+
private val desiredState = MutableStateFlow(MediaCacheState.IN_PROGRESS)
127+
override val isDeleted: MutableStateFlow<Boolean> = MutableStateFlow(false)
128+
129+
override val state: Flow<MediaCacheState> = delegate.flatMapLatest {
130+
it?.state ?: desiredState
131+
}
132+
133+
override val canPlay: Flow<Boolean> = delegate.flatMapLatest {
134+
it?.canPlay ?: flowOf(false)
135+
}
136+
137+
override val fileStats: Flow<MediaCache.FileStats> = delegate.flatMapLatest {
138+
it?.fileStats ?: flowOf(MediaCache.FileStats.Unspecified)
139+
}
140+
141+
override val sessionStats: Flow<MediaCache.SessionStats> = delegate.flatMapLatest {
142+
it?.sessionStats ?: flowOf(MediaCache.SessionStats.Unspecified)
143+
}
144+
145+
suspend fun attach(cache: MediaCache, resume: Boolean) {
146+
delegate.value = cache
147+
if (isDeleted.value) {
148+
cache.closeAndDeleteFiles()
149+
return
150+
}
151+
152+
when (desiredState.value) {
153+
MediaCacheState.IN_PROGRESS -> if (resume) cache.resume()
154+
MediaCacheState.PAUSED -> cache.pause()
155+
}
156+
}
157+
158+
override suspend fun getCachedMedia() =
159+
delegate.value?.getCachedMedia()
160+
?: throw IllegalStateException("Cache is still being created")
161+
162+
override suspend fun pause() {
163+
delegate.value?.pause() ?: run {
164+
desiredState.value = MediaCacheState.PAUSED
165+
}
166+
}
167+
168+
override suspend fun close() {
169+
isDeleted.value = true
170+
delegate.value?.close()
171+
}
172+
173+
override suspend fun resume() {
174+
delegate.value?.resume() ?: run {
175+
desiredState.value = MediaCacheState.IN_PROGRESS
176+
}
177+
}
178+
179+
override suspend fun closeAndDeleteFiles() {
180+
isDeleted.value = true
181+
delegate.value?.closeAndDeleteFiles()
182+
}
183+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/*
2+
* Copyright (C) 2024-2026 OpenAni and contributors.
3+
*
4+
* 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
5+
* Use of this source code is governed by the GNU AGPLv3 license, which can be found at the following link.
6+
*
7+
* https://github.com/open-ani/ani/blob/main/LICENSE
8+
*/
9+
10+
package me.him188.ani.app.domain.media.cache.storage
11+
12+
import kotlinx.coroutines.CompletableDeferred
13+
import kotlinx.coroutines.flow.Flow
14+
import kotlinx.coroutines.flow.first
15+
import kotlinx.coroutines.flow.flowOf
16+
import kotlinx.coroutines.launch
17+
import kotlinx.coroutines.test.runCurrent
18+
import kotlinx.coroutines.test.runTest
19+
import me.him188.ani.app.data.persistent.MemoryDataStore
20+
import me.him188.ani.app.data.persistent.database.dao.HttpCacheDownloadStateDao
21+
import me.him188.ani.app.domain.media.cache.MediaCache
22+
import me.him188.ani.app.domain.media.cache.MediaCacheState
23+
import me.him188.ani.app.domain.media.cache.TestMediaCache
24+
import me.him188.ani.app.domain.media.cache.engine.MediaCacheEngine
25+
import me.him188.ani.app.domain.media.cache.engine.MediaCacheEngineKey
26+
import me.him188.ani.app.domain.media.cache.engine.MediaStats
27+
import me.him188.ani.app.domain.media.createTestDefaultMedia
28+
import me.him188.ani.app.domain.media.createTestMediaProperties
29+
import me.him188.ani.app.domain.media.resolver.EpisodeMetadata
30+
import me.him188.ani.datasources.api.CachedMedia
31+
import me.him188.ani.datasources.api.EpisodeSort
32+
import me.him188.ani.datasources.api.Media
33+
import me.him188.ani.datasources.api.MediaCacheMetadata
34+
import me.him188.ani.datasources.api.source.MediaSourceKind
35+
import me.him188.ani.datasources.api.source.MediaSourceLocation
36+
import me.him188.ani.datasources.api.topic.EpisodeRange
37+
import me.him188.ani.datasources.api.topic.ResourceLocation
38+
import me.him188.ani.utils.httpdownloader.DownloadId
39+
import me.him188.ani.utils.httpdownloader.DownloadState
40+
import me.him188.ani.utils.httpdownloader.DownloadStatus
41+
import kotlin.coroutines.CoroutineContext
42+
import kotlin.test.Test
43+
import kotlin.test.assertEquals
44+
import kotlin.test.assertFalse
45+
import kotlin.test.assertSame
46+
47+
class HttpMediaCacheStorageTest {
48+
@Test
49+
fun `cache shows placeholder before slow engine creation completes`() = runTest {
50+
val createGate = CompletableDeferred<Unit>()
51+
val metadataStore = MemoryDataStore<List<MediaCacheSave>>(emptyList())
52+
val engine = DelayedHttpCacheEngine(createGate)
53+
val storage = HttpMediaCacheStorage(
54+
mediaSourceId = "local-file-system",
55+
store = metadataStore,
56+
dao = FakeHttpCacheDownloadStateDao,
57+
httpEngine = engine,
58+
displayName = "Test HTTP Storage",
59+
parentCoroutineContext = backgroundScope.coroutineContext,
60+
)
61+
val media = testMedia()
62+
val metadata = testMetadata()
63+
64+
val job = launch {
65+
storage.cache(
66+
media = media,
67+
metadata = metadata,
68+
episodeMetadata = EpisodeMetadata("Episode 1", EpisodeSort(1), EpisodeSort(1)),
69+
resume = false,
70+
)
71+
}
72+
73+
runCurrent()
74+
75+
val pending = storage.listFlow.first().single()
76+
assertSame(media, pending.origin)
77+
assertEquals(metadata, pending.metadata)
78+
assertEquals(MediaCacheState.IN_PROGRESS, pending.state.first())
79+
assertEquals(MediaCache.FileStats.Unspecified, pending.fileStats.first())
80+
assertEquals(MediaCache.SessionStats.Unspecified, pending.sessionStats.first())
81+
assertFalse(pending.canPlay.first())
82+
assertEquals(emptyList(), metadataStore.data.first())
83+
84+
createGate.complete(Unit)
85+
runCurrent()
86+
job.join()
87+
88+
val saved = metadataStore.data.first().single()
89+
assertEquals(media.mediaId, saved.origin.mediaId)
90+
assertEquals(metadata, saved.metadata)
91+
}
92+
}
93+
94+
private class DelayedHttpCacheEngine(
95+
private val createGate: CompletableDeferred<Unit>,
96+
) : MediaCacheEngine {
97+
override val engineKey: MediaCacheEngineKey = MediaCacheEngineKey.WebM3u
98+
override val stats: Flow<MediaStats> = flowOf(MediaStats.Zero)
99+
100+
override fun supports(media: Media): Boolean = true
101+
102+
override suspend fun restore(
103+
origin: Media,
104+
metadata: MediaCacheMetadata,
105+
parentContext: CoroutineContext,
106+
): MediaCache? = null
107+
108+
override suspend fun createCache(
109+
origin: Media,
110+
metadata: MediaCacheMetadata,
111+
episodeMetadata: EpisodeMetadata,
112+
parentContext: CoroutineContext,
113+
): MediaCache {
114+
createGate.await()
115+
return TestMediaCache(
116+
media = CachedMedia(
117+
origin = origin,
118+
cacheMediaSourceId = "local-file-system",
119+
download = ResourceLocation.LocalFile("/tmp/${origin.mediaId}.mp4"),
120+
),
121+
metadata = metadata,
122+
)
123+
}
124+
125+
override suspend fun deleteUnusedCaches(all: List<MediaCache>) = Unit
126+
}
127+
128+
private object FakeHttpCacheDownloadStateDao : HttpCacheDownloadStateDao {
129+
override fun getAll(): Flow<List<DownloadState>> = flowOf(emptyList())
130+
131+
override suspend fun upsert(state: DownloadState) = Unit
132+
133+
override suspend fun updateStatus(id: DownloadId, status: DownloadStatus) = Unit
134+
135+
override suspend fun deleteAll() = Unit
136+
137+
override suspend fun deleteById(id: DownloadId) = Unit
138+
139+
override suspend fun getById(id: DownloadId): DownloadState? = null
140+
}
141+
142+
private fun testMedia(): Media {
143+
return createTestDefaultMedia(
144+
mediaId = "media-1",
145+
mediaSourceId = "source-1",
146+
originalUrl = "https://example.com/media-1",
147+
download = ResourceLocation.HttpStreamingFile("https://example.com/media-1.m3u8"),
148+
originalTitle = "Episode 1",
149+
publishedTime = 1L,
150+
properties = createTestMediaProperties(
151+
subjectName = "Test Subject",
152+
episodeName = "Episode 1",
153+
),
154+
episodeRange = EpisodeRange.single(EpisodeSort(1)),
155+
location = MediaSourceLocation.Online,
156+
kind = MediaSourceKind.WEB,
157+
)
158+
}
159+
160+
private fun testMetadata(): MediaCacheMetadata {
161+
return MediaCacheMetadata(
162+
subjectId = "1",
163+
episodeId = "1",
164+
subjectNameCN = "Test Subject",
165+
subjectNames = listOf("Test Subject"),
166+
episodeSort = EpisodeSort(1),
167+
episodeEp = EpisodeSort(1),
168+
episodeName = "Episode 1",
169+
)
170+
}

0 commit comments

Comments
 (0)