diff --git a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt index 13312771a..a7de36d1f 100644 --- a/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt +++ b/apps/flipcash/shared/chat/src/main/kotlin/com/flipcash/shared/chat/ChatCoordinator.kt @@ -65,7 +65,6 @@ import kotlinx.coroutines.launch import javax.inject.Inject import javax.inject.Singleton import kotlin.time.Clock -import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds @Singleton @@ -86,8 +85,6 @@ class ChatCoordinator @Inject constructor( companion object { private const val TAG = "ChatCoordinator" private val HEARTBEAT_INTERVAL = 30.seconds - private const val RETRY_BASE_MS = 2_000L - private const val RETRY_MAX_MS = 60_000L } private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) @@ -96,10 +93,8 @@ class ChatCoordinator @Inject constructor( private var syncJob: Job? = null private var flagObserverJob: Job? = null private var eventStreamCollectJob: Job? = null - private var eventStreamRetryJob: Job? = null private var feedObserverJob: Job? = null private var heartbeatJob: Job? = null - private var retryAttempt = 0 private var backgroundedActiveChat: ChatId? = null val state: StateFlow @@ -138,11 +133,9 @@ class ChatCoordinator @Inject constructor( trace(tag = TAG, message = "User logged in, hydrating chat", type = TraceType.User) this.cluster.value = cluster observeFeedFromDb() - if (isChatEnabled()) { - syncFeed() - openEventStream() - startHeartbeat() - } + syncFeed() + openEventStream() + startHeartbeat() observeFeatureFlag() } @@ -437,30 +430,14 @@ class ChatCoordinator @Inject constructor( } } - private fun openEventStream(force: Boolean = false) { - if (!force && eventStreamingController.isConnected) { + private fun openEventStream() { + if (eventStreamingController.isConnected) { trace(tag = TAG, message = "Event stream already connected, skipping open", type = TraceType.Process) ensureCollector() return } - eventStreamRetryJob?.cancel() - val opened = eventStreamingController.open(scope) { - // Stream died — schedule a re-open with exponential backoff - val attempt = retryAttempt++ - val delayMs = (RETRY_BASE_MS * (1L shl attempt.coerceAtMost(5))) - .coerceAtMost(RETRY_MAX_MS) - trace(tag = TAG, message = "Event stream error, retry #${attempt + 1} in ${delayMs}ms", type = TraceType.Process) - eventStreamRetryJob = scope.launch { - delay(delayMs.milliseconds) - openEventStream(force = true) - } - } - if (opened) { - retryAttempt = 0 - } else { - trace(tag = TAG, message = "Event stream failed to open", type = TraceType.Error) - } + eventStreamingController.open(scope) ensureCollector() } @@ -473,8 +450,6 @@ class ChatCoordinator @Inject constructor( } private fun closeEventStream() { - eventStreamRetryJob?.cancel() - eventStreamRetryJob = null eventStreamCollectJob?.cancel() eventStreamCollectJob = null eventStreamingController.close() @@ -485,9 +460,11 @@ class ChatCoordinator @Inject constructor( heartbeatJob = scope.launch { while (true) { delay(HEARTBEAT_INTERVAL) - if (!eventStreamingController.isConnected) { + if (!eventStreamingController.isStreamActive) { trace(tag = TAG, message = "Heartbeat: event stream dead, syncing feed and reconnecting", type = TraceType.Process) syncFeed() + // Close the dead ref so open() creates a fresh one + eventStreamingController.close() openEventStream() } } diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt index 47607a2a9..9bac417ea 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/controllers/EventStreamingController.kt @@ -22,19 +22,21 @@ class EventStreamingController @Inject constructor( private var streamRef: EventStreamReference? = null - val isConnected: Boolean get() = streamRef?.isActive == true + val isConnected: Boolean get() = streamRef != null + + val isStreamActive: Boolean get() = streamRef?.isActive == true + + fun open(scope: CoroutineScope): Boolean { + if (streamRef != null) { + trace("EventStreamingController: Stream already open, skipping") + return true + } - fun open( - scope: CoroutineScope, - onStreamError: (() -> Unit)? = null, - ): Boolean { val owner = userManager.accountCluster?.authority?.keyPair ?: run { trace("EventStreamingController: No account cluster, cannot open stream") return false } - close() - streamRef = repository.openEventStream( scope = scope, owner = owner, @@ -44,7 +46,10 @@ class EventStreamingController @Inject constructor( }, onError = { error -> trace("EventStreamingController: Stream error: ${error.message}") - onStreamError?.invoke() + // Clear the ref so the next heartbeat, lifecycle, or network + // event creates a fresh stream. The framework guarantees this + // fires only once per stream, so no risk of clearing a newer ref. + streamRef = null }, ) return true diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt index 26cf1332e..b87894bfd 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/inject/FlipcashModule.kt @@ -2,6 +2,7 @@ package com.flipcash.services.inject import android.content.Context import com.flipcash.services.internal.annotations.FlipcashManagedChannel +import com.flipcash.services.internal.annotations.FlipcashManagedStreamingChannel import com.flipcash.services.internal.annotations.FlipcashProtocol import com.flipcash.services.internal.domain.ActivityFeedMessageMapper import com.flipcash.services.internal.domain.ImageModerationResponseMapper @@ -97,6 +98,25 @@ internal object FlipcashModule { @ApplicationContext context: Context, @FlipcashProtocol config: ProtocolConfig, + ): ManagedChannel { + return AndroidChannelBuilder + .usingBuilder(OkHttpChannelBuilder.forAddress(config.baseUrl, config.port)) + .context(context) + .userAgent(config.userAgent) + .keepAliveWithoutCalls(false) + .idleTimeout(5, TimeUnit.MINUTES) + .intercept(LoggingClientInterceptor()) + .build() + .also { observeChannelState("flipcash", it) } + } + + @Singleton + @Provides + @FlipcashManagedStreamingChannel + fun provideManagedStreamingChannel( + @ApplicationContext context: Context, + @FlipcashProtocol + config: ProtocolConfig, ): ManagedChannel { return AndroidChannelBuilder .usingBuilder(OkHttpChannelBuilder.forAddress(config.baseUrl, config.port)) @@ -107,7 +127,7 @@ internal object FlipcashModule { .keepAliveWithoutCalls(true) .intercept(LoggingClientInterceptor()) .build() - .also { observeChannelState("flipcash", it) } + .also { observeChannelState("flipcash-stream", it) } } private fun observeChannelState(name: String, channel: ManagedChannel) { diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/annotations/FlipcashManagedChannel.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/annotations/FlipcashManagedChannel.kt index 0bb40bd59..ee14eb48c 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/annotations/FlipcashManagedChannel.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/annotations/FlipcashManagedChannel.kt @@ -10,4 +10,14 @@ import javax.inject.Qualifier AnnotationTarget.VALUE_PARAMETER, AnnotationTarget.FIELD ) -annotation class FlipcashManagedChannel \ No newline at end of file +annotation class FlipcashManagedChannel + +@Qualifier +@Target( + AnnotationTarget.FUNCTION, + AnnotationTarget.PROPERTY_GETTER, + AnnotationTarget.PROPERTY_SETTER, + AnnotationTarget.VALUE_PARAMETER, + AnnotationTarget.FIELD +) +annotation class FlipcashManagedStreamingChannel \ No newline at end of file diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt index 0b98db63d..02589428a 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/api/EventStreamingApi.kt @@ -2,7 +2,7 @@ package com.flipcash.services.internal.network.api import com.codeinc.flipcash.gen.events.v1.EventStreamingGrpcKt import com.codeinc.flipcash.gen.events.v1.EventStreamingService -import com.flipcash.services.internal.annotations.FlipcashManagedChannel +import com.flipcash.services.internal.annotations.FlipcashManagedStreamingChannel import com.getcode.opencode.internal.network.core.GrpcApi import io.grpc.ManagedChannel import kotlinx.coroutines.flow.Flow @@ -11,12 +11,11 @@ import javax.inject.Singleton @Singleton internal class EventStreamingApi @Inject constructor( - @FlipcashManagedChannel + @FlipcashManagedStreamingChannel managedChannel: ManagedChannel, ) : GrpcApi(managedChannel) { private val api = EventStreamingGrpcKt.EventStreamingCoroutineStub(managedChannel) - .withWaitForReady() fun streamEvents( requests: Flow, diff --git a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt index 2de483795..126b0296a 100644 --- a/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt +++ b/services/flipcash/src/main/kotlin/com/flipcash/services/internal/network/services/EventStreamingService.kt @@ -15,6 +15,7 @@ import com.getcode.utils.trace import com.google.protobuf.Timestamp import kotlin.time.Clock import kotlinx.coroutines.CoroutineScope +import kotlinx.datetime.Instant import kotlinx.coroutines.launch import javax.inject.Inject @@ -29,15 +30,14 @@ internal class EventStreamingService @Inject constructor( onEvent: (ChatUpdate) -> Unit, onError: (Throwable) -> Unit = {}, ): EventStreamReference { - trace("EventStream Opening stream.") + trace(tag = "event-stream", message = "Opening stream.") val streamReference = EventStreamReference(scope, "event-streaming") streamReference.retain() streamReference.timeoutHandler = { - trace("EventStream timed out") - streamReference.coroutineScope.launch { - openStream(scope, owner, streamReference, onEvent, onError) - } + trace(tag = "event-stream", message = "Timed out, signaling error for reconnect") + streamReference.destroy() + onError(IllegalStateException("Event stream timed out")) } streamReference.coroutineScope.launch { @@ -82,7 +82,8 @@ internal class EventStreamingService @Inject constructor( streamRef.receivedPing(updatedTimeout = response.ping.pingDelay.seconds * 1_000L) sendRequest(pong) - trace(message = "EventStream Pong. Server timestamp: ${response.ping.timestamp}") + val serverTime = Instant.fromEpochSeconds(response.ping.timestamp.seconds, response.ping.timestamp.nanos) + trace(tag = "event-stream", message = "Pong. Server timestamp: $serverTime") } RpcEventStreamingService.StreamEventsResponse.TypeCase.EVENTS -> { @@ -93,7 +94,8 @@ internal class EventStreamingService @Inject constructor( } else -> { trace( - message = "EventStream received unhandled event type: ${event.typeCase}", + tag = "event-stream", + message = "Received unhandled event type: ${event.typeCase}", type = TraceType.Log, ) } @@ -111,14 +113,16 @@ internal class EventStreamingService @Inject constructor( } onError(error) trace( - message = "EventStream error: ${response.error.code}", + tag = "event-stream", + message = "Error: ${response.error.code}", type = TraceType.Error, ) } else -> { trace( - message = "EventStream received empty message.", + tag = "event-stream", + message = "Received empty message.", type = TraceType.Error, ) } diff --git a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt index 2e7058733..cd8b68e40 100644 --- a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt +++ b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStream.kt @@ -3,6 +3,7 @@ package com.getcode.opencode.internal.bidi import com.getcode.utils.TraceType import com.getcode.utils.trace import io.grpc.Status +import io.grpc.StatusException import io.grpc.StatusRuntimeException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay @@ -23,6 +24,7 @@ fun openBidirectionalStream( reconnectOnUnavailable: Boolean = false, reconnectOnDeadlineExceeded: Boolean = false, reconnectOnCancelled: Boolean = false, + reconnectOnAborted: Boolean = false, reconnectDelayMs: Long = 300L, maxReconnectAttempts: Int = 8, onReconnectAttempt: ((attempt: Int, reason: Throwable?) -> Unit)? = null @@ -33,166 +35,148 @@ fun openBidirectionalStream( // Clean up any previous stream streamRef.cancel() - val requestChannel = Channel(capacity = Channel.RENDEZVOUS) - val requestFlow = requestChannel.consumeAsFlow() - - // Make sure we close the channel when stream completes/cancels - streamRef.complete { - trace(tag = tag, message = "Stream completed → closing request channel") - requestChannel.close() - onComplete() - } - - var attemptConnection: suspend (Int) -> Boolean = { _ -> false } // placeholder - - fun shouldReconnect(cause: Throwable): Boolean { - val statusException = cause as? StatusRuntimeException - val statusCode = statusException?.status?.code - + fun isRetryable(cause: Throwable): Boolean { + val code = cause.grpcStatusCode() return when { - reconnectOnDeadlineExceeded && statusCode == Status.Code.DEADLINE_EXCEEDED -> { - trace("Reconnecting stream due to DEADLINE_EXCEEDED status...") - true - } - reconnectOnCancelled && statusCode == Status.Code.CANCELLED -> { - trace("Reconnecting stream due to CANCELLED status...") - true - } - reconnectOnUnavailable && statusCode == Status.Code.UNAVAILABLE -> { - trace("Reconnecting stream due to UNAVAILABLE status...") - true - } - else -> { - false - } + reconnectOnUnavailable && code == Status.Code.UNAVAILABLE -> true + reconnectOnDeadlineExceeded && code == Status.Code.DEADLINE_EXCEEDED -> true + reconnectOnCancelled && code == Status.Code.CANCELLED -> true + reconnectOnAborted && code == Status.Code.ABORTED -> true + else -> false } } - suspend fun handleReconnectOrFail(attempt: Int, cause: Throwable): Boolean { - if (!shouldReconnect(cause)) { - trace( - tag = tag, - message = "Not reconnecting - unhandled error", - type = TraceType.Error, - error = cause - ) - onError(cause) - return false - } + // Single coordinator loop — no competing reconnect chains. + // Errors signal this channel; the loop decides whether to retry. + val retrySignal = Channel(capacity = Channel.CONFLATED) - onReconnectAttempt?.invoke(attempt, cause) - trace( - tag = tag, - message = "Scheduling reconnect in ${reconnectDelayMs}ms (attempt $attempt)" - ) - - delay(reconnectDelayMs) - - // Recursive reconnect attempt - return attemptConnection(attempt + 1) - } - - fun handleStreamError(attempt: Int, cause: Throwable) { - val status = (cause as? StatusRuntimeException)?.status - val code = status?.code - - val shouldRetry = when { - shouldReconnect(cause) -> true - code == Status.Code.UNAVAILABLE -> true - code == Status.Code.DEADLINE_EXCEEDED -> true - code == Status.Code.CANCELLED -> true - cause is CancellationException -> true - else -> false - } + streamRef.coroutineScope.launch { + var attempt = 0 - if (shouldRetry) { - trace( - tag = tag, - message = "Stream failed (${code?.name ?: cause.javaClass.simpleName}) → reconnecting" - ) - streamRef.coroutineScope.launch { - handleReconnectOrFail(attempt, cause) + while (attempt++ <= maxReconnectAttempts) { + if (attempt > 1) { + delay(reconnectDelayMs) } - } else { - trace( - tag = tag, - message = "Stream failed permanently", - type = TraceType.Error, - error = cause - ) - onError(cause) - } - } - - attemptConnection = attempt@{ attempt: Int -> - if (attempt > maxReconnectAttempts) { - trace( - tag = tag, - message = "Giving up after $maxReconnectAttempts attempts", - type = TraceType.Error - ) - onError(IllegalStateException("Max reconnect attempts reached")) - return@attempt false - } - trace(tag = tag, message = "Opening bidirectional stream (attempt $attempt)") + trace(tag = tag, message = "Opening bidirectional stream (attempt $attempt)") + + // Fresh channel per attempt — consumeAsFlow() is single-use + val requestChannel = Channel(capacity = Channel.RENDEZVOUS) + val requestFlow = requestChannel.consumeAsFlow() + + val responseFlow = try { + apiCall(requestFlow) + } catch (e: Exception) { + trace( + tag = tag, + message = "Failed to create stream flow", + type = TraceType.Error, + error = e + ) + requestChannel.close() + if (isRetryable(e)) { + onReconnectAttempt?.invoke(attempt, e) + continue + } + onError(e) + return@launch + } - val responseFlow = try { - apiCall(requestFlow) - } catch (e: Exception) { - trace( - tag = tag, - message = "Failed to create stream flow", - type = TraceType.Error, - error = e - ) - return@attempt handleReconnectOrFail(attempt, e) - } + var activated = false - streamRef.activateStream() - - val collectionJob = streamRef.coroutineScope.launch { - responseFlow - .catch { cause -> handleStreamError(attempt, cause) } - .collect { response -> - responseHandler(response) { request -> - streamRef.coroutineScope.launch { - try { - requestChannel.send(request) - } catch (e: Exception) { - trace( - tag = tag, - message = "Failed to send follow-up request", - type = TraceType.Error, - error = e - ) + val collectionJob = launch { + responseFlow + .catch { cause -> + requestChannel.close() + if (cause !is CancellationException) { + retrySignal.trySend(cause) + } + } + .collect { response -> + if (!activated) { + activated = true + streamRef.activateStream() + trace(tag = tag, message = "Stream activated on first response") + } + responseHandler(response) { request -> + launch { + try { + requestChannel.send(request) + } catch (e: Exception) { + trace( + tag = tag, + message = "Failed to send follow-up request", + type = TraceType.Error, + error = e + ) + } } } } + } + + try { + trace(tag = tag, message = "Sending initial request") + streamRef.postponeTimeout() + requestChannel.send(initialRequest()) + trace(tag = tag, message = "Initial request sent") + } catch (e: Exception) { + trace( + tag = tag, + message = "Failed to send initial request", + type = TraceType.Error, + error = e + ) + collectionJob.cancel() + requestChannel.close() + if (isRetryable(e)) { + onReconnectAttempt?.invoke(attempt, e) + continue } - } + onError(e) + return@launch + } + + // Wait for the collection to fail (retry signal) or complete normally. + // If the stream is healthy, this suspends indefinitely until an error occurs. + val error = retrySignal.receive() + collectionJob.cancel() + requestChannel.close() + + val code = error.grpcStatusCode() + if (isRetryable(error)) { + trace( + tag = tag, + message = "Stream failed (${code?.name ?: error.javaClass.simpleName}) → reconnecting" + ) + onReconnectAttempt?.invoke(attempt, error) + // Reset attempt counter on successful activation — only count + // consecutive failures, not total lifetime attempts. + if (activated) attempt = 0 + continue + } - try { - trace(tag = tag, message = "Sending initial request") - streamRef.postponeTimeout() - requestChannel.send(initialRequest()) - trace(tag = tag, message = "Initial request sent") - } catch (e: Exception) { trace( tag = tag, - message = "Failed to send initial request", + message = "Stream failed permanently", type = TraceType.Error, - error = e + error = error ) - collectionJob.cancel() - return@attempt handleReconnectOrFail(attempt, e) + onError(error) + return@launch } - true + trace( + tag = tag, + message = "Giving up after $maxReconnectAttempts attempts", + type = TraceType.Error + ) + onError(IllegalStateException("Max reconnect attempts reached")) } +} - // Start the first connection attempt - streamRef.coroutineScope.launch { - attemptConnection(1) - } -} \ No newline at end of file +internal fun Throwable.grpcStatusCode(): Status.Code? = when (this) { + is StatusRuntimeException -> status.code + is StatusException -> status.code + else -> null +} diff --git a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStreamForResult.kt b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStreamForResult.kt index e41b05d4f..f4cad421b 100644 --- a/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStreamForResult.kt +++ b/services/opencode/src/main/kotlin/com/getcode/opencode/internal/bidi/OpenStreamForResult.kt @@ -3,7 +3,6 @@ package com.getcode.opencode.internal.bidi import com.getcode.utils.TraceType import com.getcode.utils.trace import io.grpc.Status -import io.grpc.StatusRuntimeException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch @@ -20,6 +19,7 @@ suspend fun , reconnectOnUnavailable: Boolean = false, reconnectOnDeadlineExceeded: Boolean = false, reconnectOnCancelled: Boolean = false, + reconnectOnAborted: Boolean = false, reconnectHandler: (() -> Unit)? = null, responseHandler: suspend (response: Response, onResult: (ResultType) -> Unit, requestChannel: (Request) -> Unit) -> Unit ): ResultType { @@ -47,8 +47,7 @@ suspend fun , // Handle responses streamRef.coroutineScope.launch { responseFlow.catch { t -> - val statusException = t as? StatusRuntimeException - val statusCode = statusException?.status?.code + val statusCode = t.grpcStatusCode() when { reconnectOnDeadlineExceeded && statusCode == Status.Code.DEADLINE_EXCEEDED -> { trace("Reconnecting stream due to DEADLINE_EXCEEDED status...") @@ -62,6 +61,10 @@ suspend fun , trace("Reconnecting stream due to UNAVAILABLE status...") reconnectHandler?.invoke() } + reconnectOnAborted && statusCode == Status.Code.ABORTED -> { + trace("Reconnecting stream due to ABORTED status...") + reconnectHandler?.invoke() + } else -> { trace( message = "Stream error: ${t.message}",