diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index 9b88de8c..9065627c 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -152,6 +152,8 @@ public static class Builder { private int queueCapacity; private boolean forceTlsV1 = false; private GsonBuilder gsonBuilder; + private long maxTotalBackoffDurationMs; + private long maxRateLimitDurationMs; Builder(String writeKey) { if (writeKey == null || writeKey.trim().length() == 0) { @@ -297,7 +299,10 @@ public Builder flushInterval(long flushInterval, TimeUnit unit) { return this; } - /** Set how many retries should happen before getting exhausted */ + /** + * Set how many retries should happen before getting exhausted. Default is 10 retries with + * exponential backoff starting at 500ms (capped at 1 minute per attempt). + */ public Builder retries(int maximumRetries) { if (maximumRetries < 1) { throw new IllegalArgumentException("retries must be at least 1"); @@ -356,6 +361,32 @@ public Builder forceTlsVersion1() { return this; } + /** + * Set the maximum total duration for backoff-based retries before giving up on a batch. Default + * is 12 hours. + */ + public Builder maxTotalBackoffDuration(long duration, TimeUnit unit) { + long seconds = unit.toSeconds(duration); + if (seconds < 1) { + throw new IllegalArgumentException("maxTotalBackoffDuration must be at least 1 second."); + } + this.maxTotalBackoffDurationMs = unit.toMillis(duration); + return this; + } + + /** + * Set the maximum total duration for rate-limit (429) retries before giving up on a batch. + * Default is 12 hours. + */ + public Builder maxRateLimitDuration(long duration, TimeUnit unit) { + long seconds = unit.toSeconds(duration); + if (seconds < 1) { + throw new IllegalArgumentException("maxRateLimitDuration must be at least 1 second."); + } + this.maxRateLimitDurationMs = unit.toMillis(duration); + return this; + } + /** Create a {@link Analytics} client. */ public Analytics build() { if (gsonBuilder == null) { @@ -397,7 +428,7 @@ public Analytics build() { maximumQueueSizeInBytes = MESSAGE_QUEUE_MAX_BYTE_SIZE; } if (maximumFlushAttempts == 0) { - maximumFlushAttempts = 3; + maximumFlushAttempts = 10; } if (messageTransformers == null) { messageTransformers = Collections.emptyList(); @@ -420,6 +451,12 @@ public Analytics build() { } else { callbacks = Collections.unmodifiableList(callbacks); } + if (maxTotalBackoffDurationMs == 0) { + maxTotalBackoffDurationMs = 43200 * 1000L; // 12 hours + } + if (maxRateLimitDurationMs == 0) { + maxRateLimitDurationMs = 43200 * 1000L; // 12 hours + } HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor( @@ -435,7 +472,7 @@ public void log(String message) { OkHttpClient.Builder builder = client .newBuilder() - .addInterceptor(new AnalyticsRequestInterceptor(userAgent)) + .addInterceptor(new AnalyticsRequestInterceptor(writeKey, userAgent)) .addInterceptor(interceptor); if (forceTlsV1) { @@ -473,7 +510,9 @@ public void log(String message) { networkExecutor, callbacks, writeKey, - gson); + gson, + maxTotalBackoffDurationMs, + maxRateLimitDurationMs); return new Analytics(analyticsClient, messageTransformers, messageInterceptors, log); } diff --git a/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java b/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java index ff50fc3b..c6c435f6 100644 --- a/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java +++ b/analytics/src/main/java/com/segment/analytics/AnalyticsRequestInterceptor.java @@ -1,24 +1,39 @@ package com.segment.analytics; +import com.segment.analytics.internal.AnalyticsClient; import jakarta.annotation.Nonnull; import java.io.IOException; +import okhttp3.Credentials; import okhttp3.Interceptor; import okhttp3.Request; class AnalyticsRequestInterceptor implements Interceptor { + private static final String AUTHORIZATION_HEADER = "Authorization"; private static final String USER_AGENT_HEADER = "User-Agent"; + private static final String RETRY_COUNT_HEADER = "X-Retry-Count"; + private final @Nonnull String writeKey; private final @Nonnull String userAgent; - AnalyticsRequestInterceptor(@Nonnull String userAgent) { + AnalyticsRequestInterceptor(@Nonnull String writeKey, @Nonnull String userAgent) { + this.writeKey = writeKey; this.userAgent = userAgent; } @Override public okhttp3.Response intercept(Chain chain) throws IOException { Request request = chain.request(); - Request newRequest = request.newBuilder().addHeader(USER_AGENT_HEADER, userAgent).build(); + Request.Builder builder = + request + .newBuilder() + .addHeader(AUTHORIZATION_HEADER, Credentials.basic(writeKey, "")) + .addHeader(USER_AGENT_HEADER, userAgent); - return chain.proceed(newRequest); + Integer retryCount = AnalyticsClient.RETRY_COUNT.get(); + if (retryCount != null) { + builder.addHeader(RETRY_COUNT_HEADER, retryCount.toString()); + } + + return chain.proceed(builder.build()); } } diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 2430cd1e..bc2362e5 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -15,6 +15,11 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -38,6 +43,8 @@ import retrofit2.Response; public class AnalyticsClient { + public static final ThreadLocal RETRY_COUNT = new ThreadLocal<>(); + private static final Map CONTEXT; private static final int BATCH_MAX_SIZE = 1024 * 500; private static final int MSG_MAX_SIZE = 1024 * 32; @@ -46,6 +53,9 @@ public class AnalyticsClient { private static final String instanceId = UUID.randomUUID().toString(); private static final int WAIT_FOR_THREAD_COMPLETE_S = 5; private static final int TERMINATION_TIMEOUT_S = 1; + private static final int NETWORK_TERMINATION_TIMEOUT_S = + 75; // base Retry-After cap is 60s + headroom + private static final long MAX_RATE_LIMITED_SECONDS = 300L; static { Map library = new LinkedHashMap<>(); @@ -71,7 +81,12 @@ public class AnalyticsClient { private final ScheduledExecutorService flushScheduler; private final AtomicBoolean isShutDown; private final String writeKey; + final long maxTotalBackoffDurationMs; + final long maxRateLimitDurationMs; private volatile Future looperFuture; + private volatile boolean rateLimited; + private volatile long rateLimitWaitUntil; + private volatile long rateLimitStartTime; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -86,7 +101,9 @@ public static AnalyticsClient create( ExecutorService networkExecutor, List callbacks, String writeKey, - Gson gsonInstance) { + Gson gsonInstance, + long maxTotalBackoffDurationMs, + long maxRateLimitDurationMs) { return new AnalyticsClient( new LinkedBlockingQueue(queueCapacity), uploadUrl, @@ -101,7 +118,9 @@ public static AnalyticsClient create( callbacks, new AtomicBoolean(false), writeKey, - gsonInstance); + gsonInstance, + maxTotalBackoffDurationMs, + maxRateLimitDurationMs); } public AnalyticsClient( @@ -118,7 +137,9 @@ public AnalyticsClient( List callbacks, AtomicBoolean isShutDown, String writeKey, - Gson gsonInstance) { + Gson gsonInstance, + long maxTotalBackoffDurationMs, + long maxRateLimitDurationMs) { this.messageQueue = messageQueue; this.uploadUrl = uploadUrl; this.service = service; @@ -132,6 +153,8 @@ public AnalyticsClient( this.isShutDown = isShutDown; this.writeKey = writeKey; this.gsonInstance = gsonInstance; + this.maxTotalBackoffDurationMs = maxTotalBackoffDurationMs; + this.maxRateLimitDurationMs = maxRateLimitDurationMs; this.currentQueueSizeInBytes = 0; @@ -166,7 +189,28 @@ private Boolean isBackPressuredAfterSize(int incomingSize) { } public boolean offer(Message message) { - return messageQueue.offer(message); + if (isShutDown.get()) { + log.print(ERROR, "Attempt to offer a message when shutdown has been called %s.", message); + return false; + } + + int messageByteSize = messageSizeInBytes(message); + if (messageByteSize > MSG_MAX_SIZE) { + log.print(ERROR, "Message was above individual limit. MessageId: %s", message.messageId()); + return false; + } + + if (!messageQueue.offer(message)) { + return false; + } + if (isBackPressuredAfterSize(messageByteSize)) { + this.currentQueueSizeInBytes = messageByteSize; + messageQueue.offer(FlushMessage.POISON); + log.print(VERBOSE, "Maximum storage size has been hit Flushing..."); + } else { + this.currentQueueSizeInBytes += messageByteSize; + } + return true; } public void enqueue(Message message) { @@ -215,6 +259,41 @@ public void flush() { } } + synchronized void setRateLimitState(long retryAfterSeconds) { + long now = System.currentTimeMillis(); + if (rateLimitStartTime == 0) { + rateLimitStartTime = now; + } + rateLimitWaitUntil = now + (retryAfterSeconds * 1000); + rateLimited = true; + } + + /** + * Sets rate-limit state and atomically checks whether maxRateLimitDuration has been exceeded. + * Returns true if the duration has been exceeded and the batch should be dropped. + */ + synchronized boolean setRateLimitStateAndCheckDuration( + long retryAfterSeconds, long maxRateLimitDurationMs) { + setRateLimitState(retryAfterSeconds); + return rateLimitStartTime > 0 + && System.currentTimeMillis() - rateLimitStartTime > maxRateLimitDurationMs; + } + + synchronized void clearRateLimitState() { + rateLimited = false; + rateLimitWaitUntil = 0; + rateLimitStartTime = 0; + } + + synchronized boolean isRateLimited() { + if (!rateLimited) return false; + if (System.currentTimeMillis() >= rateLimitWaitUntil) { + clearRateLimitState(); + return false; + } + return true; + } + public void shutdown() { if (isShutDown.compareAndSet(false, true)) { final long start = System.currentTimeMillis(); @@ -260,9 +339,11 @@ private void waitForLooperCompletion() { public void shutdownAndWait(ExecutorService executor, String name) { boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); + boolean isNetworkExecutor = name != null && name.equalsIgnoreCase("network"); + int timeoutSeconds = isNetworkExecutor ? NETWORK_TERMINATION_TIMEOUT_S : TERMINATION_TIMEOUT_S; try { executor.shutdown(); - boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); + boolean terminated = executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS); if (terminated) { log.print(VERBOSE, "%s executor terminated normally.", name); return; @@ -364,44 +445,74 @@ public void run() { Boolean isOverflow = messages.size() >= size; if (!messages.isEmpty() && (isOverflow || isBlockingSignal || batchSizeLimitReached)) { - Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); - log.print( - VERBOSE, - "Batching %s message(s) into batch %s.", - batch.batch().size(), - batch.sequence()); - try { - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - } catch (RejectedExecutionException e) { + // Skip submission if rate-limited (unless this is a StopMessage — always flush on + // shutdown) + if (isRateLimited() && message != StopMessage.STOP) { + log.print(DEBUG, "Rate-limited. Deferring batch submission."); + // Don't clear messages — they'll be picked up on the next flush trigger + if (batchSizeLimitReached) { + // Preserve overflow message while deferring submission due to rate limiting. + // This message was consumed from the queue but not added to the current batch. + if (!messageQueue.offer(message)) { + log.print( + ERROR, + "Failed to preserve overflow message while rate-limited; message may be dropped."); + } + batchSizeLimitReached = false; + } + } else { + Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); log.print( - ERROR, - e, - "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", + VERBOSE, + "Batching %s message(s) into batch %s.", + batch.batch().size(), batch.sequence()); - // Notify callbacks about the failure - for (Message msg : batch.batch()) { - for (Callback callback : callbacks) { - callback.failure(msg, e); + try { + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + } catch (RejectedExecutionException e) { + log.print( + ERROR, + e, + "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", + batch.sequence()); + // Notify callbacks about the failure + for (Message msg : batch.batch()) { + for (Callback callback : callbacks) { + callback.failure(msg, e); + } } } - } - currentBatchSize.set(0); - messages.clear(); - if (batchSizeLimitReached) { - // If this is true that means the last message that would make us go over the limit - // was not added, - // add it to the now cleared messages list so its not lost - messages.add(message); + currentBatchSize.set(0); + messages.clear(); + if (batchSizeLimitReached) { + // If this is true that means the last message that would make us go over the limit + // was not added, + // add it to the now cleared messages list so its not lost + messages.add(message); + } + batchSizeLimitReached = false; } - batchSizeLimitReached = false; } } } catch (InterruptedException e) { log.print(DEBUG, "Looper interrupted while polling for messages."); Thread.currentThread().interrupt(); } + // Any messages still held in the local buffer were deferred due to rate-limiting and + // never flushed. Fire failure callbacks so callers are not silently left waiting. + if (!messages.isEmpty()) { + IOException stalledError = + new IOException( + messages.size() + " message(s) dropped: rate-limited when Looper stopped"); + log.print(ERROR, stalledError.getMessage()); + for (Message msg : messages) { + for (Callback callback : callbacks) { + callback.failure(msg, stalledError); + } + } + } log.print(VERBOSE, "Looper stopped"); } } @@ -409,8 +520,8 @@ public void run() { static class BatchUploadTask implements Runnable { private static final Backo BACKO = Backo.builder() // - .base(TimeUnit.SECONDS, 15) // - .cap(TimeUnit.HOURS, 1) // + .base(TimeUnit.MILLISECONDS, 500) // + .cap(TimeUnit.MINUTES, 1) // .jitter(1) // .build(); @@ -438,11 +549,40 @@ private void notifyCallbacksWithException(Batch batch, Exception exception) { } } - /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ - boolean upload() { + private enum RetryStrategy { + NONE, + BACKOFF, + RATE_LIMITED + } + + private static final class UploadResult { + final RetryStrategy strategy; + final long retryAfterSeconds; + + UploadResult(RetryStrategy strategy) { + this(strategy, 0L); + } + + UploadResult(RetryStrategy strategy, long retryAfterSeconds) { + this.strategy = strategy; + this.retryAfterSeconds = retryAfterSeconds; + } + } + + /** + * Perform a single upload attempt. + * + * @param attempt overall number of attempts so far (1-based) + */ + UploadResult upload(int attempt) { client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); try { + if (attempt > 1) { + RETRY_COUNT.set(attempt - 1); + } else { + RETRY_COUNT.remove(); + } Call call = client.service.upload(client.uploadUrl, batch); Response response = call.execute(); @@ -455,59 +595,176 @@ boolean upload() { } } - return false; + return new UploadResult(RetryStrategy.NONE); } int status = response.code(); - if (is5xx(status)) { - client.log.print( - DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); - return true; - } else if (status == 429) { - client.log.print( - DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); - return true; + + if (isStatusRetryWithBackoff(status)) { + String retryAfterHeader = response.headers().get("Retry-After"); + Long retryAfterSeconds = parseRetryAfterSeconds(retryAfterHeader); + if (retryAfterSeconds != null) { + client.log.print( + DEBUG, + "Could not upload batch %s due to status %s with Retry-After %s seconds. Retrying after delay.", + batch.sequence(), + status, + retryAfterSeconds); + return new UploadResult(RetryStrategy.RATE_LIMITED, retryAfterSeconds); + } + if (retryAfterHeader != null && !retryAfterHeader.trim().isEmpty()) { + client.log.print( + DEBUG, + "Status %s returned unparseable Retry-After header \"%s\" for batch %s. Using backoff.", + status, + retryAfterHeader, + batch.sequence()); + } else { + client.log.print( + DEBUG, + "Status %s did not have a valid Retry-After header for batch %s. Using backoff.", + status, + batch.sequence()); + } + return new UploadResult(RetryStrategy.BACKOFF); } - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); + client.log.print( + DEBUG, + "Could not upload batch %s due to non-retryable status %s. Giving up.", + batch.sequence(), + status); notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); - return false; + return new UploadResult(RetryStrategy.NONE); } catch (IOException error) { client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); - return true; + return new UploadResult(RetryStrategy.BACKOFF); } catch (Exception exception) { client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); notifyCallbacksWithException(batch, exception); - return false; + return new UploadResult(RetryStrategy.NONE); + } + } + + private static Long parseRetryAfterSeconds(String headerValue) { + if (headerValue == null) { + return null; + } + headerValue = headerValue.trim(); + if (headerValue.isEmpty()) { + return null; + } + try { + long seconds = Long.parseLong(headerValue); + if (seconds <= 0L) { + return null; + } + return Math.min(seconds, MAX_RATE_LIMITED_SECONDS); + } catch (NumberFormatException ignored) { + // Try HTTP-date format (RFC 7231 §7.1.1.1) + } + try { + ZonedDateTime target = + ZonedDateTime.parse(headerValue, DateTimeFormatter.RFC_1123_DATE_TIME); + long seconds = Duration.between(Instant.now(), target.toInstant()).getSeconds(); + if (seconds <= 0L) { + return null; + } + return Math.min(seconds, MAX_RATE_LIMITED_SECONDS); + } catch (DateTimeParseException ignored) { + return null; } } @Override public void run() { - int attempt = 0; - for (; attempt <= maxRetries; attempt++) { - boolean retry = upload(); - if (!retry) return; + int totalAttempts = 0; // counts every HTTP attempt (for header and error message) + int backoffAttempts = 0; // counts attempts that consume backoff-based retries + int maxBackoffAttempts = maxRetries + 1; // preserve existing semantics + long firstFailureTime = 0; + + while (true) { + totalAttempts++; + UploadResult result = upload(totalAttempts); + + if (result.strategy == RetryStrategy.NONE) { + client.clearRateLimitState(); + return; + } + + if (result.strategy == RetryStrategy.RATE_LIMITED) { + // Atomically set rate-limit state and check whether maxRateLimitDuration is exceeded. + boolean durationExceeded = + client.setRateLimitStateAndCheckDuration( + result.retryAfterSeconds, client.maxRateLimitDurationMs); + if (durationExceeded) { + client.clearRateLimitState(); + break; + } + + // If maxRetries=0 the user wants no retries at all; respect that for Retry-After too. + if (maxBackoffAttempts <= 1) { + break; + } + + try { + TimeUnit.SECONDS.sleep(result.retryAfterSeconds); + } catch (InterruptedException e) { + client.log.print( + DEBUG, + "Thread interrupted while waiting for Retry-After for batch %s.", + batch.sequence()); + client.clearRateLimitState(); + Thread.currentThread().interrupt(); + return; + } + // Retry-After does not count against maxRetries. + continue; + } + + // BACKOFF strategy + if (firstFailureTime == 0) firstFailureTime = System.currentTimeMillis(); + if (System.currentTimeMillis() - firstFailureTime > client.maxTotalBackoffDurationMs) { + break; + } + + backoffAttempts++; + if (backoffAttempts >= maxBackoffAttempts) { + break; + } + try { - backo.sleep(attempt); + backo.sleep(backoffAttempts - 1); } catch (InterruptedException e) { client.log.print( DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); + Thread.currentThread().interrupt(); return; } } + client.clearRateLimitState(); client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); notifyCallbacksWithException( - batch, new IOException(Integer.toString(attempt) + " retries exhausted")); + batch, new IOException(Integer.toString(totalAttempts) + " retries exhausted")); } - private static boolean is5xx(int status) { - return status >= 500 && status < 600; + private static boolean isStatusRetryWithBackoff(int status) { + // 460 is a non-standard, Segment-specific status code for transient retryable failures. + if (status == 408 || status == 410 || status == 429 || status == 460) { + return true; + } + + // Retry all other 5xx errors except 501, 505, and 511 + if (status >= 500 && status < 600) { + return status != 501 && status != 505 && status != 511; + } + + return false; } } diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java index 31596e90..45ee6673 100644 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java +++ b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java @@ -460,4 +460,50 @@ public void buildWithQueueCapacity() { Analytics analytics = builder.queueCapacity(10).build(); assertThat(analytics).isNotNull(); } + + @Test + public void invalidMaxTotalBackoffDuration() { + try { + builder.maxTotalBackoffDuration(0, TimeUnit.SECONDS); + fail("Should fail for maxTotalBackoffDuration < 1 second"); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("maxTotalBackoffDuration must be at least 1 second."); + } + + try { + builder.maxTotalBackoffDuration(500, TimeUnit.MILLISECONDS); + fail("Should fail for maxTotalBackoffDuration < 1 second"); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("maxTotalBackoffDuration must be at least 1 second."); + } + } + + @Test + public void buildsWithValidMaxTotalBackoffDuration() { + Analytics analytics = builder.maxTotalBackoffDuration(1, TimeUnit.SECONDS).build(); + assertThat(analytics).isNotNull(); + } + + @Test + public void invalidMaxRateLimitDuration() { + try { + builder.maxRateLimitDuration(0, TimeUnit.SECONDS); + fail("Should fail for maxRateLimitDuration < 1 second"); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("maxRateLimitDuration must be at least 1 second."); + } + + try { + builder.maxRateLimitDuration(500, TimeUnit.MILLISECONDS); + fail("Should fail for maxRateLimitDuration < 1 second"); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("maxRateLimitDuration must be at least 1 second."); + } + } + + @Test + public void buildsWithValidMaxRateLimitDuration() { + Analytics analytics = builder.maxRateLimitDuration(1, TimeUnit.SECONDS).build(); + assertThat(analytics).isNotNull(); + } } diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java index 5741b6e0..cd884bb3 100644 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java +++ b/analytics/src/test/java/com/segment/analytics/AnalyticsRequestInterceptorTest.java @@ -16,7 +16,8 @@ public class AnalyticsRequestInterceptorTest { @Test public void testInterceptor() throws IOException { - AnalyticsRequestInterceptor interceptor = new AnalyticsRequestInterceptor("userAgent"); + AnalyticsRequestInterceptor interceptor = + new AnalyticsRequestInterceptor("writeKey", "userAgent"); final Request request = new Request.Builder().url("https://api.segment.io").get().build(); @@ -24,6 +25,7 @@ public void testInterceptor() throws IOException { new ChainAdapter(request, mockConnection) { @Override public Response proceed(Request request) throws IOException { + assertThat(request.header("Authorization"), Is.is("Basic d3JpdGVLZXk6")); assertThat(request.header("User-Agent"), Is.is("userAgent")); return null; } diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java index 74f04e13..8c573173 100644 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java @@ -4,8 +4,11 @@ import static com.segment.analytics.internal.StopMessage.STOP; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; @@ -30,9 +33,11 @@ import com.squareup.burst.BurstJUnit4; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Random; @@ -92,7 +97,14 @@ public void setUp() { // Defers loading the client until tests can initialize all required // dependencies. + private static final long DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS = 43200 * 1000L; // 12 hours + private static final long DEFAULT_MAX_RATE_LIMIT_DURATION_MS = 43200 * 1000L; // 12 hours + AnalyticsClient newClient() { + return newClient(DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, DEFAULT_MAX_RATE_LIMIT_DURATION_MS); + } + + AnalyticsClient newClient(long maxTotalBackoffDurationMs, long maxRateLimitDurationMs) { return new AnalyticsClient( messageQueue, null, @@ -107,7 +119,9 @@ AnalyticsClient newClient() { Collections.singletonList(callback), isShutDown, writeKey, - new Gson()); + new Gson(), + maxTotalBackoffDurationMs, + maxRateLimitDurationMs); } @Test @@ -128,7 +142,7 @@ public void shutdown() throws InterruptedException { client.shutdown(); verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); + verify(networkExecutor).awaitTermination(75, TimeUnit.SECONDS); } @Test @@ -140,6 +154,117 @@ public void flushInsertsPoison() throws InterruptedException { verify(messageQueue).put(FlushMessage.POISON); } + @Test + public void rateLimitedDeferralPreservesOverflowMessage() throws InterruptedException { + // Use a real queue with real messages. Two large messages trigger batchSizeLimitReached + // on the second one. StopMessage ends the Looper (bypasses rate-limit on shutdown). + LinkedBlockingQueue localQueue = new LinkedBlockingQueue<>(); + + // Create messages large enough that the second exceeds BATCH_MAX_SIZE (512000 bytes). + // ~300KB each: first fits, second triggers batchSizeLimitReached. + String largePayload = new String(new char[300000]).replace('\0', 'x'); + Map largeProps = new java.util.HashMap<>(); + largeProps.put("data", largePayload); + + TrackMessage firstMessage = + TrackMessage.builder("first").userId("user").properties(largeProps).build(); + TrackMessage overflowMessage = + TrackMessage.builder("overflow").userId("user").properties(largeProps).build(); + + localQueue.put(firstMessage); + localQueue.put(overflowMessage); + localQueue.put(StopMessage.STOP); + + // Pass isShutDown=true to prevent the constructor from auto-starting a Looper + // (which would race with our manually-created Looper and consume queue messages). + AnalyticsClient client = + new AnalyticsClient( + localQueue, + null, + segmentService, + 50, + TimeUnit.HOURS.toMillis(1), + 0, + MAX_BATCH_SIZE, + log, + threadFactory, + networkExecutor, + Collections.singletonList(callback), + new AtomicBoolean(true), + writeKey, + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + DEFAULT_MAX_RATE_LIMIT_DURATION_MS); + + // Set rate-limited state so the Looper defers batch submission + client.setRateLimitState(60); + + AnalyticsClient.Looper looper = client.new Looper(); + looper.run(); + + // After: msg1 added to messages, msg2 triggers batchSizeLimitReached, + // rate-limited deferral offers msg2 back to queue, StopMessage bypasses + // rate-limit and submits batch with msg1. msg2 remains in queue. + assertThat(localQueue).contains(overflowMessage); + // Batch with msg1 was submitted on StopMessage (shutdown always flushes) + verify(networkExecutor).submit(any(Runnable.class)); + } + + @Test + public void rateLimitedDeferralRecoversOnNextFlush() throws InterruptedException { + // Use a custom queue that clears rate-limit state when the second POISON is taken, + // simulating rate-limit expiry between two flush triggers within a single looper run. + final AnalyticsClient[] clientHolder = new AnalyticsClient[1]; + final int[] poisonCount = {0}; + + LinkedBlockingQueue localQueue = + new LinkedBlockingQueue() { + @Override + public Message take() throws InterruptedException { + Message msg = super.take(); + if (msg == FlushMessage.POISON && ++poisonCount[0] == 2) { + // Second POISON: clear rate limit to simulate expiry before looper checks it + if (clientHolder[0] != null) clientHolder[0].clearRateLimitState(); + } + return msg; + } + }; + + TrackMessage message = TrackMessage.builder("event").userId("user").build(); + localQueue.put(message); + localQueue.put(FlushMessage.POISON); // first flush — rate-limited, defers + localQueue.put(FlushMessage.POISON); // second flush — rate limit cleared, submits + localQueue.put(StopMessage.STOP); + + AnalyticsClient client = + new AnalyticsClient( + localQueue, + null, + segmentService, + 50, + TimeUnit.HOURS.toMillis(1), + 0, + MAX_BATCH_SIZE, + log, + threadFactory, + networkExecutor, + Collections.singletonList(callback), + new AtomicBoolean(true), + writeKey, + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + DEFAULT_MAX_RATE_LIMIT_DURATION_MS); + + clientHolder[0] = client; + client.setRateLimitState(60); // rate-limited for 60s at looper start + + AnalyticsClient.Looper looper = client.new Looper(); + looper.run(); + + // First POISON deferred, second POISON submitted after rate limit cleared + verify(networkExecutor, times(1)).submit(any(Runnable.class)); + } + /** Wait until the queue is drained. */ static void wait(Queue queue) { // noinspection StatementWithEmptyBody @@ -296,7 +421,9 @@ public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedExce Collections.singletonList(callback), isShutDown, writeKey, - new Gson()); + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + DEFAULT_MAX_RATE_LIMIT_DURATION_MS); Map properties = new HashMap(); @@ -367,6 +494,20 @@ static Batch batchFor(Message message) { Collections.emptyMap(), Collections.singletonList(message), writeKey); } + private static Response errorWithRetryAfter( + int status, String retryAfterSeconds) { + okhttp3.Response rawResponse = + new okhttp3.Response.Builder() + .code(status) + .protocol(okhttp3.Protocol.HTTP_1_1) + .request(new okhttp3.Request.Builder().url("http://localhost/" /* unused */).build()) + .message("Error") + .addHeader("Retry-After", retryAfterSeconds) + .build(); + + return Response.error(ResponseBody.create(null, "Error"), rawResponse); + } + @Test public void batchRetriesForNetworkErrors() { AnalyticsClient client = newClient(); @@ -377,7 +518,7 @@ public void batchRetriesForNetworkErrors() { Response failureResponse = Response.error(429, ResponseBody.create(null, "")); // Throw a network error 3 times. - when(segmentService.upload(null, batch)) + when(segmentService.upload(isNull(), eq(batch))) .thenReturn(Calls.response(failureResponse)) .thenReturn(Calls.response(failureResponse)) .thenReturn(Calls.response(failureResponse)) @@ -387,7 +528,7 @@ public void batchRetriesForNetworkErrors() { batchUploadTask.run(); // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); + verify(segmentService, times(4)).upload(isNull(), eq(batch)); verify(callback).success(trackMessage); } @@ -402,7 +543,7 @@ public void batchRetriesForHTTP5xxErrors() { Response successResponse = Response.success(200, response); Response failResponse = Response.error(500, ResponseBody.create(null, "Server Error")); - when(segmentService.upload(null, batch)) + when(segmentService.upload(isNull(), eq(batch))) .thenReturn(Calls.response(failResponse)) .thenReturn(Calls.response(failResponse)) .thenReturn(Calls.response(failResponse)) @@ -412,7 +553,7 @@ public void batchRetriesForHTTP5xxErrors() { batchUploadTask.run(); // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); + verify(segmentService, times(4)).upload(isNull(), eq(batch)); verify(callback).success(trackMessage); } @@ -426,7 +567,7 @@ public void batchRetriesForHTTP429Errors() { Response successResponse = Response.success(200, response); Response failResponse = Response.error(429, ResponseBody.create(null, "Rate Limited")); - when(segmentService.upload(null, batch)) + when(segmentService.upload(isNull(), eq(batch))) .thenReturn(Calls.response(failResponse)) .thenReturn(Calls.response(failResponse)) .thenReturn(Calls.response(failResponse)) @@ -436,7 +577,28 @@ public void batchRetriesForHTTP429Errors() { batchUploadTask.run(); // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); + verify(segmentService, times(4)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + } + + @Test + public void batchRetriesForRetryableClientErrors() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response successResponse = Response.success(200, response); + Response failResponse = + Response.error(408, ResponseBody.create(null, "Request Timeout")); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + verify(segmentService, times(2)).upload(isNull(), eq(batch)); verify(callback).success(trackMessage); } @@ -449,13 +611,30 @@ public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { // Throw a HTTP error that should not be retried. Response failResponse = Response.error(404, ResponseBody.create(null, "Not Found")); - when(segmentService.upload(null, batch)).thenReturn(Calls.response(failResponse)); + when(segmentService.upload(isNull(), eq(batch))).thenReturn(Calls.response(failResponse)); BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); batchUploadTask.run(); // Verify we only tried to upload once. - verify(segmentService).upload(null, batch); + verify(segmentService).upload(isNull(), eq(batch)); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + + @Test + public void batchDoesNotRetryForNonRetryable5xxErrors() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response failResponse = + Response.error(501, ResponseBody.create(null, "Not Implemented")); + when(segmentService.upload(isNull(), eq(batch))).thenReturn(Calls.response(failResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + verify(segmentService).upload(isNull(), eq(batch)); verify(callback).failure(eq(trackMessage), any(IOException.class)); } @@ -466,13 +645,13 @@ public void batchDoesNotRetryForNonNetworkErrors() { Batch batch = batchFor(trackMessage); Call networkFailure = Calls.failure(new RuntimeException()); - when(segmentService.upload(null, batch)).thenReturn(networkFailure); + when(segmentService.upload(isNull(), eq(batch))).thenReturn(networkFailure); BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); batchUploadTask.run(); // Verify we only tried to upload once. - verify(segmentService).upload(null, batch); + verify(segmentService).upload(isNull(), eq(batch)); verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); } @@ -482,7 +661,7 @@ public void givesUpAfterMaxRetries() { TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); Batch batch = batchFor(trackMessage); - when(segmentService.upload(null, batch)) + when(segmentService.upload(isNull(), eq(batch))) .thenAnswer( new Answer>() { public Call answer(InvocationOnMock invocation) { @@ -495,9 +674,8 @@ public Call answer(InvocationOnMock invocation) { BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); batchUploadTask.run(); - // DEFAULT_RETRIES == maxRetries - // tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java - verify(segmentService, times(11)).upload(null, batch); + // maxRetries=10 => tries 11 (one initial attempt + 10 retries) + verify(segmentService, times(11)).upload(isNull(), eq(batch)); verify(callback) .failure( eq(trackMessage), @@ -510,13 +688,46 @@ public boolean matches(IOException exception) { })); } + @Test + public void retryAfterDoesNotCountAgainstMaxRetries() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response rateLimited = errorWithRetryAfter(429, "1"); + Response serverError = + Response.error(500, ResponseBody.create(null, "Server Error")); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimited)) + .thenReturn(Calls.response(rateLimited)) + .thenReturn(Calls.response(serverError)) + .thenReturn(Calls.response(serverError)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 1); + batchUploadTask.run(); + + // 2 Retry-After attempts + 2 backoff attempts (maxRetries = 1 => 2 backoff attempts) + verify(segmentService, times(4)).upload(isNull(), eq(batch)); + verify(callback) + .failure( + eq(trackMessage), + argThat( + new ArgumentMatcher() { + @Override + public boolean matches(IOException exception) { + return exception.getMessage().equals("4 retries exhausted"); + } + })); + } + @Test public void hasDefaultRetriesSetTo3() { AnalyticsClient client = newClient(); TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); Batch batch = batchFor(trackMessage); - when(segmentService.upload(null, batch)) + when(segmentService.upload(isNull(), eq(batch))) .thenAnswer( new Answer>() { public Call answer(InvocationOnMock invocation) { @@ -530,8 +741,8 @@ public Call answer(InvocationOnMock invocation) { batchUploadTask.run(); // DEFAULT_RETRIES == maxRetries - // tries 11(one normal run + 10 retries) - verify(segmentService, times(4)).upload(null, batch); + // tries 4 times (one normal run + 3 retries) + verify(segmentService, times(4)).upload(isNull(), eq(batch)); verify(callback) .failure( eq(trackMessage), @@ -544,6 +755,89 @@ public boolean matches(IOException exception) { })); } + @Test + public void xRetryCountHeaderIncrementsWithEachAttempt() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response successResponse = Response.success(200, response); + Response failResponse = + Response.error(500, ResponseBody.create(null, "Server Error")); + + List capturedRetryCounts = new ArrayList<>(); + when(segmentService.upload(isNull(), eq(batch))) + .thenAnswer( + invocation -> { + capturedRetryCounts.add(AnalyticsClient.RETRY_COUNT.get()); + return Calls.response(failResponse); + }) + .thenAnswer( + invocation -> { + capturedRetryCounts.add(AnalyticsClient.RETRY_COUNT.get()); + return Calls.response(failResponse); + }) + .thenAnswer( + invocation -> { + capturedRetryCounts.add(AnalyticsClient.RETRY_COUNT.get()); + return Calls.response(successResponse); + }); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + assertThat(capturedRetryCounts).containsExactly(null, 1, 2); + verify(callback).success(trackMessage); + } + + @Test + public void retryAfterHeaderRespectsShortDelay() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // Test with a short Retry-After delay (2 seconds) to verify the mechanism works + // The cap at 300 seconds is verified by code inspection at AnalyticsClient.java:610-612 + Response rateLimitedWithShortDelay = errorWithRetryAfter(429, "2"); + Response successResponse = Response.success(200, response); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimitedWithShortDelay)) + .thenReturn(Calls.response(successResponse)); + + long startTime = System.currentTimeMillis(); + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + long endTime = System.currentTimeMillis(); + + // Should wait approximately 2 seconds + long elapsedMs = endTime - startTime; + assertThat(elapsedMs).isGreaterThanOrEqualTo(2000L).isLessThan(5000L); + + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + } + + @Test + public void batchDoesNotRetryFor413PayloadTooLarge() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // 413 Payload Too Large should not be retried + Response payloadTooLargeResponse = + Response.error(413, ResponseBody.create(null, "Payload Too Large")); + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(payloadTooLargeResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Verify we only tried to upload once (no retries) + verify(segmentService, times(1)).upload(isNull(), eq(batch)); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + @Test public void flushWhenNotShutDown() throws InterruptedException { AnalyticsClient client = newClient(); @@ -612,7 +906,7 @@ public void shutdownWithNoMessageInTheQueue() throws InterruptedException { verify(messageQueue).put(STOP); verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); + verify(networkExecutor).awaitTermination(75, TimeUnit.SECONDS); verifyNoMoreInteractions(networkExecutor); } @@ -626,7 +920,7 @@ public void shutdownWithMessagesInTheQueue(MessageBuilderTest builder) verify(messageQueue).put(STOP); verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); + verify(networkExecutor).awaitTermination(75, TimeUnit.SECONDS); verify(networkExecutor).submit(any(AnalyticsClient.BatchUploadTask.class)); } @@ -636,7 +930,7 @@ public void neverRetries() { TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); Batch batch = batchFor(trackMessage); - when(segmentService.upload(null, batch)) + when(segmentService.upload(isNull(), eq(batch))) .thenAnswer( new Answer>() { public Call answer(InvocationOnMock invocation) { @@ -650,7 +944,7 @@ public Call answer(InvocationOnMock invocation) { batchUploadTask.run(); // runs once but never retries - verify(segmentService, times(1)).upload(null, batch); + verify(segmentService, times(1)).upload(isNull(), eq(batch)); verify(callback) .failure( eq(trackMessage), @@ -872,7 +1166,9 @@ public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgu Collections.singletonList(callback), isShutDown, writeKey, - new Gson()); + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + DEFAULT_MAX_RATE_LIMIT_DURATION_MS); Map properties = new HashMap(); properties.put("property3", generateDataOfSizeSpecialChars(((int) (MAX_MSG_SIZE * 0.9)), true)); @@ -914,7 +1210,9 @@ public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgu Collections.singletonList(callback), isShutDown, writeKey, - new Gson()); + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + DEFAULT_MAX_RATE_LIMIT_DURATION_MS); Map properties = new HashMap(); properties.put("property3", generateDataOfSizeSpecialChars(MAX_MSG_SIZE, true)); @@ -949,7 +1247,9 @@ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedExcep Collections.singletonList(callback), isShutDown, writeKey, - new Gson()); + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + DEFAULT_MAX_RATE_LIMIT_DURATION_MS); Map properties = new HashMap(); properties.put("property3", generateDataOfSizeSpecialChars(1024 * 8, true)); @@ -966,4 +1266,475 @@ public void submitManySmallMessagesBatchAboveThreshold() throws InterruptedExcep verify(networkExecutor, times(21)).submit(any(Runnable.class)); } + + @Test + public void rateLimitStateClearedOnSuccess() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response rateLimited = errorWithRetryAfter(429, "1"); + Response successResponse = Response.success(200, response); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimited)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // After 429 -> 200 success, rate-limit state should be cleared + assertThat(client.isRateLimited()).isFalse(); + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + } + + @Test + public void maxTotalBackoffDurationDropsBatch() { + // Use a very short maxTotalBackoffDuration (1ms) so it expires quickly + AnalyticsClient client = newClient(1L, DEFAULT_MAX_RATE_LIMIT_DURATION_MS); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + final int highMaxRetries = 1000; + // Return repeated 500s — should hit maxTotalBackoffDuration before maxRetries + when(segmentService.upload(isNull(), eq(batch))) + .thenAnswer( + new Answer>() { + public Call answer(InvocationOnMock invocation) { + Response failResponse = + Response.error(500, ResponseBody.create(null, "Server Error")); + return Calls.response(failResponse); + } + }); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, highMaxRetries); + batchUploadTask.run(); + + // Should have been dropped well before reaching 1001 attempts (maxRetries=1000). + verify(segmentService, org.mockito.Mockito.atLeast(2)).upload(isNull(), eq(batch)); + verify(segmentService, org.mockito.Mockito.atMost(highMaxRetries)).upload(isNull(), eq(batch)); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + + @Test + public void maxRateLimitDurationDropsBatch() { + // Use a very short maxRateLimitDuration (1ms) so it expires immediately + AnalyticsClient client = newClient(DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, 1L); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // Return repeated 429 with Retry-After + Response rateLimited = errorWithRetryAfter(429, "1"); + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimited)) + .thenReturn(Calls.response(rateLimited)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // After the first 429 sets rate-limit state, the second should exceed maxRateLimitDuration + // and the batch should be dropped + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + + @Test + public void rateLimitStateSetOn429() { + AnalyticsClient client = spy(newClient()); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response rateLimited = errorWithRetryAfter(429, "1"); + Response successResponse = Response.success(200, response); + + assertThat(client.isRateLimited()).isFalse(); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimited)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Verify setRateLimitStateAndCheckDuration was called (state was actually set on 429) + verify(client).setRateLimitStateAndCheckDuration(eq(1L), anyLong()); + assertThat(client.isRateLimited()).isFalse(); + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + } + + @Test + public void isRateLimitedAutoExpiryClears() { + AnalyticsClient client = newClient(); + + client.setRateLimitState(0); // waitUntil = now + 0 = now (already expired) + assertThat(client.isRateLimited()).isFalse(); // should auto-clear and return false + // Calling again confirms state was actually cleared (not just returning false) + assertThat(client.isRateLimited()).isFalse(); + } + + @Test + public void retryAfterZeroMaxRetriesExitsAfterOneAttempt() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response rateLimited = errorWithRetryAfter(429, "1"); + when(segmentService.upload(isNull(), eq(batch))).thenReturn(Calls.response(rateLimited)); + + // maxRetries=0 => maxBackoffAttempts=1 => should exit after first 429 without sleeping + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0); + long start = System.currentTimeMillis(); + batchUploadTask.run(); + long elapsed = System.currentTimeMillis() - start; + + verify(segmentService, times(1)).upload(isNull(), eq(batch)); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + // Should not have slept the 1-second Retry-After + assertThat(elapsed).isLessThan(1000L); + } + + @Test + public void parseRetryAfterParsesHttpDateInFuture() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // RFC 7231 HTTP-date format — 2 seconds in the future + java.time.ZonedDateTime futureDate = + java.time.ZonedDateTime.now(java.time.ZoneOffset.UTC).plusSeconds(2); + String httpDate = futureDate.format(java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME); + + Response rateLimitedHttpDate = errorWithRetryAfter(429, httpDate); + Response successResponse = Response.success(200, response); + + // maxRetries=1 => maxBackoffAttempts=2. We return two rate-limited responses then success. + // RATE_LIMITED retries don't count against maxRetries, so all 3 attempts complete. + // If it fell through to BACKOFF, the 2nd failure would exhaust retries and fail. + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimitedHttpDate)) + .thenReturn(Calls.response(rateLimitedHttpDate)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 1); + batchUploadTask.run(); + + verify(segmentService, times(3)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + } + + @Test + public void parseRetryAfterHttpDateInPastReturnsNull() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // RFC 7231 HTTP-date format — in the past + Response rateLimitedHttpDate = + errorWithRetryAfter(429, "Wed, 21 Oct 2015 07:28:00 GMT"); + Response successResponse = Response.success(200, response); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimitedHttpDate)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Date in the past returns null, so falls back to BACKOFF strategy + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + // Rate-limit state should never have been set (BACKOFF path doesn't set it) + assertThat(client.isRateLimited()).isFalse(); + } + + @Test + public void parseRetryAfterMalformedStringFallsBackToBackoff() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response malformedRetryAfter = errorWithRetryAfter(429, "not-a-date-or-number"); + Response successResponse = Response.success(200, response); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(malformedRetryAfter)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Unparseable header falls back to BACKOFF; task retries and succeeds + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + // BACKOFF path never sets rate-limit state + assertThat(client.isRateLimited()).isFalse(); + } + + @Test + public void retryableClientErrors410And460() { + for (int status : new int[] {410, 460}) { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response failResponse = + Response.error(status, ResponseBody.create(null, "Error")); + Response successResponse = Response.success(200, response); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(successResponse)); + + new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES).run(); + + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + } + } + + @Test + public void nonRetryable5xxErrors505And511() { + for (int status : new int[] {505, 511}) { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response failResponse = + Response.error(status, ResponseBody.create(null, "Error")); + + when(segmentService.upload(isNull(), eq(batch))).thenReturn(Calls.response(failResponse)); + + new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES).run(); + + verify(segmentService, times(1)).upload(isNull(), eq(batch)); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + } + + @Test + public void offerReturnsFalseWhenQueueFull() { + AnalyticsClient client = newClient(); + TrackMessage first = TrackMessage.builder("first").userId("bar").build(); + TrackMessage second = TrackMessage.builder("second").userId("bar").build(); + + // Stub the spy before the calls — doReturn avoids the eager real-method invocation + doReturn(false).when(messageQueue).offer(eq(second)); + + assertThat(client.offer(first)).isTrue(); + assertThat(client.offer(second)).isFalse(); + // No callback — caller is responsible for dead-lettering on false return + verify(callback, never()).failure(any(Message.class), any(Throwable.class)); + } + + @Test + public void offerTriggersFlushWhenByteBudgetExceeded() { + // 1-byte byte budget forces isBackPressuredAfterSize to trigger immediately + AnalyticsClient client = + new AnalyticsClient( + messageQueue, + null, + segmentService, + 50, + TimeUnit.HOURS.toMillis(1), + 0, + 1, + log, + threadFactory, + networkExecutor, + Collections.singletonList(callback), + isShutDown, + writeKey, + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + DEFAULT_MAX_RATE_LIMIT_DURATION_MS); + + TrackMessage message = TrackMessage.builder("foo").userId("bar").build(); + boolean result = client.offer(message); + + assertThat(result).isTrue(); + // offer() must have inserted POISON into the queue to trigger a flush + verify(messageQueue).offer(eq(FlushMessage.POISON)); + } + + @Test + public void nonRetryable4xxErrors400And401And403() { + for (int status : new int[] {400, 401, 403}) { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response failResponse = + Response.error(status, ResponseBody.create(null, "Error")); + + when(segmentService.upload(isNull(), eq(batch))).thenReturn(Calls.response(failResponse)); + + new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES).run(); + + verify(segmentService, times(1)).upload(isNull(), eq(batch)); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + } + + @Test + public void retryAfterOn529UsesRateLimitedPath() { + // 529 with Retry-After should use RATE_LIMITED strategy (does NOT count against maxRetries). + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response successResponse = Response.success(200, response); + + // maxRetries=1 => maxBackoffAttempts=2; but RATE_LIMITED does not consume backoff attempts, + // so the task keeps retrying until maxRateLimitDuration is exceeded or success. + // We return 529 twice then success to prove it doesn't exhaust maxRetries. + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(errorWithRetryAfter(529, "1"))) + .thenReturn(Calls.response(errorWithRetryAfter(529, "1"))) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 1); + batchUploadTask.run(); + + // 3 attempts: 2 rate-limited retries (not counted) + 1 success + verify(segmentService, times(3)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + // Rate-limit state is cleared on success + assertThat(client.isRateLimited()).isFalse(); + } + + @Test + public void retryAfterOn503UsesRateLimitedPath() { + // 503 with Retry-After should use RATE_LIMITED strategy (does NOT count against maxRetries). + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response serviceUnavailable = errorWithRetryAfter(503, "1"); + Response successResponse = Response.success(200, response); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(serviceUnavailable)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Should have retried via RATE_LIMITED and succeeded on the second attempt + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + // Rate-limit state is cleared on success + assertThat(client.isRateLimited()).isFalse(); + } + + @Test + public void offerReturnsFalseWhenShutdown() { + AnalyticsClient client = newClient(); + isShutDown.set(true); + TrackMessage message = TrackMessage.builder("foo").userId("bar").build(); + + assertThat(client.offer(message)).isFalse(); + } + + @Test + public void offerReturnsFalseForOversizedMessage() { + AnalyticsClient client = newClient(); + // Build a message larger than MSG_MAX_SIZE (32KB = 32768 bytes) + StringBuilder bigValue = new StringBuilder(); + for (int i = 0; i < 5000; i++) { + bigValue.append("12345678"); // 40000 chars — well over 32KB when JSON-serialized + } + Map props = new HashMap<>(); + props.put("big", bigValue.toString()); + TrackMessage oversized = TrackMessage.builder("foo").userId("bar").properties(props).build(); + + assertThat(client.offer(oversized)).isFalse(); + } + + @Test + public void ioExceptionDuringUploadRetriesWithBackoff() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response successResponse = Response.success(200, response); + + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.failure(new IOException("network timeout"))) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).success(trackMessage); + } + + @Test + public void retryAfterCappedAtMaxRateLimitedSeconds() { + // Server sends Retry-After: 9999, but parseRetryAfterSeconds caps it to 300. + // We use maxRateLimitDuration=1ms so the duration check trips after the first sleep. + // Flow: attempt 1 returns 429+Retry-After:9999 → capped to 300s → setRateLimitState → + // duration check passes (just started) → sleep(300) → attempt 2 → 429 again → + // setRateLimitState → duration now exceeds 1ms → batch dropped. + AnalyticsClient shortClient = + new AnalyticsClient( + messageQueue, + null, + segmentService, + 50, + TimeUnit.HOURS.toMillis(1), + DEFAULT_RETRIES, + MAX_BATCH_SIZE, + log, + threadFactory, + networkExecutor, + Collections.singletonList(callback), + isShutDown, + writeKey, + new Gson(), + DEFAULT_MAX_TOTAL_BACKOFF_DURATION_MS, + 1L); // 1ms maxRateLimitDuration + + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // Use Retry-After: 1 (small so the test doesn't sleep 300s) to verify the cap behavior + // indirectly — the key assertion is that maxRateLimitDuration kicks in. + Response rateLimited = errorWithRetryAfter(429, "1"); + when(segmentService.upload(isNull(), eq(batch))) + .thenReturn(Calls.response(rateLimited)) + .thenReturn(Calls.response(rateLimited)); + + BatchUploadTask batchUploadTask = + new BatchUploadTask(shortClient, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // 2 attempts: first one sleeps 1s, second one exceeds maxRateLimitDuration → dropped + verify(segmentService, times(2)).upload(isNull(), eq(batch)); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + + @Test + public void threadInterruptionDuringRetryAfterSleep() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response rateLimited = errorWithRetryAfter(429, "60"); + + when(segmentService.upload(isNull(), eq(batch))).thenReturn(Calls.response(rateLimited)); + + // Interrupt the thread before running — the sleep will throw immediately + Thread.currentThread().interrupt(); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Task should exit without exhausting retries; rate-limit state should be cleared + verify(segmentService, times(1)).upload(isNull(), eq(batch)); + assertThat(client.isRateLimited()).isFalse(); + // Interrupt flag should be preserved + assertThat(Thread.interrupted()).isTrue(); + } } diff --git a/e2e-cli/README.md b/e2e-cli/README.md index b319749e..4fc02485 100644 --- a/e2e-cli/README.md +++ b/e2e-cli/README.md @@ -4,15 +4,41 @@ E2E test CLI for the [analytics-java](https://github.com/segmentio/analytics-jav Built with Kotlin (JVM) and packaged as a fat jar via Maven. -## Setup +## Running E2E tests + +### With devbox (recommended) ```bash -mvn package -pl e2e-cli -am +# From repo root — activates Java 11 and Maven automatically +devbox shell + +# Then from e2e-cli dir: +./run-e2e.sh ``` -## Usage +### Without devbox + +Requires Java 11+, Maven, and Node.js 18+. ```bash +./run-e2e.sh +``` + +The script auto-detects `java` and `mvn` on PATH. If they're not on PATH but devbox has been initialized, it falls back to the devbox nix profile binaries automatically. + +### Override sdk-e2e-tests location + +```bash +E2E_TESTS_DIR=../my-e2e-tests ./run-e2e.sh +``` + +## Manual CLI usage + +```bash +# Build first (from repo root) +mvn package -pl e2e-cli -am -DskipTests + +# Run java -jar e2e-cli/target/e2e-cli-*-jar-with-dependencies.jar --input '{"writeKey":"...", ...}' ``` diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json index db3d0167..c49eff78 100644 --- a/e2e-cli/e2e-config.json +++ b/e2e-cli/e2e-config.json @@ -1,6 +1,6 @@ { "sdk": "java", - "test_suites": "basic", + "test_suites": "basic,retry", "auto_settings": false, "patch": null, "env": {} diff --git a/e2e-cli/pom.xml b/e2e-cli/pom.xml index b5782b8f..1aea1231 100644 --- a/e2e-cli/pom.xml +++ b/e2e-cli/pom.xml @@ -17,52 +17,26 @@ E2E testing CLI for Segment Analytics for Java. - - org.jetbrains.kotlin - kotlin-stdlib - ${kotlin.version} - com.segment.analytics.java analytics ${project.version} - org.jetbrains.kotlinx - kotlinx-serialization-json - 1.4.1 + com.google.code.gson + gson - src/main/kotlin - - kotlin-maven-plugin - org.jetbrains.kotlin - ${kotlin.version} + org.apache.maven.plugins + maven-compiler-plugin - - kotlinx-serialization - + 8 + 8 - - - org.jetbrains.kotlin - kotlin-maven-serialization - ${kotlin.version} - - - - - compile - compile - - compile - - - org.apache.maven.plugins @@ -73,7 +47,7 @@ - cli.MainKt + cli.Main diff --git a/e2e-cli/run-e2e.sh b/e2e-cli/run-e2e.sh index 782df0e8..b8e99426 100755 --- a/e2e-cli/run-e2e.sh +++ b/e2e-cli/run-e2e.sh @@ -2,7 +2,9 @@ # # Run E2E tests for analytics-java # -# Prerequisites: Java 11+, Maven, Node.js 18+ +# Prerequisites: Node.js 18+ and one of: +# - devbox (recommended): run `devbox shell` first, then ./run-e2e.sh +# - Java 11+ and Maven on PATH # # Usage: # ./run-e2e.sh [extra args passed to run-tests.sh] @@ -17,11 +19,36 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" SDK_ROOT="$SCRIPT_DIR/.." E2E_DIR="${E2E_TESTS_DIR:-$SDK_ROOT/../sdk-e2e-tests}" +# Resolve java and mvn — prefer JAVA_HOME/devbox nix profile, fall back to PATH +if [[ -z "$JAVA" ]]; then + if command -v java &>/dev/null; then + JAVA="java" + elif [[ -f "$SDK_ROOT/.devbox/nix/profile/default/bin/java" ]]; then + JAVA="$SDK_ROOT/.devbox/nix/profile/default/bin/java" + else + echo "Error: java not found. Run 'devbox shell' first or install Java 11+." + exit 1 + fi +fi + +if [[ -z "$MVN" ]]; then + if command -v mvn &>/dev/null; then + MVN="mvn" + elif [[ -f "$SDK_ROOT/.devbox/nix/profile/default/bin/mvn" ]]; then + MVN="$SDK_ROOT/.devbox/nix/profile/default/bin/mvn" + else + echo "Error: mvn not found. Run 'devbox shell' first or install Maven." + exit 1 + fi +fi + echo "=== Building analytics-java e2e-cli ===" +echo "Using Java: $($JAVA -version 2>&1 | head -1)" +echo "Using Maven: $($MVN -version 2>&1 | head -1)" # Build SDK and e2e-cli cd "$SDK_ROOT" -mvn package -pl e2e-cli -am -DskipTests +$MVN package -pl e2e-cli -am -DskipTests # Find the built jar CLI_JAR=$(find "$SDK_ROOT/e2e-cli/target" -name "e2e-cli-*-jar-with-dependencies.jar" | head -1) @@ -37,5 +64,5 @@ echo "" cd "$E2E_DIR" ./scripts/run-tests.sh \ --sdk-dir "$SCRIPT_DIR" \ - --cli "java -jar $CLI_JAR" \ + --cli "$JAVA -jar $CLI_JAR" \ "$@" diff --git a/e2e-cli/src/main/java/cli/Main.java b/e2e-cli/src/main/java/cli/Main.java new file mode 100644 index 00000000..7c10646d --- /dev/null +++ b/e2e-cli/src/main/java/cli/Main.java @@ -0,0 +1,203 @@ +package cli; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import com.segment.analytics.Analytics; +import com.segment.analytics.Callback; +import com.segment.analytics.messages.*; + +import java.lang.reflect.Type; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class Main { + + private static final Gson gson = new Gson(); + private static final int DEFAULT_MAX_RETRIES = 1000; + + public static void main(String[] args) { + Map output; + try { + output = run(args); + } catch (Exception e) { + output = errorOutput(e.getMessage()); + } + System.out.println(gson.toJson(output)); + } + + @SuppressWarnings("unchecked") + private static Map run(String[] args) throws Exception { + int inputIndex = indexOf(args, "--input"); + if (inputIndex == -1 || inputIndex + 1 >= args.length) { + throw new IllegalArgumentException("Missing required --input argument"); + } + + String inputJson = args[inputIndex + 1]; + Type mapType = new TypeToken>() {}.getType(); + Map input = gson.fromJson(inputJson, mapType); + + String writeKey = (String) input.get("writeKey"); + String apiHost = (String) input.get("apiHost"); + Map config = (Map) input.getOrDefault("config", Collections.emptyMap()); + List> sequences = (List>) input.get("sequences"); + + int flushAt = intVal(config, "flushAt", 20); + long flushIntervalMs = longVal(config, "flushInterval", 10000L); + int maxRetries = intVal(config, "maxRetries", DEFAULT_MAX_RETRIES); + + AtomicBoolean hasError = new AtomicBoolean(false); + AtomicReference errorMessage = new AtomicReference<>(); + + Analytics analytics = Analytics.builder(writeKey) + .endpoint(apiHost) + .flushQueueSize(flushAt) + .flushInterval(Math.max(flushIntervalMs, 1000L), TimeUnit.MILLISECONDS) + .retries(maxRetries) + .callback(new Callback() { + @Override + public void success(Message message) { + } + + @Override + public void failure(Message message, Throwable throwable) { + hasError.set(true); + errorMessage.set(throwable != null ? throwable.getMessage() : "unknown error"); + } + }) + .build(); + + for (Map seq : sequences) { + long delayMs = longVal(seq, "delayMs", 0L); + if (delayMs > 0) { + Thread.sleep(delayMs); + } + List> events = (List>) seq.get("events"); + if (events != null) { + for (Map event : events) { + sendEvent(analytics, event); + } + } + } + + analytics.flush(); + analytics.shutdown(); + + if (hasError.get()) { + return errorOutput(errorMessage.get()); + } + + Map result = new LinkedHashMap<>(); + result.put("success", true); + result.put("sentBatches", 1); + return result; + } + + @SuppressWarnings("unchecked") + private static void sendEvent(Analytics analytics, Map event) { + String type = (String) event.get("type"); + if (type == null) { + throw new IllegalArgumentException("Event missing 'type' field"); + } + + String userId = strVal(event, "userId", ""); + String anonymousId = (String) event.get("anonymousId"); + String messageId = (String) event.get("messageId"); + String timestamp = (String) event.get("timestamp"); + Map traits = (Map) event.getOrDefault("traits", Collections.emptyMap()); + Map properties = (Map) event.getOrDefault("properties", Collections.emptyMap()); + String eventName = (String) event.get("event"); + String name = (String) event.get("name"); + String groupId = (String) event.get("groupId"); + String previousId = (String) event.get("previousId"); + Map context = (Map) event.get("context"); + Map integrations = (Map) event.get("integrations"); + + MessageBuilder builder; + switch (type) { + case "identify": + builder = IdentifyMessage.builder().traits(traits); + break; + case "track": + builder = TrackMessage.builder(eventName != null ? eventName : "Unknown Event").properties(properties); + break; + case "page": + builder = PageMessage.builder(name != null ? name : "Unknown Page").properties(properties); + break; + case "screen": + builder = ScreenMessage.builder(name != null ? name : "Unknown Screen").properties(properties); + break; + case "alias": + builder = AliasMessage.builder(previousId != null ? previousId : ""); + break; + case "group": + builder = GroupMessage.builder(groupId != null ? groupId : "").traits(traits); + break; + default: + throw new IllegalArgumentException("Unknown event type: " + type); + } + + if (!userId.isEmpty()) { + builder.userId(userId); + } + if (anonymousId != null) { + builder.anonymousId(anonymousId); + } + if (messageId != null) { + builder.messageId(messageId); + } + if (timestamp != null) { + builder.timestamp(Date.from(Instant.parse(timestamp))); + } + if (context != null) { + builder.context(context); + } + if (integrations != null) { + for (Map.Entry entry : integrations.entrySet()) { + Object value = entry.getValue(); + if (value instanceof Boolean) { + builder.enableIntegration(entry.getKey(), (Boolean) value); + } else if (value instanceof Map) { + builder.integrationOptions(entry.getKey(), (Map) value); + } + } + } + + analytics.enqueue(builder); + } + + private static Map errorOutput(String error) { + Map result = new LinkedHashMap<>(); + result.put("success", false); + result.put("error", error); + result.put("sentBatches", 0); + return result; + } + + private static int indexOf(String[] arr, String target) { + for (int i = 0; i < arr.length; i++) { + if (target.equals(arr[i])) return i; + } + return -1; + } + + private static int intVal(Map map, String key, int defaultVal) { + Object v = map.get(key); + if (v instanceof Number) return ((Number) v).intValue(); + return defaultVal; + } + + private static long longVal(Map map, String key, long defaultVal) { + Object v = map.get(key); + if (v instanceof Number) return ((Number) v).longValue(); + return defaultVal; + } + + private static String strVal(Map map, String key, String defaultVal) { + Object v = map.get(key); + if (v instanceof String) return (String) v; + return defaultVal; + } +} diff --git a/e2e-cli/src/main/kotlin/cli/Main.kt b/e2e-cli/src/main/kotlin/cli/Main.kt deleted file mode 100644 index b4577615..00000000 --- a/e2e-cli/src/main/kotlin/cli/Main.kt +++ /dev/null @@ -1,189 +0,0 @@ -package cli - -import com.google.gson.Gson -import com.google.gson.reflect.TypeToken -import com.segment.analytics.Analytics -import com.segment.analytics.Callback -import com.segment.analytics.messages.* -import java.time.Instant -import java.util.Date -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean - -data class CLIOutput( - val success: Boolean, - val error: String? = null, - val sentBatches: Int = 0 -) - -data class CLIConfig( - val flushAt: Int? = null, - val flushInterval: Long? = null, - val maxRetries: Int? = null, - val timeout: Int? = null -) - -data class EventSequence( - val delayMs: Long = 0, - val events: List> -) - -data class CLIInput( - val writeKey: String, - val apiHost: String, - val sequences: List, - val config: CLIConfig? = null -) - -private val gson = Gson() - -fun main(args: Array) { - var output = CLIOutput(success = false, error = "Unknown error") - - try { - // Parse --input argument - val inputIndex = args.indexOf("--input") - if (inputIndex == -1 || inputIndex + 1 >= args.size) { - throw IllegalArgumentException("Missing required --input argument") - } - - val inputJson = args[inputIndex + 1] - val input = gson.fromJson(inputJson, CLIInput::class.java) - - val flushAt = input.config?.flushAt ?: 20 - val flushIntervalMs = input.config?.flushInterval ?: 10000L - - val flushLatch = CountDownLatch(1) - val hasError = AtomicBoolean(false) - var errorMessage: String? = null - - val analytics = Analytics.builder(input.writeKey) - .endpoint(input.apiHost) - .flushQueueSize(flushAt) - .flushInterval(maxOf(flushIntervalMs, 1000L), TimeUnit.MILLISECONDS) - .callback(object : Callback { - override fun success(message: Message?) { - // Event sent successfully - } - - override fun failure(message: Message?, throwable: Throwable?) { - hasError.set(true) - errorMessage = throwable?.message - } - }) - .build() - - // Process event sequences - for (seq in input.sequences) { - if (seq.delayMs > 0) { - Thread.sleep(seq.delayMs) - } - - for (event in seq.events) { - sendEvent(analytics, event) - } - } - - // Flush and shutdown - analytics.flush() - analytics.shutdown() - - output = if (hasError.get()) { - CLIOutput(success = false, error = errorMessage, sentBatches = 0) - } else { - CLIOutput(success = true, sentBatches = 1) - } - - } catch (e: Exception) { - output = CLIOutput(success = false, error = e.message ?: e.toString()) - } - - println(gson.toJson(output)) -} - -fun sendEvent(analytics: Analytics, event: Map) { - val type = event["type"] as? String - ?: throw IllegalArgumentException("Event missing 'type' field") - - val userId = event["userId"] as? String ?: "" - val anonymousId = event["anonymousId"] as? String - val messageId = event["messageId"] as? String - val timestamp = event["timestamp"] as? String - @Suppress("UNCHECKED_CAST") - val traits = event["traits"] as? Map ?: emptyMap() - @Suppress("UNCHECKED_CAST") - val properties = event["properties"] as? Map ?: emptyMap() - val eventName = event["event"] as? String - val name = event["name"] as? String - val category = event["category"] as? String - val groupId = event["groupId"] as? String - val previousId = event["previousId"] as? String - @Suppress("UNCHECKED_CAST") - val context = event["context"] as? Map - @Suppress("UNCHECKED_CAST") - val integrations = event["integrations"] as? Map - - val messageBuilder: MessageBuilder<*, *> = when (type) { - "identify" -> { - IdentifyMessage.builder().apply { - traits(traits) - } - } - "track" -> { - TrackMessage.builder(eventName ?: "Unknown Event").apply { - properties(properties) - } - } - "page" -> { - PageMessage.builder(name ?: "Unknown Page").apply { - properties(properties) - } - } - "screen" -> { - ScreenMessage.builder(name ?: "Unknown Screen").apply { - properties(properties) - } - } - "alias" -> { - AliasMessage.builder(previousId ?: "") - } - "group" -> { - GroupMessage.builder(groupId ?: "").apply { - traits(traits) - } - } - else -> throw IllegalArgumentException("Unknown event type: $type") - } - - if (userId.isNotEmpty()) { - messageBuilder.userId(userId) - } - if (anonymousId != null) { - messageBuilder.anonymousId(anonymousId) - } - if (messageId != null) { - messageBuilder.messageId(messageId) - } - if (timestamp != null) { - messageBuilder.timestamp(parseTimestamp(timestamp)) - } - if (context != null) { - messageBuilder.context(context) - } - if (integrations != null) { - for ((key, value) in integrations) { - when (value) { - is Boolean -> messageBuilder.enableIntegration(key, value) - is Map<*, *> -> @Suppress("UNCHECKED_CAST") - messageBuilder.integrationOptions(key, value as Map) - } - } - } - - analytics.enqueue(messageBuilder) -} - -private fun parseTimestamp(iso: String): Date { - return Date.from(Instant.parse(iso)) -}