Handle async cache in updates and manga screens

- Also fix concurrent accesses to main cache map
- Also debounce sources and updates list updates to maybe avoid crashing due to dupe LazyColumn keys
This commit is contained in:
arkon 2022-10-22 10:50:44 -04:00
parent d558f9e1d6
commit 152eb5b951
7 changed files with 64 additions and 69 deletions

View file

@ -23,6 +23,7 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.withTimeout
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
/**
@ -305,9 +306,9 @@ class DownloadCache(
* Returns a new map containing only the key entries of [transform] that are not null.
*/
private inline fun <K, V, R> Map<out K, V>.mapNotNullKeys(transform: (Map.Entry<K?, V>) -> R?): MutableMap<R, V> {
val destination = LinkedHashMap<R, V>()
forEach { element -> transform(element)?.let { destination[it] = element.value } }
return destination
val mutableMap = ConcurrentHashMap<R, V>()
forEach { element -> transform(element)?.let { mutableMap[it] = element.value } }
return mutableMap
}
}

View file

@ -78,13 +78,13 @@ class DownloadQueue(
.startWith(getActiveDownloads())
.onBackpressureBuffer()
fun getStatusAsFlow(): Flow<Download> = getStatusObservable().asFlow()
fun statusFlow(): Flow<Download> = getStatusObservable().asFlow()
private fun getUpdatedObservable(): Observable<List<Download>> = updatedRelay.onBackpressureBuffer()
.startWith(Unit)
.map { this }
fun getUpdatedAsFlow(): Flow<List<Download>> = getUpdatedObservable().asFlow()
fun updatedFlow(): Flow<List<Download>> = getUpdatedObservable().asFlow()
private fun setPagesFor(download: Download) {
if (download.status == Download.State.DOWNLOADED || download.status == Download.State.ERROR) {
@ -111,7 +111,7 @@ class DownloadQueue(
.filter { it.status == Download.State.DOWNLOADING }
}
fun getProgressAsFlow(): Flow<Download> = getProgressObservable().asFlow()
fun progressFlow(): Flow<Download> = getProgressObservable().asFlow()
private fun setPagesSubject(pages: List<Page>?, subject: PublishSubject<Int>?) {
pages?.forEach { it.setStatusSubject(subject) }

View file

@ -16,6 +16,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.receiveAsFlow
import logcat.LogPriority
import uy.kohesive.injekt.Injekt
@ -38,6 +39,7 @@ class SourcesPresenter(
fun onCreate() {
presenterScope.launchIO {
getEnabledSources.subscribe()
.debounce(500) // Avoid crashes due to LazyColumn rendering
.catch { exception ->
logcat(LogPriority.ERROR, exception)
_events.send(Event.FailedFetchingSources)

View file

@ -34,7 +34,7 @@ class DownloadPresenter : BasePresenter<DownloadController>() {
super.onCreate(savedState)
presenterScope.launch {
downloadQueue.getUpdatedAsFlow()
downloadQueue.updatedFlow()
.catch { error -> logcat(LogPriority.ERROR, error) }
.map { downloads ->
downloads
@ -49,9 +49,9 @@ class DownloadPresenter : BasePresenter<DownloadController>() {
}
}
fun getDownloadStatusFlow() = downloadQueue.getStatusAsFlow()
fun getDownloadStatusFlow() = downloadQueue.statusFlow()
fun getDownloadProgressFlow() = downloadQueue.getProgressAsFlow()
fun getDownloadProgressFlow() = downloadQueue.progressFlow()
/**
* Pauses the download queue.

View file

@ -34,6 +34,7 @@ import eu.kanade.domain.track.model.toDomainTrack
import eu.kanade.domain.ui.UiPreferences
import eu.kanade.tachiyomi.R
import eu.kanade.tachiyomi.data.database.models.Track
import eu.kanade.tachiyomi.data.download.DownloadCache
import eu.kanade.tachiyomi.data.download.DownloadManager
import eu.kanade.tachiyomi.data.download.model.Download
import eu.kanade.tachiyomi.data.track.EnhancedTrackService
@ -63,6 +64,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
@ -91,6 +93,7 @@ class MangaPresenter(
private val trackManager: TrackManager = Injekt.get(),
private val sourceManager: SourceManager = Injekt.get(),
private val downloadManager: DownloadManager = Injekt.get(),
private val downloadCache: DownloadCache = Injekt.get(),
private val getMangaAndChapters: GetMangaWithChapters = Injekt.get(),
private val getDuplicateLibraryManga: GetDuplicateLibraryManga = Injekt.get(),
private val setMangaChapterFlags: SetMangaChapterFlags = Injekt.get(),
@ -113,9 +116,6 @@ class MangaPresenter(
private val successState: MangaScreenState.Success?
get() = state.value as? MangaScreenState.Success
private var observeDownloadsStatusJob: Job? = null
private var observeDownloadsPageJob: Job? = null
private var _trackList: List<TrackItem> = emptyList()
val trackList get() = _trackList
@ -169,10 +169,11 @@ class MangaPresenter(
)
}
// For UI changes
presenterScope.launch {
getMangaAndChapters.subscribe(mangaId)
.distinctUntilChanged()
presenterScope.launchIO {
combine(
getMangaAndChapters.subscribe(mangaId).distinctUntilChanged(),
downloadCache.changes,
) { mangaAndChapters, _ -> mangaAndChapters }
.collectLatest { (manga, chapters) ->
val chapterItems = chapters.toChapterItemsParams(manga)
updateSuccessState {
@ -181,20 +182,11 @@ class MangaPresenter(
chapters = chapterItems,
)
}
observeDownloads()
}
}
basePreferences.incognitoMode()
.asHotFlow { incognitoMode = it }
.launchIn(presenterScope)
observeDownloads()
basePreferences.downloadedOnly()
.asHotFlow { downloadedOnlyMode = it }
.launchIn(presenterScope)
// This block runs once on create
presenterScope.launchIO {
val manga = getMangaAndChapters.awaitManga(mangaId)
val chapters = getMangaAndChapters.awaitChapters(mangaId)
@ -207,7 +199,7 @@ class MangaPresenter(
val needRefreshInfo = !manga.initialized
val needRefreshChapter = chapters.isEmpty()
// Show what we have earlier.
// Show what we have earlier
_state.update {
MangaScreenState.Success(
manga = manga,
@ -238,6 +230,14 @@ class MangaPresenter(
// Initial loading finished
updateSuccessState { it.copy(isRefreshingData = false) }
}
basePreferences.incognitoMode()
.asHotFlow { incognitoMode = it }
.launchIn(presenterScope)
basePreferences.downloadedOnly()
.asHotFlow { downloadedOnlyMode = it }
.launchIn(presenterScope)
}
fun fetchAllFromSource(manualFetch: Boolean = true) {
@ -467,9 +467,8 @@ class MangaPresenter(
// Chapters list - start
private fun observeDownloads() {
observeDownloadsStatusJob?.cancel()
observeDownloadsStatusJob = presenterScope.launchIO {
downloadManager.queue.getStatusAsFlow()
presenterScope.launchIO {
downloadManager.queue.statusFlow()
.filter { it.manga.id == successState?.manga?.id }
.catch { error -> logcat(LogPriority.ERROR, error) }
.collect {
@ -479,9 +478,8 @@ class MangaPresenter(
}
}
observeDownloadsPageJob?.cancel()
observeDownloadsPageJob = presenterScope.launchIO {
downloadManager.queue.getProgressAsFlow()
presenterScope.launchIO {
downloadManager.queue.progressFlow()
.filter { it.manga.id == successState?.manga?.id }
.catch { error -> logcat(LogPriority.ERROR, error) }
.collect {

View file

@ -32,7 +32,7 @@ class MorePresenter(
presenterScope.launchIO {
combine(
DownloadService.isRunning,
downloadManager.queue.getUpdatedAsFlow(),
downloadManager.queue.updatedFlow(),
) { isRunning, downloadQueue -> Pair(isRunning, downloadQueue.size) }
.collectLatest { (isDownloading, downloadQueueSize) ->
val pendingDownloadExists = downloadQueueSize != 0

View file

@ -18,6 +18,7 @@ import eu.kanade.domain.updates.model.UpdatesWithRelations
import eu.kanade.presentation.components.ChapterDownloadAction
import eu.kanade.presentation.updates.UpdatesState
import eu.kanade.presentation.updates.UpdatesStateImpl
import eu.kanade.tachiyomi.data.download.DownloadCache
import eu.kanade.tachiyomi.data.download.DownloadManager
import eu.kanade.tachiyomi.data.download.DownloadService
import eu.kanade.tachiyomi.data.download.model.Download
@ -27,11 +28,12 @@ import eu.kanade.tachiyomi.util.lang.launchIO
import eu.kanade.tachiyomi.util.lang.launchNonCancellable
import eu.kanade.tachiyomi.util.lang.withUIContext
import eu.kanade.tachiyomi.util.system.logcat
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
@ -50,6 +52,7 @@ class UpdatesPresenter(
private val getManga: GetManga = Injekt.get(),
private val sourceManager: SourceManager = Injekt.get(),
private val downloadManager: DownloadManager = Injekt.get(),
private val downloadCache: DownloadCache = Injekt.get(),
private val getChapter: GetChapter = Injekt.get(),
basePreferences: BasePreferences = Injekt.get(),
uiPreferences: UiPreferences = Injekt.get(),
@ -70,12 +73,6 @@ class UpdatesPresenter(
// First and last selected index in list
private val selectedPositions: Array<Int> = arrayOf(-1, -1)
/**
* Subscription to observe download status changes.
*/
private var observeDownloadsStatusJob: Job? = null
private var observeDownloadsPageJob: Job? = null
override fun onCreate(savedState: Bundle?) {
super.onCreate(savedState)
@ -86,10 +83,11 @@ class UpdatesPresenter(
add(Calendar.MONTH, -3)
}
observeDownloads()
getUpdates.subscribe(calendar)
.distinctUntilChanged()
combine(
getUpdates.subscribe(calendar).distinctUntilChanged(),
downloadCache.changes,
) { updates, _ -> updates }
.debounce(500) // Avoid crashes due to LazyColumn rendering
.catch {
logcat(LogPriority.ERROR, it)
_events.send(Event.InternalError)
@ -99,6 +97,26 @@ class UpdatesPresenter(
state.isLoading = false
}
}
presenterScope.launchIO {
downloadManager.queue.statusFlow()
.catch { error -> logcat(LogPriority.ERROR, error) }
.collect {
withUIContext {
updateDownloadState(it)
}
}
}
presenterScope.launchIO {
downloadManager.queue.progressFlow()
.catch { error -> logcat(LogPriority.ERROR, error) }
.collect {
withUIContext {
updateDownloadState(it)
}
}
}
}
private fun List<UpdatesWithRelations>.toUpdateItems(): List<UpdatesItem> {
@ -125,30 +143,6 @@ class UpdatesPresenter(
}
}
private suspend fun observeDownloads() {
observeDownloadsStatusJob?.cancel()
observeDownloadsStatusJob = presenterScope.launchIO {
downloadManager.queue.getStatusAsFlow()
.catch { error -> logcat(LogPriority.ERROR, error) }
.collect {
withUIContext {
updateDownloadState(it)
}
}
}
observeDownloadsPageJob?.cancel()
observeDownloadsPageJob = presenterScope.launchIO {
downloadManager.queue.getProgressAsFlow()
.catch { error -> logcat(LogPriority.ERROR, error) }
.collect {
withUIContext {
updateDownloadState(it)
}
}
}
}
/**
* Update status of chapters.
*