diff --git a/CHANGES.md b/CHANGES.md index 0bf6869120..8136757432 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -26,6 +26,7 @@ Release Notes. * Add unified release script (`tools/releasing/release.sh`) with two-step flow: `prepare-vote` and `vote-passed`. * Fix an issue where `JDBCPluginConfig.Plugin.JDBC.SQL_BODY_MAX_LENGTH` was not honored by clickhouse-0.3.1 and clickhouse-0.3.2.x plugins. - Add tracing support for vector-store retrieval operations. +* Fix agent lifecycle events: the Start event now carries the service instance name, and the Shutdown event is delivered on graceful JVM exit. `ServiceManager` prepares/starts higher-priority `BootService`s first and shuts them down last (matching `BootService#priority()`), and the shutdown event refreshes its gRPC deadline before sending. All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/249?closed=1) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java index 015596ad26..222a1d8221 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java @@ -50,7 +50,7 @@ public void boot() { } public void shutdown() { - bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> { + bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> { try { service.shutdown(); } catch (Throwable e) { @@ -103,7 +103,7 @@ private Map loadAllServices() { } private void prepare() { - bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> { + bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> { try { service.prepare(); } catch (Throwable e) { @@ -113,7 +113,7 @@ private void prepare() { } private void startup() { - bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service -> { + bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service -> { try { service.boot(); } catch (Throwable e) { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/EventReportServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/EventReportServiceClient.java index 73860707fd..475405c726 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/EventReportServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/EventReportServiceClient.java @@ -50,11 +50,13 @@ public class EventReportServiceClient implements BootService, GRPCChannelListene private final AtomicBoolean reported = new AtomicBoolean(); + private volatile boolean bootCompleted; + private Event.Builder startingEvent; - private EventServiceGrpc.EventServiceStub eventServiceStub; + private volatile EventServiceGrpc.EventServiceStub eventServiceStub; - private GRPCChannelStatus status; + private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT; @Override public void prepare() throws Throwable { @@ -91,6 +93,7 @@ public void boot() throws Throwable { @Override public void onComplete() throws Throwable { startingEvent.setEndTime(System.currentTimeMillis()); + bootCompleted = true; reportStartingEvent(); } @@ -117,7 +120,8 @@ public void shutdown() throws Throwable { ) .setLayer(EVENT_LAYER_NAME); - final StreamObserver collector = eventServiceStub.collect(new StreamObserver() { + final EventServiceGrpc.EventServiceStub stub = eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS); + final StreamObserver collector = stub.collect(new StreamObserver() { @Override public void onNext(final Commands commands) { ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); @@ -143,25 +147,27 @@ public void onCompleted() { @Override public void statusChanged(final GRPCChannelStatus status) { + if (CONNECTED.equals(status)) { + final Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); + eventServiceStub = EventServiceGrpc.newStub(channel); + } this.status = status; - if (!CONNECTED.equals(status)) { - return; + if (CONNECTED.equals(status)) { + reportStartingEvent(); } - - final Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel(); - eventServiceStub = EventServiceGrpc.newStub(channel); - eventServiceStub = eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS); - - reportStartingEvent(); } private void reportStartingEvent() { - if (reported.compareAndSet(false, true)) { + if (!bootCompleted || !CONNECTED.equals(status)) { + return; + } + if (!reported.compareAndSet(false, true)) { return; } - final StreamObserver collector = eventServiceStub.collect(new StreamObserver() { + final EventServiceGrpc.EventServiceStub stub = eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS); + final StreamObserver collector = stub.collect(new StreamObserver() { @Override public void onNext(final Commands commands) { ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands); diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerOrderingTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerOrderingTest.java new file mode 100644 index 0000000000..b232c5540c --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerOrderingTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.agent.core.boot; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.junit.After; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class ServiceManagerOrderingTest { + + private final List prepareOrder = new ArrayList<>(); + private final List bootOrder = new ArrayList<>(); + private final List shutdownOrder = new ArrayList<>(); + + private class RecordingService implements BootService { + private final String name; + private final int priority; + + private RecordingService(String name, int priority) { + this.name = name; + this.priority = priority; + } + + @Override + public void prepare() { + prepareOrder.add(name); + } + + @Override + public void boot() { + bootOrder.add(name); + } + + @Override + public void onComplete() { + } + + @Override + public void shutdown() { + shutdownOrder.add(name); + } + + @Override + public int priority() { + return priority; + } + } + + @After + public void tearDown() throws Exception { + setBootedServices(new LinkedHashMap<>()); + } + + @Test + public void higherPriorityBootsFirstAndShutsDownLast() throws Exception { + Map services = new LinkedHashMap<>(); + services.put(Integer.class, new RecordingService("low", 0)); + services.put(Long.class, new RecordingService("high", Integer.MAX_VALUE)); + services.put(Short.class, new RecordingService("mid", 100)); + setBootedServices(services); + + invoke("prepare"); + invoke("startup"); + ServiceManager.INSTANCE.shutdown(); + + assertThat(prepareOrder, is(Arrays.asList("high", "mid", "low"))); + assertThat(bootOrder, is(Arrays.asList("high", "mid", "low"))); + assertThat(shutdownOrder, is(Arrays.asList("low", "mid", "high"))); + } + + private void setBootedServices(Map services) throws Exception { + Field field = ServiceManager.class.getDeclaredField("bootedServices"); + field.setAccessible(true); + field.set(ServiceManager.INSTANCE, services); + } + + private void invoke(String method) throws Exception { + Method m = ServiceManager.class.getDeclaredMethod(method); + m.setAccessible(true); + m.invoke(ServiceManager.INSTANCE); + } +} diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index 31dc8482f1..d8dafc0438 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -44,12 +44,10 @@ import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; -import org.apache.skywalking.apm.agent.core.boot.ServiceManager; import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig.Plugin.Kafka; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import org.apache.skywalking.apm.agent.core.plugin.loader.AgentClassLoader; -import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; import org.apache.skywalking.apm.util.StringUtil; @@ -183,14 +181,10 @@ public final KafkaProducer getProducer() { return producer; } - /** - * make kafka producer init later but before {@link GRPCChannelManager} - * - * @return priority value - */ + // Higher than the Kafka reporters sharing this producer, so the producer closes only after they stop. @Override public int priority() { - return ServiceManager.INSTANCE.findService(GRPCChannelManager.class).priority() - 1; + return 1; } @Override diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java index 3a6b7c2d49..ed3c593f5e 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java @@ -19,6 +19,7 @@ package org.apache.skywalking.apm.agent.core.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.lang.reflect.Method; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +47,11 @@ public void testAddListener() throws Exception { assertEquals(counter.get(), times); } + @Test + public void outranksKafkaReportersSoProducerClosesLast() { + assertTrue(new KafkaProducerManager().priority() > new KafkaTraceSegmentServiceClient().priority()); + } + @Test public void testFormatTopicNameThenRegister() { KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();