Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 4908b2c

Browse files
amarzialimcculls
andauthored
Make webflux compatible with latest spring 6 (DataDog#6352)
* Make webflux compatible with latest spring 6 * webflux refactoring * enable java 17 latest dep * Update dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy Co-authored-by: Stuart McCulloch <stuart.mcculloch@datadoghq.com> * unformat --------- Co-authored-by: Stuart McCulloch <stuart.mcculloch@datadoghq.com>
1 parent 30789d4 commit 4908b2c

File tree

28 files changed

+328
-405
lines changed

28 files changed

+328
-405
lines changed

‎.circleci/config.continue.yml.j2

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,20 @@ build_test_jobs: &build_test_jobs
10921092
maxWorkers: 4
10931093
testJvm: "8"
10941094
1095+
- xlarge_tests:
1096+
requires:
1097+
- ok_to_test
1098+
- build_latestdep
1099+
name: test_17_inst_latest
1100+
gradleTarget: ":instrumentationLatestDepTest"
1101+
gradleParameters: "-PskipFlakyTests"
1102+
triggeredBy: *instrumentation_modules
1103+
stage: instrumentation
1104+
cacheType: latestdep
1105+
parallelism: 4
1106+
maxWorkers: 4
1107+
testJvm: "17"
1108+
10951109
{% if flaky %}
10961110
- tests:
10971111
requires:

‎dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@
258258
0 org.springframework.beans.factory.support.DefaultListableBeanFactory
259259
0 org.springframework.beans.factory.support.DisposableBeanAdapter
260260
1 org.springframework.boot.SpringApplicationShutdownHook$Handlers
261+
1 org.springframework.boot.autoconfigure.ssl.FileWatcher$WatcherThread
261262
2 org.springframework.boot.*
262263
0 org.apache.xalan.transformer.TransformerImpl
263264
# More runnables to deal with

‎dd-java-agent/instrumentation/spring-webflux-5/build.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ muzzle {
4848
module = "spring-webflux"
4949
versions = "[5.1.1.RELEASE,6)"
5050
}
51+
pass {
52+
name = "webflux_6"
53+
group = "org.springframework"
54+
module = "spring-webflux"
55+
versions = "[6,)"
56+
javaVersion = "17"
57+
extraDependency "io.projectreactor:reactor-core:3.6.0"
58+
}
5159
}
5260

5361
apply from: "$rootDir/gradle/java.gradle"

