Skip to content

Commit f385add

Browse files
authored
1 parent 02e98a8 commit f385add

17 files changed

+1225
-723
lines changed

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,9 @@ private abstract class XdsWatcherBase<T extends ResourceUpdate>
632632

633633
@Nullable
634634
private StatusOr<T> data;
635+
@Nullable
636+
@SuppressWarnings("unused")
637+
private Status ambientError;
635638

636639

637640
private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
@@ -640,42 +643,39 @@ private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
640643
}
641644

642645
@Override
643-
public void onError(Status error) {
644-
checkNotNull(error, "error");
646+
public void onResourceChanged(StatusOr<T> update) {
645647
if (cancelled) {
646648
return;
647649
}
648-
// Don't update configuration on error, if we've already received configuration
649-
if (!hasDataValue()) {
650-
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
651-
String.format("Error retrieving %s: %s: %s",
652-
toContextString(), error.getCode(), error.getDescription())));
653-
maybePublishConfig();
654-
}
655-
}
650+
ambientError = null;
651+
if (update.hasValue()) {
652+
data = update;
653+
subscribeToChildren(update.getValue());
654+
} else {
655+
Status status = update.getStatus();
656+
Status translatedStatus = Status.UNAVAILABLE.withDescription(
657+
String.format("Error retrieving %s: %s. Details: %s%s",
658+
toContextString(),
659+
status.getCode(),
660+
status.getDescription() != null ? status.getDescription() : "",
661+
nodeInfo()));
656662

657-
@Override
658-
public void onResourceDoesNotExist(String resourceName) {
659-
if (cancelled) {
660-
return;
663+
data = StatusOr.fromStatus(translatedStatus);
661664
}
662-
663-
checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
664-
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
665-
toContextString() + " does not exist" + nodeInfo()));
666665
maybePublishConfig();
667666
}
668667

