Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 163ba8a

Browse files
Enforce new ingestion reason for spark traces (DataDog#6310)

Motivation: It is critical to keep all spark traces, as customers closely monitor job runs. The new ingestion reason will allow tracking of ingested bytes for billing.

Additional Notes: Added the method `AgentSpan setSamplingPriority(final int newPriority, int samplingMechanism)` to the `AgentSpan` interface so that it can be called from an instrumentation.
1 parent 00358aa commit 163ba8a

File tree

8 files changed

+51
-1
lines changed

8 files changed

+51
-1
lines changed

‎dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import datadog.trace.api.Config;
66
import datadog.trace.api.DDTags;
77
import datadog.trace.api.DDTraceId;
8+
import datadog.trace.api.sampling.PrioritySampling;
9+
import datadog.trace.api.sampling.SamplingMechanism;
810
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
911
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
1012
import de.thetaphi.forbiddenapis.SuppressForbidden;
@@ -135,6 +137,7 @@ private void initApplicationSpanIfNotInitialized() {
135137
captureApplicationParameters(builder);
136138

137139
applicationSpan = builder.start();
140+
setDataJobsSamplingPriority(applicationSpan);
138141
applicationSpan.setMeasured(true);
139142
}
140143

@@ -203,6 +206,7 @@ private AgentSpan getOrCreateStreamingBatchSpan(
203206
}
204207

205208
batchSpan = builder.start();
209+
setDataJobsSamplingPriority(batchSpan);
206210
streamingBatchSpans.put(batchKey, batchSpan);
207211
return batchSpan;
208212
}
@@ -267,6 +271,7 @@ private AgentSpan getOrCreateSqlSpan(
267271
}
268272

269273
AgentSpan sqlSpan = spanBuilder.start();
274+
setDataJobsSamplingPriority(sqlSpan);
270275
sqlSpans.put(sqlExecutionId, sqlSpan);
271276
return sqlSpan;
272277
}
@@ -321,6 +326,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
321326
captureJobParameters(jobSpanBuilder, jobStart.properties());
322327

323328
AgentSpan jobSpan = jobSpanBuilder.start();
329+
setDataJobsSamplingPriority(jobSpan);
324330
jobSpan.setMeasured(true);
325331

326332
for (int stageId : getSparkJobStageIds(jobStart)) {
@@ -404,6 +410,7 @@ public synchronized void onStageSubmitted(SparkListenerStageSubmitted stageSubmi
404410
.withTag(DDTags.RESOURCE_NAME, stageSubmitted.stageInfo().name())
405411
.start();
406412

413+
setDataJobsSamplingPriority(stageSpan);
407414
stageSpan.setMeasured(true);
408415

409416
stageSpans.put(stageSpanKey(stageId, stageAttemptId), stageSpan);
@@ -551,6 +558,7 @@ private void sendTaskSpan(
551558
taskSpan.setTag("count_towards_task_failures", reason.countTowardsTaskFailures());
552559
}
553560

561+
setDataJobsSamplingPriority(taskSpan);
554562
taskSpan.finish(taskEnd.taskInfo().finishTime() * 1000);
555563
}
556564

@@ -753,6 +761,10 @@ private synchronized void onStreamingQueryProgressEvent(
753761
}
754762
}
755763