‎dd-java-agent/instrumentation/spring-webflux-5/src/bootTest/groovy/SpringWebfluxTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ class SpringWebfluxTest extends AgentTestRunner {
198198
resourceName "TestController.tracedMethod"
199199
operationName "trace.annotation"
200200
}
201-
childOf(span(0)) // FIXME this is wrong
201+
childOfPrevious()
202202
errored false
203203
tags {
204204
"$Tags.COMPONENT" "trace"

‎dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientDecorator.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
44
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
55
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
6-
import java.lang.invoke.MethodHandle;
7-
import java.lang.invoke.MethodHandles;
8-
import java.lang.invoke.MethodType;
96
import java.net.URI;
107
import org.springframework.web.reactive.function.client.ClientRequest;
118
import org.springframework.web.reactive.function.client.ClientResponse;
@@ -18,8 +15,6 @@ public class SpringWebfluxHttpClientDecorator
1815
public static final CharSequence CANCELLED_MESSAGE =
1916
UTF8BytesString.create("The subscription was cancelled");
2017

21-
private static final MethodHandle RAW_STATUS_CODE = findRawStatusCode();
22-
2318
public static final SpringWebfluxHttpClientDecorator DECORATE =
2419
new SpringWebfluxHttpClientDecorator();
2520

@@ -52,13 +47,8 @@ protected URI url(final ClientRequest httpRequest) {
5247

5348
@Override
5449
protected int status(final ClientResponse httpResponse) {
55-
if (null != RAW_STATUS_CODE) {
56-
try {
57-
return (int) RAW_STATUS_CODE.invokeExact(httpResponse);
58-
} catch (Throwable ignored) {
59-
}
60-
}
61-
return httpResponse.statusCode().value();
50+
final Integer code = StatusCodes.STATUS_CODE_FUNCTION.apply(httpResponse);
51+
return code != null ? code : 0;
6252
}
6353

6454
@Override
@@ -70,13 +60,4 @@ protected String getRequestHeader(ClientRequest request, String headerName) {
7060
protected String getResponseHeader(ClientResponse response, String headerName) {
7161
return response.headers().asHttpHeaders().getFirst(headerName);
7262
}
73-
74-
private static MethodHandle findRawStatusCode() {
75-
try {
76-
return MethodHandles.publicLookup()
77-
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
78-
} catch (IllegalAccessException | NoSuchMethodException e) {
79-
return null;
80-
}
81-
}
8263
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package datadog.trace.instrumentation.springwebflux.client;
2+
3+
import java.lang.invoke.MethodHandle;
4+
import java.lang.invoke.MethodHandles;
5+
import java.lang.invoke.MethodType;
6+
import java.util.function.Function;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.http.HttpStatus;
10+
import org.springframework.web.reactive.function.client.ClientResponse;
11+
12+
// Adapted from Opentelemetry
13+
public final class StatusCodes {
14+
private static final Logger LOGGER = LoggerFactory.getLogger(StatusCodes.class);
15+
public static final Function<ClientResponse, Integer> STATUS_CODE_FUNCTION =
16+
getStatusCodeFunction();
17+
18+
private static Function<ClientResponse, Integer> getStatusCodeFunction() {
19+
Function<ClientResponse, Integer> statusCodeFunction = getStatusCodeFunction60();
20+
if (statusCodeFunction == null) {
21+
statusCodeFunction = getStatusCodeFunction51();
22+
}
23+
if (statusCodeFunction == null) {
24+
statusCodeFunction = getStatusCodeFunction50();
25+
}
26+
if (statusCodeFunction == null) {
27+
LOGGER.debug(
28+
"Unable to find a status code extractor function working for the current webflux client version. "
29+
+ "Status codes will not be tagged on webflux client spans");
30+
}
31+
return statusCodeFunction;
32+
}
33+
34+
// in webflux 6.0, HttpStatusCode class was introduced, and statusCode() was changed to return
35+
// HttpStatusCode instead of HttpStatus
36+
private static Function<ClientResponse, Integer> getStatusCodeFunction60() {
37+
MethodHandle statusCode;
38+
MethodHandle value;
39+
try {
40+
Class<?> httpStatusCodeClass =
41+
Class.forName(
42+
"org.springframework.http.HttpStatusCode", false, StatusCodes.class.getClassLoader());
43+
statusCode =
44+
MethodHandles.publicLookup()
45+
.findVirtual(
46+
ClientResponse.class, "statusCode", MethodType.methodType(httpStatusCodeClass));
47+
value =
48+
MethodHandles.publicLookup()
49+
.findVirtual(httpStatusCodeClass, "value", MethodType.methodType(int.class));
50+
} catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException e) {
51+
return null;
52+
}
53+
54+
return response -> {
55+
try {
56+
Object httpStatusCode = statusCode.invoke(response);
57+
return (int) value.invoke(httpStatusCode);
58+
} catch (Throwable e) {
59+
return null;
60+
}
61+
};
62+
}
63+
64+
// in webflux 5.1, rawStatusCode() was introduced to retrieve the exact status code
65+
// note: rawStatusCode() was deprecated in 6.0
66+
private static Function<ClientResponse, Integer> getStatusCodeFunction51() {
67+
MethodHandle rawStatusCode;
68+
try {
69+
rawStatusCode =
70+
MethodHandles.publicLookup()
71+
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
72+
} catch (IllegalAccessException | NoSuchMethodException e) {
73+
return null;
74+
}
75+
76+
return response -> {
77+
try {
78+
return (int) rawStatusCode.invoke(response);
79+
} catch (Throwable e) {
80+
return null;
81+
}
82+
};
83+
}
84+
85+
// in webflux 5.0, statusCode() returns HttpStatus, which only represents standard status codes
86+
// (there's no way to capture arbitrary status codes)
87+
private static Function<ClientResponse, Integer> getStatusCodeFunction50() {
88+
MethodHandle statusCode;
89+
MethodHandle value;
90+
try {
91+
statusCode =
92+
MethodHandles.publicLookup()
93+
.findVirtual(
94+
ClientResponse.class, "statusCode", MethodType.methodType(HttpStatus.class));
95+
value =
96+
MethodHandles.publicLookup()
97+
.findVirtual(HttpStatus.class, "value", MethodType.methodType(int.class));
98+
} catch (IllegalAccessException | NoSuchMethodException e) {
99+
return null;
100+
}
101+
102+
return response -> {
103+
try {
104+
Object httpStatusCode = statusCode.invoke(response);
105+
return (int) value.invoke(httpStatusCode);
106+
} catch (Throwable e) {
107+
return null;
108+
}
109+
};
110+
}
111+
112+
private StatusCodes() {}
113+
}

‎dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/client/WebClientFilterInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public WebClientFilterInstrumentation() {
2020
public String[] helperClassNames() {
2121
return new String[] {
2222
packageName + ".SpringWebfluxHttpClientDecorator",
23+
packageName + ".StatusCodes",
2324
packageName + ".TraceWebClientSubscriber",
2425
packageName + ".WebClientTracingFilter",
2526
packageName + ".WebClientTracingFilter$MonoWebClientTrace",

‎dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/server/AbstractWebfluxInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public String[] helperClassNames() {
1313
return new String[] {
1414
packageName + ".SpringWebfluxHttpServerDecorator",
1515
packageName + ".AdviceUtils",
16+
packageName + ".AdviceUtils$SpanSubscriber",
1617
packageName + ".AdviceUtils$SpanFinishingSubscriber",
1718
packageName + ".RouteOnSuccessOrError"
1819
};

‎dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/server/AdviceUtils.java

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ public static <T> Mono<T> setPublisherSpan(Mono<T> mono, AgentSpan span) {
4343
return mono.<T>transform(finishSpanNextOrError(span));
4444
}
4545

46+
public static <T> Mono<T> wrapMonoWithScope(Mono<T> mono, AgentSpan span) {
47+
return mono.<T>transform(wrapPublisher(span));
48+
}
49+
4650
/**
4751
* Idea for this has been lifted from https://github.com/reactor/reactor-core/issues/947. Newer
4852
* versions of reactor-core have easier way to access context but we want to support older
@@ -54,23 +58,18 @@ public static <T> Mono<T> setPublisherSpan(Mono<T> mono, AgentSpan span) {
5458
(scannable, subscriber) -> new SpanFinishingSubscriber<>(subscriber, span));
5559
}
5660

57-
/**
58-
* This makes sure any callback is wrapped in suspend/resume checkpoints. Otherwise, we may end up
59-
* executing these callbacks in different threads without being resumed first.
60-
*/
61-
private static final class SpanFinishingSubscriber<T> implements CoreSubscriber<T>, Subscription {
61+
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> wrapPublisher(
62+
AgentSpan span) {
63+
return Operators.lift((scannable, subscriber) -> new SpanSubscriber<>(subscriber, span));
64+
}
6265

66+
private static class SpanSubscriber<T> implements CoreSubscriber<T>, Subscription {
6367
private final CoreSubscriber<? super T> subscriber;
64-
private final AgentSpan span;
68+
protected final AgentSpan span;
6569
private final Context context;
6670
private volatile Subscription subscription;
67-
private volatile int completed;
6871

69-
@SuppressWarnings("rawtypes")
70-
private static final AtomicIntegerFieldUpdater<SpanFinishingSubscriber> COMPLETED =
71-
AtomicIntegerFieldUpdater.newUpdater(SpanFinishingSubscriber.class, "completed");
72-
73-
public SpanFinishingSubscriber(CoreSubscriber<? super T> subscriber, AgentSpan span) {
72+
public SpanSubscriber(CoreSubscriber<? super T> subscriber, AgentSpan span) {
7473
this.subscriber = subscriber;
7574
this.span = span;
7675
this.context = subscriber.currentContext().put(AgentSpan.class, span);
@@ -93,19 +92,11 @@ public void onNext(T t) {
9392

9493
@Override
9594
public void onError(Throwable t) {
96-
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
97-
span.setError(true);
98-
span.addThrowable(t);
99-
span.finish();
100-
}
10195
subscriber.onError(t);
10296
}
10397

10498
@Override
10599
public void onComplete() {
106-
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
107-
span.finish();
108-
}
109100
subscriber.onComplete();
110101
}
111102

@@ -119,12 +110,51 @@ public void request(long n) {
119110
subscription.request(n);
120111
}
121112

113+
@Override
114+
public void cancel() {
115+
subscription.cancel();
116+
}
117+
}
118+
119+
/**
120+
* This makes sure any callback is wrapped in suspend/resume checkpoints. Otherwise, we may end up
121+
* executing these callbacks in different threads without being resumed first.
122+
*/
123+
private static final class SpanFinishingSubscriber<T> extends SpanSubscriber<T> {
124+
private volatile int completed;
125+
126+
@SuppressWarnings("rawtypes")
127+
private static final AtomicIntegerFieldUpdater<SpanFinishingSubscriber> COMPLETED =
128+
AtomicIntegerFieldUpdater.newUpdater(SpanFinishingSubscriber.class, "completed");
129+
130+
public SpanFinishingSubscriber(CoreSubscriber<? super T> subscriber, AgentSpan span) {
131+
super(subscriber, span);
132+
}
133+
134+
@Override
135+
public void onError(Throwable t) {
136+
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
137+
span.setError(true);
138+
span.addThrowable(t);
139+
span.finish();
140+
}
141+
super.onError(t);
142+
}
143+
144+
@Override
145+
public void onComplete() {
146+
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
147+
span.finish();
148+
}
149+
super.onComplete();
150+
}
151+
122152
@Override
123153
public void cancel() {
124154
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
125155
span.finish();
126156
}
127-
subscription.cancel();
157+
super.cancel();
128158
}
129159
}
130160
}

‎dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,10 @@ public void adviceTransformations(AdviceTransformation transformation) {
2828
.and(takesArguments(1)),
2929
// Cannot reference class directly here because it would lead to class load failure on Java7
3030
packageName + ".DispatcherHandlerAdvice");
31+
transformation.applyAdvice(
32+
isMethod()
33+
.and(named("handleResult"))
34+
.and(takesArgument(0, named("org.springframework.web.server.ServerWebExchange"))),
35+
packageName + ".HandleResultAdvice");
3136
}
3237
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /