diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt index 7ac40cd3d..13312771a 100644 --- a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt @@ -19,10 +19,12 @@ import com.flipcash.app.persistence.sources.mediator.ChatMessageRemoteMediator import com.flipcash.app.persistence.sources.ChatMessageDataSource import com.flipcash.app.persistence.sources.ChatMetadataDataSource import com.flipcash.app.persistence.sources.ContactDataSource +import com.flipcash.app.persistence.entities.ChatMetadataEntity import com.flipcash.services.controllers.ChatController import com.flipcash.services.controllers.ChatMessagingController import com.flipcash.services.controllers.EventStreamingController import com.flipcash.services.models.chat.ChatId +import com.flipcash.services.models.chat.ChatMetadata import com.flipcash.services.models.chat.ChatMember import com.flipcash.services.models.chat.ChatMessage import com.flipcash.services.models.chat.MessagePointer @@ -54,7 +56,6 @@ import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterNotNull -import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map @@ -96,6 +97,7 @@ class ChatCoordinator @Inject constructor( private var flagObserverJob: Job? = null private var eventStreamCollectJob: Job? = null private var eventStreamRetryJob: Job? = null + private var feedObserverJob: Job? = null private var heartbeatJob: Job? = null private var retryAttempt = 0 private var backgroundedActiveChat: ChatId? = null @@ -135,7 +137,7 @@ class ChatCoordinator @Inject constructor( override suspend fun onUserLoggedIn(cluster: AccountCluster) { trace(tag = TAG, message = "User logged in, hydrating chat", type = TraceType.User) this.cluster.value = cluster - hydrateFromPersistence() + observeFeedFromDb() if (isChatEnabled()) { syncFeed() openEventStream() @@ -241,6 +243,10 @@ class ChatCoordinator @Inject constructor( messagingController.getMessages(chatId) .onSuccess { messages -> messageDataSource.upsert(chatId, messages) + + val latest = messages.maxByOrNull { it.messageId } ?: return@onSuccess + metadataDataSource.updateLastMessageId(chatId, latest.messageId) + metadataDataSource.updateLastActivity(chatId, latest.timestamp.toEpochMilliseconds()) } } @@ -260,20 +266,9 @@ class ChatCoordinator @Inject constructor( messageDataSource.confirmPending(chatId, clientMessageId, serverMessage) advanceReadPointer(chatId, serverMessage.messageId) - // Update feed metadata so the contact list shows the latest message + // Update feed metadata — reactive flow picks up the change metadataDataSource.updateLastMessageId(chatId, serverMessage.messageId) metadataDataSource.updateLastActivity(chatId, serverMessage.timestamp.toEpochMilliseconds()) - _state.update { state -> - val updatedFeed = state.feed.map { meta -> - if (meta.chatId == chatId) { - meta.copy( - lastMessage = serverMessage, - lastActivity = serverMessage.timestamp, - ) - } else meta - } - state.copy(feed = updatedFeed) - } } .onFailure { messageDataSource.failPending(chatId, clientMessageId) @@ -285,7 +280,7 @@ class ChatCoordinator @Inject constructor( IllegalStateException("No account") ) - // Optimistically update local pointer so the feed unread count clears immediately + // Update local pointer — reactive flow updates the feed's unread count val pointer = MessagePointer( type = PointerType.READ, userId = selfId, @@ -293,21 +288,6 @@ class ChatCoordinator @Inject constructor( timestamp = Clock.System.now(), ) memberDataSource.updatePointers(chatId, pointer) - _state.update { state -> - val updatedFeed = state.feed.map { meta -> - if (meta.chatId == chatId) { - meta.copy(members = meta.members.map { member -> - if (member.userId == selfId) { - val updated = member.pointers - .filter { it.type != PointerType.READ } - .plus(pointer) - member.copy(pointers = updated) - } else member - }) - } else meta - } - state.copy(feed = updatedFeed) - } return messagingController.advancePointer(chatId, PointerType.READ, messageId) } @@ -361,6 +341,8 @@ class ChatCoordinator @Inject constructor( closeEventStream() syncJob?.cancel() flagObserverJob?.cancel() + feedObserverJob?.cancel() + feedObserverJob = null _state.value = ChatState() cluster.value = null metadataDataSource.clear() @@ -398,20 +380,29 @@ class ChatCoordinator @Inject constructor( .launchIn(scope) } - private suspend fun hydrateFromPersistence() { - val entities = metadataDataSource.observeAll().firstOrNull() ?: return - if (entities.isEmpty()) return - - val feed = entities.map { entity -> - val members = memberDataSource.getMembersForChat(entity.chatIdHex) + private fun observeFeedFromDb() { + feedObserverJob?.cancel() + feedObserverJob = combine( + metadataDataSource.observeAll(), + memberDataSource.observeAll(), + ) { metadataEntities, membersByChat -> + buildFeedFromDb(metadataEntities, membersByChat) + }.onEach { feed -> + _state.update { it.copy(feed = feed) } + }.launchIn(scope) + } + + private suspend fun buildFeedFromDb( + metadataEntities: List, + membersByChat: Map>, + ): List { + return metadataEntities.map { entity -> + val members = membersByChat[entity.chatIdHex] ?: emptyList() val lastMessage = entity.lastMessageId?.let { messageDataSource.getLatest(entity.chatIdHex) } metadataDataSource.toMetadata(entity, members, lastMessage) } - - _state.update { it.copy(feed = feed) } - trace(tag = TAG, message = "Hydrated ${feed.size} chats from persistence", type = TraceType.Process) } private fun syncFeed() { @@ -427,9 +418,12 @@ class ChatCoordinator @Inject constructor( for (chat in page.chats) { memberDataSource.upsert(chat.chatId, chat.members) + chat.lastMessage?.let { msg -> + messageDataSource.upsert(chat.chatId, listOf(msg)) + } } - _state.update { it.copy(feed = page.chats, feedSyncState = FeedSyncState.Synced) } + _state.update { it.copy(feedSyncState = FeedSyncState.Synced) } trace(tag = TAG, message = "Feed synced: ${page.chats.size} chats", type = TraceType.Process) // Prefetch first page of messages for chats with no cached messages @@ -534,6 +528,9 @@ class ChatCoordinator @Inject constructor( metadataDataSource.upsert(metaUpdate.metadata) memberDataSource.deleteForChat(metaUpdate.metadata.chatId) memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members) + metaUpdate.metadata.lastMessage?.let { msg -> + messageDataSource.upsert(metaUpdate.metadata.chatId, listOf(msg)) + } } is MetadataUpdate.LastActivityChanged -> { metadataDataSource.updateLastActivity( @@ -544,81 +541,26 @@ class ChatCoordinator @Inject constructor( } } - // --- Single atomic state update for all in-memory changes --- - - var needsFeedSync = false - - _state.update { state -> - var feed = state.feed - var typingIndicators = state.typingIndicators - - // New messages → update feed last message - if (lastMsg != null) { - val exists = feed.any { it.chatId == chatId } - feed = if (exists) { - feed.map { meta -> - if (meta.chatId == chatId) { - meta.copy( - lastMessage = lastMsg, - lastActivity = lastMsg.timestamp, - ) - } else meta - } - } else { - needsFeedSync = true - feed - } - } + // --- Check if unknown chat requires a full feed sync --- - // Pointer updates → merge into member pointers - if (update.pointerUpdates.isNotEmpty()) { - feed = feed.map { meta -> - if (meta.chatId == chatId) { - meta.copy(members = meta.members.map { member -> - val memberPointerUpdates = update.pointerUpdates - .filter { it.userId == member.userId } - if (memberPointerUpdates.isNotEmpty()) { - val updated = member.pointers.toMutableList() - for (p in memberPointerUpdates) { - updated.removeAll { it.type == p.type } - updated.add(p) - } - member.copy(pointers = updated) - } else member - }) - } else meta - } + if (lastMsg != null) { + if (!metadataDataSource.exists(chatId)) { + syncFeed() } + } - // Typing notifications (ephemeral) - if (update.typingNotifications.isNotEmpty()) { - val currentTypists = typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf() + // --- Update ephemeral state (typing indicators are not DB-backed) --- + + if (update.typingNotifications.isNotEmpty()) { + _state.update { state -> + val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf() for (notification in update.typingNotifications) { applyTypingNotification(currentTypists, notification) } - typingIndicators = typingIndicators + (chatId to currentTypists.toSet()) - } - - // Metadata full refreshes - for (metaUpdate in update.metadataUpdates) { - if (metaUpdate is MetadataUpdate.FullRefresh) { - val exists = feed.any { it.chatId == metaUpdate.metadata.chatId } - feed = if (exists) { - feed.map { - if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it - } - } else { - feed + metaUpdate.metadata - } - } + state.copy( + typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet()) + ) } - - state.copy(feed = feed, typingIndicators = typingIndicators) - } - - // Side effects after state update - if (needsFeedSync) { - syncFeed() } } diff --git a/apps/flipcash/shared/persistence/db/src/main/kotlin/com/flipcash/app/persistence/dao/ChatMemberDao.kt b/apps/flipcash/shared/persistence/db/src/main/kotlin/com/flipcash/app/persistence/dao/ChatMemberDao.kt index 552dcf2b0..666b1c99a 100644 --- a/apps/flipcash/shared/persistence/db/src/main/kotlin/com/flipcash/app/persistence/dao/ChatMemberDao.kt +++ b/apps/flipcash/shared/persistence/db/src/main/kotlin/com/flipcash/app/persistence/dao/ChatMemberDao.kt @@ -16,6 +16,9 @@ interface ChatMemberDao { @Query("SELECT * FROM chat_members WHERE chat_id_hex = :chatIdHex") fun observeMembersForChat(chatIdHex: String): Flow> + @Query("SELECT * FROM chat_members") + fun observeAll(): Flow> + @Insert(onConflict = OnConflictStrategy.REPLACE) suspend fun upsert(entity: ChatMemberEntity) diff --git a/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMemberDataSource.kt b/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMemberDataSource.kt index 273da49f7..8d33e9be3 100644 --- a/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMemberDataSource.kt +++ b/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMemberDataSource.kt @@ -24,6 +24,12 @@ class ChatMemberDataSource @Inject constructor( entities.map { mapper.toMember(it) } } ?: emptyFlow() + fun observeAll(): Flow>> = + db?.chatMemberDao()?.observeAll()?.map { entities -> + entities.groupBy { it.chatIdHex } + .mapValues { (_, members) -> members.map { mapper.toMember(it) } } + } ?: emptyFlow() + suspend fun getMembersForChat(chatId: ChatId): List = getMembersForChat(mapper.chatIdHex(chatId)) diff --git a/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMetadataDataSource.kt b/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMetadataDataSource.kt index 0f6c46575..520f24b3d 100644 --- a/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMetadataDataSource.kt +++ b/apps/flipcash/shared/persistence/sources/src/main/kotlin/com/flipcash/app/persistence/sources/ChatMetadataDataSource.kt @@ -40,6 +40,9 @@ class ChatMetadataDataSource @Inject constructor( db?.chatMetadataDao()?.updateLastMessageId(mapper.chatIdHex(chatId), messageId) } + suspend fun exists(chatId: ChatId): Boolean = + db?.chatMetadataDao()?.getById(mapper.chatIdHex(chatId)) != null + fun toMetadata( entity: ChatMetadataEntity, members: List,