764+
private void setDataJobsSamplingPriority(AgentSpan span) {
765+
span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
766+
}
767+
756768
private AgentTracer.SpanBuilder buildSparkSpan(String spanName, Properties properties) {
757769
AgentTracer.SpanBuilder builder =
758770
tracer.buildSpan(spanName).withSpanType("spark").withTag("app_id", appId);

‎dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/DatabricksParentContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public AgentTrace getTrace() {
8181

8282
@Override
8383
public int getSamplingPriority() {
84-
return PrioritySampling.SAMPLER_KEEP;
84+
return PrioritySampling.UNSET;
8585
}
8686

8787
@Override

‎dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkStructuredStreamingTest.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package datadog.trace.instrumentation.spark
22

33
import datadog.trace.agent.test.AgentTestRunner
44
import datadog.trace.api.Platform
5+
import datadog.trace.api.sampling.PrioritySampling
6+
import datadog.trace.api.sampling.SamplingMechanism
57
import org.apache.spark.sql.Encoders
68
import org.apache.spark.sql.execution.streaming.MemoryStream
79
import org.apache.spark.sql.SparkSession
@@ -69,6 +71,8 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
6971
resourceName "test-query"
7072
spanType "spark"
7173
parent()
74+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
75+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
7276
tags {
7377
defaultTags()
7478
// Streaming tags
@@ -174,6 +178,8 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
174178
operationName "spark.streaming_batch"
175179
spanType "spark"
176180
assert span.tags["streaming_query.batch_id"] == 1
181+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
182+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
177183
parent()
178184
}
179185
span {

‎dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import datadog.trace.agent.test.AgentTestRunner
44
import datadog.trace.api.DDSpanId
55
import datadog.trace.api.DDTraceId
66
import datadog.trace.api.Platform
7+
import datadog.trace.api.sampling.PrioritySampling
8+
import datadog.trace.api.sampling.SamplingMechanism
79
import datadog.trace.test.util.Flaky
810
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
911
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -51,6 +53,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
5153
resourceName "spark.application"
5254
spanType "spark"
5355
errored false
56+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
57+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
5458
parent()
5559
}
5660
span {
@@ -254,6 +258,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
254258
spanType "spark"
255259
traceId 8944764253919609482G
256260
parentSpanId 15104224823446433673G
261+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
262+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
257263
assert span.tags["databricks_job_id"] == "1234"
258264
assert span.tags["databricks_job_run_id"] == "5678"
259265
assert span.tags["databricks_task_run_id"] == "9012"
@@ -275,6 +281,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
275281
spanType "spark"
276282
traceId 5240384461065211484G
277283
parentSpanId 14128229261586201946G
284+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
285+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
278286
assert span.tags["databricks_job_id"] == "3456"
279287
assert span.tags["databricks_job_run_id"] == "901"
280288
assert span.tags["databricks_task_run_id"] == "7890"
@@ -296,6 +304,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
296304
spanType "spark"
297305
traceId 2235374731114184741G
298306
parentSpanId 8956125882166502063G
307+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
308+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
299309
assert span.tags["databricks_job_id"] == "123"
300310
assert span.tags["databricks_job_run_id"] == "8765"
301311
assert span.tags["databricks_task_run_id"] == "456"
@@ -316,6 +326,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
316326
operationName "spark.job"
317327
spanType "spark"
318328
parent()
329+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
330+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
319331
assert span.tags["databricks_job_id"] == null
320332
assert span.tags["databricks_job_run_id"] == "8765"
321333
assert span.tags["databricks_task_run_id"] == null
@@ -429,6 +441,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
429441
spanType "spark"
430442
traceId 8944764253919609482G
431443
parentSpanId 15104224823446433673G
444+
assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
445+
assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
432446
}
433447
span {
434448
operationName "spark.job"

‎internal-api/src/main/java/datadog/trace/api/sampling/SamplingMechanism.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public class SamplingMechanism {
2121
public static final byte REMOTE_USER_RATE = 6;
2222
/** Span Sampling Rate (single span sampled on account of a span sampling rule) */
2323
public static final byte SPAN_SAMPLING_RATE = 8;
24+
/** Data Jobs */
25+
public static final byte DATA_JOBS = 10;
2426
/** Force override sampling decision from external source, like W3C traceparent. */
2527
public static final byte EXTERNAL_OVERRIDE = Byte.MIN_VALUE;
2628

@@ -40,6 +42,7 @@ public static boolean validateWithSamplingPriority(int mechanism, int priority)
4042
return priority == USER_DROP || priority == USER_KEEP;
4143

4244
case APPSEC:
45+
case DATA_JOBS:
4346
return priority == PrioritySampling.USER_KEEP;
4447

4548
case EXTERNAL_OVERRIDE:

‎internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentSpan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ public interface AgentSpan extends MutableSpan, IGSpanInfo {
135135

136136
Integer forceSamplingDecision();
137137

138+
AgentSpan setSamplingPriority(final int newPriority, int samplingMechanism);
139+
138140
TraceConfig traceConfig();
139141

140142
void addLink(AgentSpanLink link);

‎internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,11 @@ public Integer forceSamplingDecision() {
657657
return null;
658658
}
659659

660+
@Override
661+
public AgentSpan setSamplingPriority(int newPriority, int samplingMechanism) {
662+
return this;
663+
}
664+
660665
@Override
661666
public Integer getSamplingPriority() {
662667
return (int) PrioritySampling.UNSET;

‎internal-api/src/test/groovy/datadog/trace/api/sampling/SamplingMechanismTest.groovy

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,14 @@ class SamplingMechanismTest extends Specification {
7979
APPSEC | userDropX | false
8080
APPSEC | userKeepX | false
8181

82+
DATA_JOBS | UNSET | false
83+
DATA_JOBS | SAMPLER_DROP | false
84+
DATA_JOBS | SAMPLER_KEEP | false
85+
DATA_JOBS | USER_DROP | false
86+
DATA_JOBS | USER_KEEP | true
87+
DATA_JOBS | userDropX | false
88+
DATA_JOBS | userKeepX | false
89+
8290
EXTERNAL_OVERRIDE | UNSET | false
8391
EXTERNAL_OVERRIDE | SAMPLER_DROP | false
8492
EXTERNAL_OVERRIDE | SAMPLER_KEEP | false

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /