Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import com.flipcash.app.persistence.sources.mediator.ChatMessageRemoteMediator
import com.flipcash.app.persistence.sources.ChatMessageDataSource
import com.flipcash.app.persistence.sources.ChatMetadataDataSource
import com.flipcash.app.persistence.sources.ContactDataSource
import com.flipcash.app.persistence.entities.ChatMetadataEntity
import com.flipcash.services.controllers.ChatController
import com.flipcash.services.controllers.ChatMessagingController
import com.flipcash.services.controllers.EventStreamingController
import com.flipcash.services.models.chat.ChatId
import com.flipcash.services.models.chat.ChatMetadata
import com.flipcash.services.models.chat.ChatMember
import com.flipcash.services.models.chat.ChatMessage
import com.flipcash.services.models.chat.MessagePointer
Expand Down Expand Up @@ -54,7 +56,6 @@ import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
Expand Down Expand Up @@ -96,6 +97,7 @@ class ChatCoordinator @Inject constructor(
private var flagObserverJob: Job? = null
private var eventStreamCollectJob: Job? = null
private var eventStreamRetryJob: Job? = null
private var feedObserverJob: Job? = null
private var heartbeatJob: Job? = null
private var retryAttempt = 0
private var backgroundedActiveChat: ChatId? = null
Expand Down Expand Up @@ -135,7 +137,7 @@ class ChatCoordinator @Inject constructor(
override suspend fun onUserLoggedIn(cluster: AccountCluster) {
trace(tag = TAG, message = "User logged in, hydrating chat", type = TraceType.User)
this.cluster.value = cluster
hydrateFromPersistence()
observeFeedFromDb()
if (isChatEnabled()) {
syncFeed()
openEventStream()
Expand Down Expand Up @@ -241,6 +243,10 @@ class ChatCoordinator @Inject constructor(
messagingController.getMessages(chatId)
.onSuccess { messages ->
messageDataSource.upsert(chatId, messages)

val latest = messages.maxByOrNull { it.messageId } ?: return@onSuccess
metadataDataSource.updateLastMessageId(chatId, latest.messageId)
metadataDataSource.updateLastActivity(chatId, latest.timestamp.toEpochMilliseconds())
}
}

Expand All @@ -260,20 +266,9 @@ class ChatCoordinator @Inject constructor(
messageDataSource.confirmPending(chatId, clientMessageId, serverMessage)
advanceReadPointer(chatId, serverMessage.messageId)

// Update feed metadata so the contact list shows the latest message
// Update feed metadata — reactive flow picks up the change
metadataDataSource.updateLastMessageId(chatId, serverMessage.messageId)
metadataDataSource.updateLastActivity(chatId, serverMessage.timestamp.toEpochMilliseconds())
_state.update { state ->
val updatedFeed = state.feed.map { meta ->
if (meta.chatId == chatId) {
meta.copy(
lastMessage = serverMessage,
lastActivity = serverMessage.timestamp,
)
} else meta
}
state.copy(feed = updatedFeed)
}
}
.onFailure {
messageDataSource.failPending(chatId, clientMessageId)
Expand All @@ -285,29 +280,14 @@ class ChatCoordinator @Inject constructor(
IllegalStateException("No account")
)

// Optimistically update local pointer so the feed unread count clears immediately
// Update local pointer — reactive flow updates the feed's unread count
val pointer = MessagePointer(
type = PointerType.READ,
userId = selfId,
value = messageId,
timestamp = Clock.System.now(),
)
memberDataSource.updatePointers(chatId, pointer)
_state.update { state ->
val updatedFeed = state.feed.map { meta ->
if (meta.chatId == chatId) {
meta.copy(members = meta.members.map { member ->
if (member.userId == selfId) {
val updated = member.pointers
.filter { it.type != PointerType.READ }
.plus(pointer)
member.copy(pointers = updated)
} else member
})
} else meta
}
state.copy(feed = updatedFeed)
}

return messagingController.advancePointer(chatId, PointerType.READ, messageId)
}
Expand Down Expand Up @@ -361,6 +341,8 @@ class ChatCoordinator @Inject constructor(
closeEventStream()
syncJob?.cancel()
flagObserverJob?.cancel()
feedObserverJob?.cancel()
feedObserverJob = null
_state.value = ChatState()
cluster.value = null
metadataDataSource.clear()
Expand Down Expand Up @@ -398,20 +380,29 @@ class ChatCoordinator @Inject constructor(
.launchIn(scope)
}

private suspend fun hydrateFromPersistence() {
val entities = metadataDataSource.observeAll().firstOrNull() ?: return
if (entities.isEmpty()) return

val feed = entities.map { entity ->
val members = memberDataSource.getMembersForChat(entity.chatIdHex)
private fun observeFeedFromDb() {
feedObserverJob?.cancel()
feedObserverJob = combine(
metadataDataSource.observeAll(),
memberDataSource.observeAll(),
) { metadataEntities, membersByChat ->
buildFeedFromDb(metadataEntities, membersByChat)
}.onEach { feed ->
_state.update { it.copy(feed = feed) }
}.launchIn(scope)
}

private suspend fun buildFeedFromDb(
metadataEntities: List<ChatMetadataEntity>,
membersByChat: Map<String, List<ChatMember>>,
): List<ChatMetadata> {
return metadataEntities.map { entity ->
val members = membersByChat[entity.chatIdHex] ?: emptyList()
val lastMessage = entity.lastMessageId?.let {
messageDataSource.getLatest(entity.chatIdHex)
}
metadataDataSource.toMetadata(entity, members, lastMessage)
}

_state.update { it.copy(feed = feed) }
trace(tag = TAG, message = "Hydrated ${feed.size} chats from persistence", type = TraceType.Process)
}

private fun syncFeed() {
Expand All @@ -427,9 +418,12 @@ class ChatCoordinator @Inject constructor(

for (chat in page.chats) {
memberDataSource.upsert(chat.chatId, chat.members)
chat.lastMessage?.let { msg ->
messageDataSource.upsert(chat.chatId, listOf(msg))
}
}

_state.update { it.copy(feed = page.chats, feedSyncState = FeedSyncState.Synced) }
_state.update { it.copy(feedSyncState = FeedSyncState.Synced) }
trace(tag = TAG, message = "Feed synced: ${page.chats.size} chats", type = TraceType.Process)

// Prefetch first page of messages for chats with no cached messages
Expand Down Expand Up @@ -534,6 +528,9 @@ class ChatCoordinator @Inject constructor(
metadataDataSource.upsert(metaUpdate.metadata)
memberDataSource.deleteForChat(metaUpdate.metadata.chatId)
memberDataSource.upsert(metaUpdate.metadata.chatId, metaUpdate.metadata.members)
metaUpdate.metadata.lastMessage?.let { msg ->
messageDataSource.upsert(metaUpdate.metadata.chatId, listOf(msg))
}
}
is MetadataUpdate.LastActivityChanged -> {
metadataDataSource.updateLastActivity(
Expand All @@ -544,81 +541,26 @@ class ChatCoordinator @Inject constructor(
}
}

// --- Single atomic state update for all in-memory changes ---

var needsFeedSync = false

_state.update { state ->
var feed = state.feed
var typingIndicators = state.typingIndicators

// New messages → update feed last message
if (lastMsg != null) {
val exists = feed.any { it.chatId == chatId }
feed = if (exists) {
feed.map { meta ->
if (meta.chatId == chatId) {
meta.copy(
lastMessage = lastMsg,
lastActivity = lastMsg.timestamp,
)
} else meta
}
} else {
needsFeedSync = true
feed
}
}
// --- Check if unknown chat requires a full feed sync ---

// Pointer updates → merge into member pointers
if (update.pointerUpdates.isNotEmpty()) {
feed = feed.map { meta ->
if (meta.chatId == chatId) {
meta.copy(members = meta.members.map { member ->
val memberPointerUpdates = update.pointerUpdates
.filter { it.userId == member.userId }
if (memberPointerUpdates.isNotEmpty()) {
val updated = member.pointers.toMutableList()
for (p in memberPointerUpdates) {
updated.removeAll { it.type == p.type }
updated.add(p)
}
member.copy(pointers = updated)
} else member
})
} else meta
}
if (lastMsg != null) {
if (!metadataDataSource.exists(chatId)) {
syncFeed()
}
}

// Typing notifications (ephemeral)
if (update.typingNotifications.isNotEmpty()) {
val currentTypists = typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf()
// --- Update ephemeral state (typing indicators are not DB-backed) ---

if (update.typingNotifications.isNotEmpty()) {
_state.update { state ->
val currentTypists = state.typingIndicators[chatId]?.toMutableSet() ?: mutableSetOf()
for (notification in update.typingNotifications) {
applyTypingNotification(currentTypists, notification)
}
typingIndicators = typingIndicators + (chatId to currentTypists.toSet())
}

// Metadata full refreshes
for (metaUpdate in update.metadataUpdates) {
if (metaUpdate is MetadataUpdate.FullRefresh) {
val exists = feed.any { it.chatId == metaUpdate.metadata.chatId }
feed = if (exists) {
feed.map {
if (it.chatId == metaUpdate.metadata.chatId) metaUpdate.metadata else it
}
} else {
feed + metaUpdate.metadata
}
}
state.copy(
typingIndicators = state.typingIndicators + (chatId to currentTypists.toSet())
)
}

state.copy(feed = feed, typingIndicators = typingIndicators)
}

// Side effects after state update
if (needsFeedSync) {
syncFeed()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ interface ChatMemberDao {
@Query("SELECT * FROM chat_members WHERE chat_id_hex = :chatIdHex")
fun observeMembersForChat(chatIdHex: String): Flow<List<ChatMemberEntity>>

@Query("SELECT * FROM chat_members")
fun observeAll(): Flow<List<ChatMemberEntity>>

@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun upsert(entity: ChatMemberEntity)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class ChatMemberDataSource @Inject constructor(
entities.map { mapper.toMember(it) }
} ?: emptyFlow()

fun observeAll(): Flow<Map<String, List<ChatMember>>> =
db?.chatMemberDao()?.observeAll()?.map { entities ->
entities.groupBy { it.chatIdHex }
.mapValues { (_, members) -> members.map { mapper.toMember(it) } }
} ?: emptyFlow()

suspend fun getMembersForChat(chatId: ChatId): List<ChatMember> =
getMembersForChat(mapper.chatIdHex(chatId))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class ChatMetadataDataSource @Inject constructor(
db?.chatMetadataDao()?.updateLastMessageId(mapper.chatIdHex(chatId), messageId)
}

suspend fun exists(chatId: ChatId): Boolean =
db?.chatMetadataDao()?.getById(mapper.chatIdHex(chatId)) != null

fun toMetadata(
entity: ChatMetadataEntity,
members: List<ChatMember>,
Expand Down
Loading