Skip to content

Commit 4813c6d

Browse files
authored
core,xds: Fix backend_service plumbing for subchannel metrics (#12735)
This PR fixes #12432. Subchannel metrics read backend_service from EAG attributes, but xDS currently only populates the resolution result attribute. As a result, grpc.lb.backend_service is left unset for subchannel metrics in the cds path. This change adds an internal EAG-level backend_service attribute in cds and has InternalSubchannel read that attribute for subchannel metrics, while keeping a fallback to the existing resolution result attribute. This PR is intentionally scoped to subchannel metrics only and does not attempt the broader #12431 plumbing changes.
1 parent 6737eb5 commit 4813c6d

File tree

6 files changed

+80
-8
lines changed

6 files changed

+80
-8
lines changed

api/src/main/java/io/grpc/EquivalentAddressGroup.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ public final class EquivalentAddressGroup {
5555
*/
5656
public static final Attributes.Key<String> ATTR_LOCALITY_NAME =
5757
Attributes.Key.create("io.grpc.EquivalentAddressGroup.LOCALITY");
58+
/**
59+
* The backend service associated with this EquivalentAddressGroup.
60+
*/
61+
@Attr
62+
static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
63+
Attributes.Key.create("io.grpc.EquivalentAddressGroup.BACKEND_SERVICE");
5864
/**
5965
* Endpoint weight for load balancing purposes. While the type is Long, it must be a valid uint32.
6066
* Must not be zero. The weight is proportional to the other endpoints; if an endpoint's weight is

api/src/main/java/io/grpc/InternalEquivalentAddressGroup.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,10 @@ private InternalEquivalentAddressGroup() {}
2626
* twice that of another endpoint, it is intended to receive twice the load.
2727
*/
2828
public static final Attributes.Key<Long> ATTR_WEIGHT = EquivalentAddressGroup.ATTR_WEIGHT;
29+
30+
/**
31+
* The backend service associated with this EquivalentAddressGroup.
32+
*/
33+
public static final Attributes.Key<String> ATTR_BACKEND_SERVICE =
34+
EquivalentAddressGroup.ATTR_BACKEND_SERVICE;
2935
}

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.grpc.HttpConnectProxiedSocketAddress;
4343
import io.grpc.InternalChannelz;
4444
import io.grpc.InternalChannelz.ChannelStats;
45+
import io.grpc.InternalEquivalentAddressGroup;
4546
import io.grpc.InternalInstrumented;
4647
import io.grpc.InternalLogId;
4748
import io.grpc.InternalWithLogId;
@@ -606,8 +607,8 @@ public void run() {
606607
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
607608
gotoNonErrorState(READY);
608609
subchannelMetrics.recordConnectionAttemptSucceeded(/* target= */ target,
609-
/* backendService= */ getAttributeOrDefault(
610-
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
610+
/* backendService= */ getBackendServiceOrDefault(
611+
addressIndex.getCurrentEagAttributes()),
611612
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
612613
EquivalentAddressGroup.ATTR_LOCALITY_NAME),
613614
/* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
@@ -638,17 +639,17 @@ public void run() {
638639
addressIndex.reset();
639640
gotoNonErrorState(IDLE);
640641
subchannelMetrics.recordDisconnection(/* target= */ target,
641-
/* backendService= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
642-
NameResolver.ATTR_BACKEND_SERVICE),
642+
/* backendService= */ getBackendServiceOrDefault(
643+
addressIndex.getCurrentEagAttributes()),
643644
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
644645
EquivalentAddressGroup.ATTR_LOCALITY_NAME),
645646
/* disconnectError= */ disconnectError.toErrorString(),
646647
/* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
647648
.get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
648649
} else if (pendingTransport == transport) {
649650
subchannelMetrics.recordConnectionAttemptFailed(/* target= */ target,
650-
/* backendService= */getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
651-
NameResolver.ATTR_BACKEND_SERVICE),
651+
/* backendService= */ getBackendServiceOrDefault(
652+
addressIndex.getCurrentEagAttributes()),
652653
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
653654
EquivalentAddressGroup.ATTR_LOCALITY_NAME));
654655
Preconditions.checkState(state.getState() == CONNECTING,
@@ -711,6 +712,14 @@ private String getAttributeOrDefault(Attributes attributes, Attributes.Key<Strin
711712
String value = attributes.get(key);
712713
return value == null ? "" : value;
713714
}
715+
716+
private String getBackendServiceOrDefault(Attributes attributes) {
717+
String value = attributes.get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE);
718+
if (value == null) {
719+
value = attributes.get(NameResolver.ATTR_BACKEND_SERVICE);
720+
}
721+
return value == null ? "" : value;
722+
}
714723
}
715724

716725
// All methods are called in syncContext

core/src/test/java/io/grpc/internal/InternalSubchannelTest.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import io.grpc.ConnectivityStateInfo;
4949
import io.grpc.EquivalentAddressGroup;
5050
import io.grpc.InternalChannelz;
51+
import io.grpc.InternalEquivalentAddressGroup;
5152
import io.grpc.InternalLogId;
5253
import io.grpc.InternalWithLogId;
5354
import io.grpc.LoadBalancer;
@@ -1510,7 +1511,7 @@ public void subchannelStateChanges_triggersAttemptFailedMetric() {
15101511
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
15111512
SocketAddress addr = mock(SocketAddress.class);
15121513
Attributes eagAttributes = Attributes.newBuilder()
1513-
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
1514+
.set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
15141515
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY)
15151516
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
15161517
.build();
@@ -1564,7 +1565,7 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() {
15641565
// 2. Setup Subchannel with attributes
15651566
SocketAddress addr = mock(SocketAddress.class);
15661567
Attributes eagAttributes = Attributes.newBuilder()
1567-
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
1568+
.set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
15681569
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY)
15691570
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
15701571
.build();
@@ -1631,6 +1632,45 @@ public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() {
16311632
inOrder.verifyNoMoreInteractions();
16321633
}
16331634