669668
@Override
670-
public void onChanged(T update) {
671-
checkNotNull(update, "update");
669+
public void onAmbientError(Status error) {
672670
if (cancelled) {
673671
return;
674672
}
675-
676-
this.data = StatusOr.fromValue(update);
677-
subscribeToChildren(update);
678-
maybePublishConfig();
673+
ambientError = error.withDescription(
674+
String.format("Ambient error for %s: %s. Details: %s%s",
675+
toContextString(),
676+
error.getCode(),
677+
error.getDescription() != null ? error.getDescription() : "",
678+
nodeInfo()));
679679
}
680680

681681
protected abstract void subscribeToChildren(T update);

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 62 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.grpc.ServerServiceDefinition;
4343
import io.grpc.Status;
4444
import io.grpc.StatusException;
45+
import io.grpc.StatusOr;
4546
import io.grpc.SynchronizationContext;
4647
import io.grpc.SynchronizationContext.ScheduledHandle;
4748
import io.grpc.internal.GrpcUtil;
@@ -401,18 +402,30 @@ private DiscoveryState(String resourceName) {
401402
}
402403

403404
@Override
404-
public void onChanged(final LdsUpdate update) {
405+
public void onResourceChanged(final StatusOr<LdsUpdate> update) {
405406
if (stopped) {
406407
return;
407408
}
408-
logger.log(Level.FINEST, "Received Lds update {0}", update);
409-
if (update.listener() == null) {
410-
onResourceDoesNotExist("Non-API");
409+
410+
if (!update.hasValue()) {
411+
Status status = update.getStatus();
412+
StatusException statusException = Status.UNAVAILABLE.withDescription(
413+
String.format("Listener %s unavailable: %s", resourceName, status.getDescription()))
414+
.withCause(status.asException())
415+
.asException();
416+
handleConfigNotFoundOrMismatch(statusException);
411417
return;
412418
}
413419

414-
String ldsAddress = update.listener().address();
415-
if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
420+
final LdsUpdate ldsUpdate = update.getValue();
421+
logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate);
422+
if (ldsUpdate.listener() == null) {
423+
handleConfigNotFoundOrMismatch(
424+
Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException());
425+
return;
426+
}
427+
String ldsAddress = ldsUpdate.listener().address();
428+
if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP
416429
|| !ipAddressesMatch(ldsAddress)) {
417430
handleConfigNotFoundOrMismatch(
418431
Status.UNKNOWN.withDescription(
@@ -421,16 +434,15 @@ public void onChanged(final LdsUpdate update) {
421434
listenerAddress, ldsAddress)).asException());
422435
return;
423436
}
437+
424438
if (!pendingRds.isEmpty()) {
425439
// filter chain state has not yet been applied to filterChainSelectorManager and there
426-
// are two sets of sslContextProviderSuppliers, so we release the old ones.
427440
releaseSuppliersInFlight();
428441
pendingRds.clear();
429442
}
430443

431-
filterChains = update.listener().filterChains();
432-
defaultFilterChain = update.listener().defaultFilterChain();
433-
// Filters are loaded even if the server isn't serving yet.
444+
filterChains = ldsUpdate.listener().filterChains();
445+
defaultFilterChain = ldsUpdate.listener().defaultFilterChain();
434446
updateActiveFilters();
435447

436448
List<FilterChain> allFilterChains = filterChains;
@@ -469,43 +481,33 @@ public void onChanged(final LdsUpdate update) {
469481
}
470482
}
471483

472-
private boolean ipAddressesMatch(String ldsAddress) {
473-
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
474-
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
475-
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
476-
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
477-
return false;
478-
}
479-
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
480-
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
481-
return listenerIp.equals(ldsIp);
482-
}
483-
484-
@Override
485-
public void onResourceDoesNotExist(final String resourceName) {
486-
if (stopped) {
487-
return;
488-
}
489-
StatusException statusException = Status.UNAVAILABLE.withDescription(
490-
String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
491-
xdsClient.getBootstrapInfo().node().getId())).asException();
492-
handleConfigNotFoundOrMismatch(statusException);
493-
}
494-
495484
@Override
496-
public void onError(final Status error) {
485+
public void onAmbientError(final Status error) {
497486
if (stopped) {
498487
return;
499488
}
500489
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
501490
Status errorWithNodeId = error.withDescription(
502491
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
503492
logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
493+
504494
if (!isServing) {
505495
listener.onNotServing(errorWithNodeId.asException());
506496
}
507497
}
508498

499+
private boolean ipAddressesMatch(String ldsAddress) {
500+
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
501+
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
502+
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
503+
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
504+
return false;
505+
}
506+
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
507+
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
508+
return listenerIp.equals(ldsIp);
509+
}
510+
509511
private void shutdown() {
510512
stopped = true;
511513
cleanUpRouteDiscoveryStates();
@@ -794,54 +796,42 @@ private RouteDiscoveryState(String resourceName) {
794796
}
795797

796798
@Override
797-
public void onChanged(final RdsUpdate update) {
798-
syncContext.execute(new Runnable() {
799-
@Override
800-
public void run() {
801-
if (!routeDiscoveryStates.containsKey(resourceName)) {
802-
return;
803-
}
804-
if (savedVirtualHosts == null && !isPending) {
805-
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
806-
}
807-
savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
808-
updateRdsRoutingConfig();
809-
maybeUpdateSelector();
799+
public void onResourceChanged(final StatusOr<RdsUpdate> update) {
800+
syncContext.execute(() -> {
801+
if (!routeDiscoveryStates.containsKey(resourceName)) {
802+
return; // Watcher has been cancelled.
810803
}
811-
});
812-
}
813804

814-
@Override
815-
public void onResourceDoesNotExist(final String resourceName) {
816-
syncContext.execute(new Runnable() {
817-
@Override
818-
public void run() {
819-
if (!routeDiscoveryStates.containsKey(resourceName)) {
820-
return;
805+
if (update.hasValue()) {
806+
if (savedVirtualHosts == null && !isPending) {
807+
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
821808
}
822-
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
809+
savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts);
810+
} else {
811+
logger.log(Level.WARNING, "Rds {0} unavailable: {1}",
812+
new Object[]{resourceName, update.getStatus()});
823813
savedVirtualHosts = null;
824-
updateRdsRoutingConfig();
825-
maybeUpdateSelector();
826814
}
815+
// In both cases, a change has occurred that requires a config update.
816+
updateRdsRoutingConfig();
817+
maybeUpdateSelector();
827818
});
828819
}
829820

830821
@Override
831-
public void onError(final Status error) {
832-
syncContext.execute(new Runnable() {
833-
@Override
834-
public void run() {
835-
if (!routeDiscoveryStates.containsKey(resourceName)) {
836-
return;
837-
}
838-
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
839-
Status errorWithNodeId = error.withDescription(
840-
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
841-
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
842-
new Object[]{resourceName, errorWithNodeId});
843-
maybeUpdateSelector();
822+
public void onAmbientError(final Status error) {
823+
syncContext.execute(() -> {
824+
if (!routeDiscoveryStates.containsKey(resourceName)) {
825+
return; // Watcher has been cancelled.
844826
}
827+
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
828+
Status errorWithNodeId = error.withDescription(
829+
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
830+
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
831+
new Object[]{resourceName, errorWithNodeId});
832+
833+
// Per gRFC A88, ambient errors should not trigger a configuration change.
834+
// Therefore, we do NOT call maybeUpdateSelector() here.
845835
});
846836
}
847837

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,9 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
262262
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
263263
if (serverFeatures != null) {
264264
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
265-
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
265+
if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
266+
ignoreResourceDeletion = true;
267+
}
266268
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
267269
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
268270
}

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,10 @@ private void handleRpcStreamClosed(Status status) {
457457
if (responseReceived) {
458458
// A closed ADS stream after a successful response is not considered an error. Servers may
459459
// close streams for various reasons during normal operation, such as load balancing or
460-
// underlying connection hitting its max connection age limit (see gRFC A9).
460+
// underlying connection hitting its max connection age limit (see gRFC A9).
461461
if (!status.isOk()) {
462462
newStatus = Status.OK;
463-
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
463+
logger.log(XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
464464
+ "response was received, so this will not be treated as an error. Cause: {2}",
465465
status.getCode(), status.getDescription(), status.getCause());
466466
} else {

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.protobuf.Any;
2828
import io.grpc.ExperimentalApi;
2929
import io.grpc.Status;
30+
import io.grpc.StatusOr;
3031
import io.grpc.xds.client.Bootstrapper.ServerInfo;
3132
import java.net.URI;
3233
import java.net.URISyntaxException;
@@ -139,30 +140,29 @@ public interface ResourceUpdate {}
139140

140141
/**
141142
* Watcher interface for a single requested xDS resource.
143+
*
144+
* <p>Note that we expect that the implementer to:
145+
* - Comply with the guarantee to not generate certain statuses by the library:
146+
* https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
147+
* propagated to the channel, override it with {@link io.grpc.Status.Code#UNAVAILABLE}.
148+
* - Keep {@link Status} description in one form or another, as it contains valuable debugging
149+
* information.
142150
*/
143151
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
144152
public interface ResourceWatcher<T extends ResourceUpdate> {
145153

146154
/**
147-
* Called when the resource discovery RPC encounters some transient error.
148-
*
149-
* <p>Note that we expect that the implementer to:
150-
* - Comply with the guarantee to not generate certain statuses by the library:
151-
* https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
152-
* propagated to the channel, override it with {@link io.grpc.Status.Code#UNAVAILABLE}.
153-
* - Keep {@link Status} description in one form or another, as it contains valuable debugging
154-
* information.
155+
* Called to deliver a resource update or an error. If an error is passed after a valid
156+
* resource has been delivered, the watcher should stop using the previously delivered
157+
* resource.
155158
*/
156-
void onError(Status error);
159+
void onResourceChanged(StatusOr<T> update);
157160

158161
/**
159-
* Called when the requested resource is not available.
160-
*
161-
* @param resourceName name of the resource requested in discovery request.
162-
*/
163-
void onResourceDoesNotExist(String resourceName);
164-
165-
void onChanged(T update);
162+
* Called to deliver a transient error that should not affect the watcher's use of any
163+
* previously received resource.
164+
* */
165+
void onAmbientError(Status error);
166166
}
167167

168168
/**

0 commit comments

Comments
 (0)