1635+
@Test
1636+
public void subchannelStateChanges_backendServiceFallsBackToResolutionResultAttr() {
1637+
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
1638+
SocketAddress addr = mock(SocketAddress.class);
1639+
Attributes eagAttributes = Attributes.newBuilder()
1640+
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
1641+
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, LOCALITY)
1642+
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
1643+
.build();
1644+
List<EquivalentAddressGroup> addressGroups =
1645+
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes));
1646+
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
1647+
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
1648+
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
1649+
LoadBalancer.CreateSubchannelArgs createSubchannelArgs =
1650+
LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build();
1651+
internalSubchannel = new InternalSubchannel(
1652+
createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider,
1653+
mockTransportFactory, fakeClock.getScheduledExecutorService(),
1654+
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz,
1655+
CallTracer.getDefaultFactory().create(), subchannelTracer, logId,
1656+
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
1657+
Collections.emptyList(), AUTHORITY, mockMetricRecorder
1658+
);
1659+
1660+
internalSubchannel.obtainActiveTransport();
1661+
MockClientTransportInfo transportInfo = transports.poll();
1662+
assertNotNull(transportInfo);
1663+
transportInfo.listener.transportReady();
1664+
fakeClock.runDueTasks();
1665+
1666+
verify(mockMetricRecorder).addLongCounter(
1667+
eqMetricInstrumentName("grpc.subchannel.connection_attempts_succeeded"),
1668+
eq(1L),
1669+
eq(Arrays.asList(AUTHORITY)),
1670+
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
1671+
);
1672+
}
1673+
16341674
private void assertNoCallbackInvoke() {
16351675
while (fakeExecutor.runDueTasks() > 0) {}
16361676
assertEquals(0, callbackInvokes.size());

xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.Attributes;
2828
import io.grpc.EquivalentAddressGroup;
2929
import io.grpc.HttpConnectProxiedSocketAddress;
30+
import io.grpc.InternalEquivalentAddressGroup;
3031
import io.grpc.InternalLogId;
3132
import io.grpc.LoadBalancer;
3233
import io.grpc.LoadBalancerProvider;
@@ -369,6 +370,7 @@ StatusOr<ClusterResolutionResult> edsUpdateToResult(
369370
String localityName = localityName(locality);
370371
Attributes attr =
371372
endpoint.eag().getAttributes().toBuilder()
373+
.set(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE, clusterName)
372374
.set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY, locality)
373375
.set(EquivalentAddressGroup.ATTR_LOCALITY_NAME, localityName)
374376
.set(io.grpc.xds.XdsAttributes.ATTR_LOCALITY_WEIGHT,

xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import io.grpc.ConnectivityState;
6161
import io.grpc.EquivalentAddressGroup;
6262
import io.grpc.HttpConnectProxiedSocketAddress;
63+
import io.grpc.InternalEquivalentAddressGroup;
6364
import io.grpc.LoadBalancer;
6465
import io.grpc.LoadBalancer.Helper;
6566
import io.grpc.LoadBalancer.PickResult;
@@ -317,14 +318,20 @@ public void edsClustersWithRingHashEndpointLbPolicy() throws Exception {
317318
// LOCALITY1 are equally weighted.
318319
assertThat(addr1.getAddresses())
319320
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.1", 8080)));
321+
assertThat(addr1.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE))
322+
.isEqualTo(CLUSTER);
320323
assertThat(addr1.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
321324
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10);
322325
assertThat(addr2.getAddresses())
323326
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.0.2", 8080)));
327+
assertThat(addr2.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE))
328+
.isEqualTo(CLUSTER);
324329
assertThat(addr2.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
325330
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x0AAAAAAA /* 1/12 */ : 10);
326331
assertThat(addr3.getAddresses())
327332
.isEqualTo(Arrays.asList(newInetSocketAddress("127.0.1.1", 8080)));
333+
assertThat(addr3.getAttributes().get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE))
334+
.isEqualTo(CLUSTER);
328335
assertThat(addr3.getAttributes().get(io.grpc.xds.XdsAttributes.ATTR_SERVER_WEIGHT))
329336
.isEqualTo(CdsLoadBalancer2.pickFirstWeightedShuffling ? 0x6AAAAAAA /* 5/6 */ : 50 * 60);
330337
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
@@ -920,6 +927,8 @@ public void onlyLogicalDnsCluster_endpointsResolved() {
920927
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(
921928
newInetSocketAddress("127.0.2.1", 9000), newInetSocketAddress("127.0.2.2", 9000)))),
922929
childBalancer.addresses);
930+
assertThat(childBalancer.addresses.get(0).getAttributes()
931+
.get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE)).isEqualTo(CLUSTER);
923932
assertThat(childBalancer.addresses.get(0).getAttributes()
924933
.get(XdsInternalAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME + ":9000");
925934
}

0 commit comments

Comments
 (0)