diff --git a/.idea/checkstyle-idea.xml b/.idea/checkstyle-idea.xml index 3cc177b4de90..3d0cf2aa55c0 100644 --- a/.idea/checkstyle-idea.xml +++ b/.idea/checkstyle-idea.xml @@ -24,4 +24,4 @@ - + \ No newline at end of file diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java index 099833a59f19..3454b9ded251 100644 --- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java +++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java @@ -18,6 +18,7 @@ package org.apache.ignite.client; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_BACKGROUND_RECONNECT_INTERVAL; +import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL; import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_TIMEOUT; @@ -115,6 +116,8 @@ class Builder { private @Nullable String name; + long backgroundReResolveAddressesInterval = DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL; + /** * Sets the addresses of Ignite server nodes within a cluster. An address can be an IP address or a hostname, with or without port. * If port is not set then Ignite will use the default one - see {@link IgniteClientConfiguration#DFLT_PORT}. @@ -406,6 +409,28 @@ public Builder name(@Nullable String name) { return this; } + /** + * Sets how long the resolved addresses will be considered valid, in milliseconds. Set to {@code 0} for infinite validity. + * + *

Ignite client resolve the provided hostnames into multiple IP addresses, each corresponds to an active cluster node. + * However, additional IP addresses can be collected after updating the DNS records. This property controls how often Ignite + * client will try to re-resolve provided hostnames and connect to newly discovered addresses. + * + * @param backgroundReResolveAddressesInterval Background re-resolve interval, in milliseconds. + * @return This instance. + * @throws IllegalArgumentException When value is less than zero. + */ + public Builder backgroundReResolveAddressesInterval(long backgroundReResolveAddressesInterval) { + if (backgroundReResolveAddressesInterval < 0) { + throw new IllegalArgumentException("backgroundReResolveAddressesInterval [" + + backgroundReResolveAddressesInterval + "] must be a non-negative integer value."); + } + + this.backgroundReResolveAddressesInterval = backgroundReResolveAddressesInterval; + + return this; + } + /** * Builds the client. * @@ -436,7 +461,8 @@ public CompletableFuture buildAsync() { authenticator, operationTimeout, sqlPartitionAwarenessMetadataCacheSize, - name + name, + backgroundReResolveAddressesInterval ); return TcpIgniteClient.startAsync(cfg); diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java index ef708867caa5..e11f0ddd0f4d 100644 --- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java +++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java @@ -43,6 +43,9 @@ public interface IgniteClientConfiguration { /** Default background reconnect interval, in milliseconds. */ long DFLT_BACKGROUND_RECONNECT_INTERVAL = 30_000L; + /** Default interval sets how long the resolved addresses will be considered valid, in milliseconds. */ + long DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL = 30_000L; + /** Default operation timeout, in milliseconds. */ int DFLT_OPERATION_TIMEOUT = 0; @@ -222,4 +225,16 @@ public interface IgniteClientConfiguration { * @return Client name. */ @Nullable String name(); + + /** + * Gets how long the resolved addresses will be considered valid, in milliseconds. Set to {@code 0} for infinite validity. + * Default is {@link #DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL}. + * + *

Ignite client resolve the provided hostnames into multiple IP addresses, each corresponds to an active cluster node. + * However, additional IP addresses can be collected after updating the DNS records. This property controls how often Ignite + * client will try to re-resolve provided hostnames and connect to newly discovered addresses. + * + * @return Background re-resolve interval, in milliseconds. + */ + long backgroundReResolveAddressesInterval(); } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java index 2be085ee1ea8..55df2ee726ab 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java @@ -25,6 +25,7 @@ import org.apache.ignite.client.SslConfiguration; import org.apache.ignite.lang.LoggerFactory; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.VisibleForTesting; /** * Immutable client configuration. @@ -68,6 +69,10 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur private final @Nullable String name; + private final InetAddressResolver addressResolver; + + private final long backgroundReResolveAddressesInterval; + /** * Constructor. * @@ -86,7 +91,10 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur * @param operationTimeout Operation timeout. * @param sqlPartitionAwarenessMetadataCacheSize Size of the cache to store partition awareness metadata. * @param name Client name. + * @param backgroundReResolveAddressesInterval Background re-resolve addresses interval. + * @param addressResolver Address resolver. */ + @VisibleForTesting public IgniteClientConfigurationImpl( @Nullable IgniteClientAddressFinder addressFinder, String[] addresses, @@ -102,7 +110,9 @@ public IgniteClientConfigurationImpl( @Nullable IgniteClientAuthenticator authenticator, long operationTimeout, int sqlPartitionAwarenessMetadataCacheSize, - @Nullable String name + @Nullable String name, + long backgroundReResolveAddressesInterval, + @Nullable InetAddressResolver addressResolver ) { this.addressFinder = addressFinder; @@ -122,6 +132,66 @@ public IgniteClientConfigurationImpl( this.operationTimeout = operationTimeout; this.sqlPartitionAwarenessMetadataCacheSize = sqlPartitionAwarenessMetadataCacheSize; this.name = name; + this.backgroundReResolveAddressesInterval = backgroundReResolveAddressesInterval; + this.addressResolver = addressResolver; + } + + /** + * Constructor. + * + * @param addressFinder Address finder. + * @param addresses Addresses. + * @param connectTimeout Socket connect timeout. + * @param backgroundReconnectInterval Background reconnect interval. + * @param asyncContinuationExecutor Async continuation executor. + * @param heartbeatInterval Heartbeat message interval. + * @param heartbeatTimeout Heartbeat message timeout. + * @param retryPolicy Retry policy. + * @param loggerFactory Logger factory which will be used to create a logger instance for this this particular client when + * needed. + * @param metricsEnabled Whether metrics are enabled. + * @param authenticator Authenticator. + * @param operationTimeout Operation timeout. + * @param sqlPartitionAwarenessMetadataCacheSize Size of the cache to store partition awareness metadata. + * @param name Client name. + */ + public IgniteClientConfigurationImpl( + @Nullable IgniteClientAddressFinder addressFinder, + String[] addresses, + long connectTimeout, + long backgroundReconnectInterval, + @Nullable Executor asyncContinuationExecutor, + long heartbeatInterval, + long heartbeatTimeout, + @Nullable RetryPolicy retryPolicy, + @Nullable LoggerFactory loggerFactory, + @Nullable SslConfiguration sslConfiguration, + boolean metricsEnabled, + @Nullable IgniteClientAuthenticator authenticator, + long operationTimeout, + int sqlPartitionAwarenessMetadataCacheSize, + @Nullable String name, + long backgroundReResolveAddressesInterval + ) { + this( + addressFinder, + addresses, + connectTimeout, + backgroundReconnectInterval, + asyncContinuationExecutor, + heartbeatInterval, + heartbeatTimeout, + retryPolicy, + loggerFactory, + sslConfiguration, + metricsEnabled, + authenticator, + operationTimeout, + sqlPartitionAwarenessMetadataCacheSize, + name, + backgroundReResolveAddressesInterval, + null + ); } /** {@inheritDoc} */ @@ -210,4 +280,18 @@ public int sqlPartitionAwarenessMetadataCacheSize() { public @Nullable String name() { return name; } + + @Override + public long backgroundReResolveAddressesInterval() { + return backgroundReResolveAddressesInterval; + } + + /** + * Gets custom address resolver. + * + * @return Custom address resolver. + */ + @Nullable InetAddressResolver addressResolver() { + return addressResolver; + } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/InetAddressResolver.java b/modules/client/src/main/java/org/apache/ignite/internal/client/InetAddressResolver.java new file mode 100644 index 000000000000..5cfcbb6a20d3 --- /dev/null +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/InetAddressResolver.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * DNS resolver. + */ +@FunctionalInterface +public interface InetAddressResolver { + InetAddressResolver DEFAULT = InetAddress::getAllByName; + + /** + * Resolves the given host name to its IP addresses. + * + * @param host the host name to be resolved + * @return an array of {@code InetAddress} objects representing the IP addresses of the host + * @throws UnknownHostException if the host name could not be resolved + */ + InetAddress[] getAllByName(String host) throws UnknownHostException; +} diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java index c4e8f8ebba65..2f83f0cabd0a 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java @@ -17,9 +17,11 @@ package org.apache.ignite.internal.client; +import static java.util.Objects.requireNonNullElse; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.delayedExecutor; import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.concurrent.CompletableFuture.supplyAsync; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.ExceptionUtils.hasCauseOrSuppressed; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; @@ -29,7 +31,9 @@ import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR; import java.net.ConnectException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -93,7 +97,7 @@ public final class ReliableChannel implements AutoCloseable { private final AtomicInteger curChIdx = new AtomicInteger(); /** Client configuration. */ - private final IgniteClientConfiguration clientCfg; + private final IgniteClientConfigurationImpl clientCfg; /** Node channels by name (consistent id). */ private final Map nodeChannelsByName = new ConcurrentHashMap<>(); @@ -134,6 +138,9 @@ public final class ReliableChannel implements AutoCloseable { /** Inflights. */ private final ClientTransactionInflights inflights; + /** Address resolver. */ + private final InetAddressResolver addressResolver; + /** * A validator that is called when a connection to a node is established, * if it throws an exception, the network channel to that node will be closed. @@ -152,7 +159,7 @@ public final class ReliableChannel implements AutoCloseable { */ ReliableChannel( ClientChannelFactory chFactory, - IgniteClientConfiguration clientCfg, + IgniteClientConfigurationImpl clientCfg, ClientMetricSource metrics, HybridTimestampTracker observableTimeTracker, @Nullable ChannelValidator channelValidator @@ -168,6 +175,8 @@ public final class ReliableChannel implements AutoCloseable { connMgr.start(clientCfg); inflights = new ClientTransactionInflights(); + + addressResolver = requireNonNullElse(clientCfg.addressResolver(), InetAddressResolver.DEFAULT); } /** {@inheritDoc} */ @@ -414,20 +423,46 @@ public CompletableFuture getChannelAsync(@Nullable String preferr * @return host:port_range address lines parsed as {@link InetSocketAddress} as a key. Value is the amount of appearences of an address * in {@code addrs} parameter. */ - private static Map parsedAddresses(String[] addrs) { + private static Map parsedAddresses(InetAddressResolver addressResolver, String[] addrs) { if (addrs == null || addrs.length == 0) { throw new IgniteException(CONFIGURATION_ERR, "Empty addresses"); } - Collection ranges = new ArrayList<>(addrs.length); + Collection parsedAddrs = new ArrayList<>(addrs.length); for (String a : addrs) { - ranges.add(HostAndPort.parse(a, IgniteClientConfiguration.DFLT_PORT, "Failed to parse Ignite server address")); + parsedAddrs.add(HostAndPort.parse(a, IgniteClientConfiguration.DFLT_PORT, "Failed to parse Ignite server address")); } - return ranges.stream() - .map(p -> InetSocketAddress.createUnresolved(p.host(), p.port())) - .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum)); + var map = new HashMap(parsedAddrs.size()); + + for (HostAndPort addr : parsedAddrs) { + try { + // Special handling for "localhost" to avoid unnecessary DNS resolution. + if ("localhost".equalsIgnoreCase(addr.host())) { + map.merge(InetSocketAddress.createUnresolved(addr.host(), addr.port()), 1, Integer::sum); + + continue; + } + + for (InetAddress inetAddr : addressResolver.getAllByName(addr.host())) { + // Preserve unresolved address if the resolved address equals to the original host string. + if (Objects.equals(addr.host(), inetAddr.getHostAddress())) { + map.merge(InetSocketAddress.createUnresolved(addr.host(), addr.port()), 1, Integer::sum); + + continue; + } + + var sockAddr = new InetSocketAddress(inetAddr, addr.port()); + map.merge(sockAddr, 1, Integer::sum); + } + } catch (UnknownHostException e) { + var sockAddr = InetSocketAddress.createUnresolved(addr.host(), addr.port()); + map.merge(sockAddr, 1, Integer::sum); + } + } + + return map; } /** @@ -472,12 +507,13 @@ private void onChannelFailure(ClientChannel ch) { private void onChannelFailure(ClientChannelHolder hld, @Nullable ClientChannel ch) { chFailLsnrs.forEach(Runnable::run); + if (scheduledChannelsReinit.compareAndSet(false, true)) { + // Refresh addresses and reinit channels. + initChannelHolders(); + } + // Roll current channel even if a topology changes. To help find working channel faster. rollCurrentChannel(hld); - - if (scheduledChannelsReinit.get()) { - channelsInitAsync(); - } } /** @@ -508,7 +544,7 @@ private boolean shouldStopChannelsReinit() { /** * Init channel holders to all nodes. * - * @return boolean wheter channels was reinited. + * @return boolean whether channels were reinitialized. */ private synchronized boolean initChannelHolders() { List holders = channels; @@ -526,11 +562,12 @@ private synchronized boolean initChannelHolders() { } if (!Arrays.equals(hostAddrs, prevHostAddrs)) { - newAddrs = parsedAddresses(hostAddrs); + newAddrs = parsedAddresses(addressResolver, hostAddrs); prevHostAddrs = hostAddrs; } - } else if (holders == null) { - newAddrs = parsedAddresses(clientCfg.addresses()); + } else { + // Re-resolve DNS. + newAddrs = parsedAddresses(addressResolver, clientCfg.addresses()); } if (newAddrs == null) { @@ -541,9 +578,7 @@ private synchronized boolean initChannelHolders() { Set allAddrs = new HashSet<>(newAddrs.keySet()); if (holders != null) { - for (int i = 0; i < holders.size(); i++) { - ClientChannelHolder h = holders.get(i); - + for (ClientChannelHolder h : holders) { curAddrs.put(h.chCfg.getAddress(), h); allAddrs.add(h.chCfg.getAddress()); } @@ -611,6 +646,14 @@ private synchronized boolean initChannelHolders() { curChannelsGuard.writeLock().unlock(); } + // Schedule the background re-resolve of addresses. + if (clientCfg.backgroundReResolveAddressesInterval() > 0L) { + supplyAsync( + this::initChannelHolders, + delayedExecutor(clientCfg.backgroundReResolveAddressesInterval(), TimeUnit.MILLISECONDS) + ); + } + return true; } @@ -773,7 +816,14 @@ private void onObservableTimestampReceived(long newTs) { } private void onPartitionAssignmentChanged(long timestamp) { - partitionAssignmentTimestamp.updateAndGet(curTs -> Math.max(curTs, timestamp)); + var old = partitionAssignmentTimestamp.getAndUpdate(curTs -> Math.max(curTs, timestamp)); + + if (timestamp > old) { + // New assignment timestamp, topology change possible. + if (scheduledChannelsReinit.compareAndSet(false, true)) { + initChannelHolders(); + } + } } /** @@ -954,6 +1004,20 @@ private CompletableFuture getOrCreateChannelAsync() { return ch; } + private void rollNodeChannelsByName() { + List holders = channels; + + for (ClientChannelHolder h : holders) { + if (h != this && h.serverNode != null && Objects.equals(serverNode.id(), h.serverNode.id())) { + nodeChannelsByName.put(h.serverNode.name(), h); + + return; + } + } + + nodeChannelsByName.remove(serverNode.name(), this); + } + /** * Close channel. */ @@ -972,7 +1036,7 @@ private synchronized void closeChannel() { var oldServerNode = serverNode; if (oldServerNode != null) { - nodeChannelsByName.remove(oldServerNode.name(), this); + rollNodeChannelsByName(); } chFut = null; @@ -988,7 +1052,7 @@ void close() { var oldServerNode = serverNode; if (oldServerNode != null) { - nodeChannelsByName.remove(oldServerNode.name(), this); + rollNodeChannelsByName(); } closeChannel(); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java index 5fc1972f752e..c840ea61028e 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java @@ -103,7 +103,7 @@ public class TcpIgniteClient implements IgniteClient { * @param channelValidator A validator that is called when a connection to a node is established, * if it throws an exception, the network channel to that node will be closed. */ - private TcpIgniteClient(IgniteClientConfiguration cfg, HybridTimestampTracker observableTimeTracker, + private TcpIgniteClient(IgniteClientConfigurationImpl cfg, HybridTimestampTracker observableTimeTracker, @Nullable ChannelValidator channelValidator) { this(TcpClientChannel::createAsync, cfg, observableTimeTracker, channelValidator); } @@ -117,7 +117,10 @@ private TcpIgniteClient(IgniteClientConfiguration cfg, HybridTimestampTracker ob * @param channelValidator A validator that is called when a connection to a node is established, * if it throws an exception, the network channel to that node will be closed. */ - private TcpIgniteClient(ClientChannelFactory chFactory, IgniteClientConfiguration cfg, HybridTimestampTracker observableTimeTracker, + private TcpIgniteClient( + ClientChannelFactory chFactory, + IgniteClientConfigurationImpl cfg, + HybridTimestampTracker observableTimeTracker, @Nullable ChannelValidator channelValidator) { assert chFactory != null; assert cfg != null; @@ -173,7 +176,7 @@ private CompletableFuture initAsync() { * @param cfg Thin client configuration. * @return Future representing pending completion of the operation. */ - public static CompletableFuture startAsync(IgniteClientConfiguration cfg) { + public static CompletableFuture startAsync(IgniteClientConfigurationImpl cfg) { return startAsync(cfg, HybridTimestampTracker.atomicTracker(null), null); } @@ -186,7 +189,9 @@ public static CompletableFuture startAsync(IgniteClientConfigurati * if it throws an exception, the network channel to that node will be closed. * @return Future representing pending completion of the operation. */ - public static CompletableFuture startAsync(IgniteClientConfiguration cfg, HybridTimestampTracker observableTimeTracker, + public static CompletableFuture startAsync( + IgniteClientConfigurationImpl cfg, + HybridTimestampTracker observableTimeTracker, @Nullable ChannelValidator channelValidator) { ErrorGroups.initialize(); diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientDnsDiscoveryTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientDnsDiscoveryTest.java new file mode 100644 index 000000000000..157ca5c70e63 --- /dev/null +++ b/modules/client/src/test/java/org/apache/ignite/client/ClientDnsDiscoveryTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.client.IgniteClientConfigurationImpl; +import org.apache.ignite.internal.client.InetAddressResolver; +import org.apache.ignite.internal.client.ReliableChannel; +import org.apache.ignite.internal.client.TcpIgniteClient; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.network.ClusterNode; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Tests client DNS resolution. + */ +class ClientDnsDiscoveryTest extends BaseIgniteAbstractTest { + protected static final String DEFAULT_TABLE = "DEFAULT_TEST_TABLE"; + + private static TestServer server1; + + private static TestServer server2; + + private static TestServer server3; + + private static String loopbackAddress; + + private static String hostAddress; + + private static int port; + + @BeforeAll + static void setUp() throws UnknownHostException { + loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress(); + hostAddress = InetAddress.getLocalHost().getHostAddress(); + + server1 = TestServer.builder() + .listenAddresses(loopbackAddress) + .nodeName("server1") + .clusterId(AbstractClientTest.clusterId) + .build(); + + port = server1.port(); + + server2 = TestServer.builder() + .listenAddresses(hostAddress) + .nodeName("server2") + .clusterId(AbstractClientTest.clusterId) + .port(port) + .build(); + + server3 = TestServer.builder() + .nodeName("server3") + .clusterId(AbstractClientTest.clusterId) + .port(port + 1) + .build(); + } + + @AfterAll + static void tearDown() throws Exception { + closeAll(server1, server2, server3); + } + + @Test + void testClientResolvesAllHostNameAddresses() { + String[] addresses = {"my-cluster:" + port}; + + // One invalid and one valid address. + AtomicReference resolvedAddressesRef = new AtomicReference<>(new String[]{loopbackAddress, "1.1.1.1"}); + + try (var client = TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L, resolvedAddressesRef)).join()) { + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server1", client.connections().get(0).name()); + } + } + + @Test + void testClientConnectToAllNodes() throws InterruptedException { + String[] addresses = {"my-cluster:" + port, "my-cluster:" + server3.port()}; + + // All nodes addresses. + AtomicReference resolvedAddressesRef = new AtomicReference<>(new String[]{loopbackAddress, hostAddress}); + Set allNodeNames = Set.of("server1", "server2", "server3"); + + try (var client = TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L, resolvedAddressesRef)).join()) { + assertDoesNotThrow(() -> client.tables().tables()); + + // Wait until client connects to all nodes. + assertTrue(IgniteTestUtils.waitForCondition( + () -> client.connections().stream().map(ClusterNode::name).allMatch(allNodeNames::contains), 1000), + () -> "Client should have three connections: " + client.connections().size()); + } + } + + @Test + void testClientRefreshesDnsOnNodeFailure() throws Exception { + // One valid address. + AtomicReference resolvedAddressesRef = new AtomicReference<>(new String[]{loopbackAddress}); + + try (var server4 = TestServer.builder().listenAddresses(loopbackAddress).nodeName("server4").clusterId(AbstractClientTest.clusterId) + .build(); + var ignored = TestServer.builder().listenAddresses(hostAddress).nodeName("server5").clusterId(AbstractClientTest.clusterId) + .port(server4.port()).build(); + var client = TcpIgniteClient.startAsync(getClientConfiguration(new String[]{"my-cluster:" + server4.port()}, + 0L, resolvedAddressesRef)).join()) { + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server4", client.connections().get(0).name()); + + // Both nodes. + resolvedAddressesRef.set(new String[]{hostAddress}); + + // Stop first node. + server4.close(); + + Thread.sleep(100L); // Wait for channels were reinitialized. + + // Client should reconnect to the second node. + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server5", client.connections().get(0).name()); + } + } + + @Test + void testClientRefreshesDnsOnPrimaryReplicaChange() throws Exception { + String[] addresses = {"my-cluster:" + port}; + + // One valid address points to first node. + AtomicReference resolvedAddressesRef = new AtomicReference<>(new String[]{loopbackAddress}); + + try (var client = TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L, resolvedAddressesRef)).join()) { + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server1", client.connections().get(0).name()); + + // Change address to second node. + resolvedAddressesRef.set(new String[]{hostAddress}); + + ReliableChannel ch = ((TcpIgniteClient) client).channel(); + server1.placementDriver().setReplicas(List.of("server3", "server2", "server1", "server2"), + 1, + 1, + ch.partitionAssignmentTimestamp() + 1); + + Thread.sleep(100L); // Wait for channels were reinitialized. + + // Client should reconnect to the second node. + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server2", client.connections().get(0).name()); + } + } + + @Test + void testClientRefreshesDnsByTimeout() throws Exception { + String[] addresses = {"my-cluster:" + port}; + + // One valid address points to first node. + AtomicReference resolvedAddressesRef = new AtomicReference<>(new String[]{loopbackAddress}); + + try (var client = TcpIgniteClient.startAsync(getClientConfiguration(addresses, 500L, resolvedAddressesRef)).join()) { + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server1", client.connections().get(0).name()); + + // Change address to second node. + resolvedAddressesRef.set(new String[]{hostAddress}); + + Thread.sleep(500L); // Wait for background dns refresh. + + // Client should reconnect to the second node. + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server2", client.connections().get(0).name()); + } + } + + @Test + void testMultipleIpsSameNode() throws InterruptedException { + String[] addresses = {"my-cluster:" + server3.port()}; + + // One node. + AtomicReference resolvedAddressesRef = new AtomicReference<>(new String[]{loopbackAddress, hostAddress}); + + try (var client = TcpIgniteClient.startAsync(getClientConfiguration(addresses, 0L, resolvedAddressesRef)).join()) { + assertDoesNotThrow(() -> client.tables().tables()); + assertEquals("server3", client.connections().get(0).name()); + + ReliableChannel ch = ((TcpIgniteClient) client).channel(); + + List channels = IgniteTestUtils.getFieldValue(ch, "channels"); + assertEquals(2, channels.size()); + + // Update to another IPs for the same node. + resolvedAddressesRef.set(new String[]{hostAddress}); + + server3.placementDriver().setReplicas(List.of("server3", "server2", "server1", "server2"), + 1, + 1, + ch.partitionAssignmentTimestamp() + 1); + + // Wait until client connects to all nodes. + assertTrue(IgniteTestUtils.waitForCondition(() -> IgniteTestUtils.getFieldValue(ch, "channels").size() == 1, 1000), + () -> "Client should have three connections: " + client.connections().size()); + + // Client should reconnect to the second ips. + assertDoesNotThrow(() -> client.tables().tables()); + } + } + + private static IgniteClientConfigurationImpl getClientConfiguration( + String[] addresses, + long backgroundReResolveAddressesInterval, + AtomicReference resolvedAddressesRef + ) { + InetAddressResolver addressResolver = (addr) -> { + if ("my-cluster".equals(addr)) { + String[] resolved = resolvedAddressesRef.get(); + InetAddress[] result = new InetAddress[resolved.length]; + + for (int i = 0; i < resolved.length; i++) { + result[i] = InetAddress.getByName(resolved[i]); + } + + return result; + } else { + return InetAddress.getAllByName(addr); + } + }; + + return new IgniteClientConfigurationImpl( + null, + addresses, + 500, + 0, + null, + 50, + 50, + new RetryLimitPolicy(), + null, + null, + false, + null, + 1000, + 1000, + "my-client", + backgroundReResolveAddressesInterval, + addressResolver + ); + } +} diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java index 7a72dc401e60..24d655656a90 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java @@ -137,9 +137,11 @@ public void testDisabledRequestHandling() { String nodeName = "server-2"; FakeIgnite ignite = new FakeIgnite(nodeName); - try (TestServer testServer = - new TestServer(0, ignite, null, null, nodeName, UUID.randomUUID(), null, null, false, null)) { - + try (TestServer testServer = TestServer.builder() + .nodeName(nodeName) + .ignite(ignite) + .enableRequestHandling(false) + .build()) { Builder clientBuilder = IgniteClient.builder() .addresses("127.0.0.1:" + testServer.port()) .retryPolicy(new RetryLimitPolicy().retryLimit(0)) @@ -165,9 +167,11 @@ public void testEnableRequestHandlingDuringConnectionEstablishment() throws Exce String nodeName = "server-2"; FakeIgnite ignite = new FakeIgnite(nodeName); - try (TestServer testServer = - new TestServer(0, ignite, null, null, nodeName, UUID.randomUUID(), null, null, false, null)) { - + try (TestServer testServer = TestServer.builder() + .nodeName(nodeName) + .ignite(ignite) + .enableRequestHandling(false) + .build()) { Builder clientBuilder = IgniteClient.builder() .addresses("127.0.0.1:" + testServer.port()) .retryPolicy(new RetryLimitPolicy().retryLimit(0)) diff --git a/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java index 01f85d2ea235..7c8344d43809 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/FeatureCompatibilityTest.java @@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.BitSet; -import java.util.UUID; import org.apache.ignite.client.fakes.FakeIgnite; import org.apache.ignite.internal.TestHybridClock; import org.apache.ignite.internal.client.ClientChannel; @@ -34,6 +33,7 @@ import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; /** @@ -48,12 +48,19 @@ public class FeatureCompatibilityTest extends BaseIgniteAbstractTest { private void startServer(@Nullable BitSet features) { ignite = new FakeIgnite("server-1", new TestHybridClock(System::currentTimeMillis)); - testServer = new TestServer(0, ignite, reqId -> false, null, "server-1", UUID.randomUUID(), null, null, true, features); + + testServer = TestServer.builder() + .ignite(ignite) + .nodeName("server-1") + .features(features) + .shouldDropConnection(reqId -> false) + .build(); client = IgniteClient.builder().addresses("127.0.0.1:" + testServer.port()).build(); } - private void stopServer() throws Exception { + @AfterEach + public void stopServer() throws Exception { closeAll(client, testServer); } @@ -61,15 +68,11 @@ private void stopServer() throws Exception { public void testDirectMappingEnabled() throws Exception { startServer(null); - try { - ReliableChannel ch = ((TcpIgniteClient) client).channel(); + ReliableChannel ch = ((TcpIgniteClient) client).channel(); - ClientChannel ch0 = ch.getChannelAsync(null).join(); + ClientChannel ch0 = ch.getChannelAsync(null).join(); - assertTrue(ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS, TX_PIGGYBACK)); - } finally { - stopServer(); - } + assertTrue(ch0.protocolContext().allFeaturesSupported(TX_DIRECT_MAPPING, TX_DELAYED_ACKS, TX_PIGGYBACK)); } @Test @@ -78,16 +81,12 @@ public void testDirectMappingDisabled() throws Exception { features.set(TX_DIRECT_MAPPING.featureId()); startServer(features); - try { - ReliableChannel ch = ((TcpIgniteClient) client).channel(); + ReliableChannel ch = ((TcpIgniteClient) client).channel(); - ClientChannel ch0 = ch.getChannelAsync(null).join(); + ClientChannel ch0 = ch.getChannelAsync(null).join(); - assertFalse(ch0.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING)); - assertFalse(ch0.protocolContext().isFeatureSupported(TX_DELAYED_ACKS)); - assertFalse(ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK)); - } finally { - stopServer(); - } + assertFalse(ch0.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING)); + assertFalse(ch0.protocolContext().isFeatureSupported(TX_DELAYED_ACKS)); + assertFalse(ch0.protocolContext().isFeatureSupported(TX_PIGGYBACK)); } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java index 83f626b2385a..2604dfc9283a 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampComputePropagationTest.java @@ -23,7 +23,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.closeAll; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; @@ -61,7 +60,11 @@ public class ObservableTimestampComputePropagationTest extends BaseIgniteAbstrac @BeforeAll public static void startServers() { var ignite1 = new FakeIgnite("server-1", new TestHybridClock(serverTimestamp::get)); - testServer = new TestServer(0, ignite1, null, null, "server-1", UUID.randomUUID(), null, null, true, null); + + testServer = TestServer.builder() + .ignite(ignite1) + .nodeName("server-1") + .build(); } @AfterAll diff --git a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java index fcfba93f6347..ef7f4c6b3ecb 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java @@ -21,7 +21,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.closeAll; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.client.fakes.FakeIgnite; import org.apache.ignite.internal.TestHybridClock; @@ -49,7 +48,10 @@ public class ObservableTimestampPropagationTest extends BaseIgniteAbstractTest { @BeforeAll public static void startServer2() { ignite = new FakeIgnite("server-2", new TestHybridClock(currentServerTimestamp::get)); - testServer = new TestServer(0, ignite, null, null, "server-2", UUID.randomUUID(), null, null, true, null); + testServer = TestServer.builder() + .ignite(ignite) + .nodeName("server-2") + .build(); client = IgniteClient.builder().addresses("127.0.0.1:" + testServer.port()).build(); } diff --git a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java index 559f67556f6c..c1f551fdf4ef 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java @@ -100,11 +100,15 @@ public void testClientRepairsBackgroundConnectionsPeriodically(boolean reconnect Builder builder = IgniteClient.builder() .addresses("127.0.0.1:10901", "127.0.0.1:10902", "127.0.0.1:10903") .backgroundReconnectInterval(reconnectEnabled ? 50 : 0) - .heartbeatInterval(50); + .heartbeatInterval(100) + .backgroundReResolveAddressesInterval(0L); try (var client = builder.build()) { waitForConnections(client, 2); + // Skip channels refresh on partition assignment during connection. + Thread.sleep(100L); + server2.close(); waitForConnections(client, 1); diff --git a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java index 98e284e3d0f8..be5ca44560b4 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java @@ -247,7 +247,7 @@ public void testRetryPolicyConvertOpAllOperationsSupported() throws IllegalAcces @Test public void testRetryReadPolicyAllOperationsSupported() { var plc = new RetryReadPolicy(); - var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, null, 0, 0, null, null, null, false, null, 0, 1024, null); + var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, null, 0, 0, null, null, null, false, null, 0, 1024, null, 0); for (var op : ClientOperationType.values()) { var ctx = new RetryPolicyContextImpl(cfg, op, 0, null); @@ -336,6 +336,7 @@ private IgniteClient getClient(@Nullable RetryPolicy retryPolicy, @Nullable Logg .addresses("127.0.0.1:" + server.port()) .retryPolicy(retryPolicy) .loggerFactory(loggerFactory) + .backgroundReResolveAddressesInterval(0) .build(); } diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java index e7e30665e6b6..4121420d3ea2 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java @@ -106,6 +106,10 @@ public class TestServer implements AutoCloseable { private final FakeIgnite ignite; + public static Builder builder() { + return new Builder(); + } + /** * Constructor. * @@ -151,6 +155,7 @@ public TestServer( securityConfiguration, port, true, + null, null ); } @@ -171,7 +176,8 @@ public TestServer( @Nullable SecurityConfiguration securityConfiguration, @Nullable Integer port, boolean enableRequestHandling, - @Nullable BitSet features + @Nullable BitSet features, + String @Nullable [] listenAddresses ) { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); @@ -199,6 +205,7 @@ public TestServer( .changePort(port != null ? port : getFreePort()) .changeIdleTimeoutMillis(idleTimeout) .changeSendServerExceptionStackTraceToClient(true) + .changeListenAddresses(listenAddresses == null ? new String[0] : listenAddresses) ).join(); bootstrapFactory = new NettyBootstrapFactory(cfg.getConfiguration(NetworkExtensionConfiguration.KEY).network(), "TestServer-"); @@ -390,4 +397,97 @@ private static int getFreePort() { throw new IOError(e); } } + + /** + * Builder. + */ + public static class Builder { + private long idleTimeout = 1000; + private @Nullable FakeIgnite ignite; + private @Nullable Function shouldDropConnection; + private @Nullable Function responseDelay; + private @Nullable String nodeName; + private UUID clusterId = UUID.randomUUID(); + private @Nullable SecurityConfiguration securityConfiguration; + private @Nullable Integer port; + private boolean enableRequestHandling = true; + private @Nullable BitSet features; + private @Nullable String[] listenAddresses; + + public Builder idleTimeout(long idleTimeout) { + this.idleTimeout = idleTimeout; + return this; + } + + public Builder ignite(FakeIgnite ignite) { + this.ignite = ignite; + return this; + } + + public Builder shouldDropConnection(@Nullable Function shouldDropConnection) { + this.shouldDropConnection = shouldDropConnection; + return this; + } + + public Builder responseDelay(@Nullable Function responseDelay) { + this.responseDelay = responseDelay; + return this; + } + + public Builder nodeName(@Nullable String nodeName) { + this.nodeName = nodeName; + return this; + } + + public Builder clusterId(UUID clusterId) { + this.clusterId = clusterId; + return this; + } + + public Builder securityConfiguration(@Nullable SecurityConfiguration securityConfiguration) { + this.securityConfiguration = securityConfiguration; + return this; + } + + public Builder port(@Nullable Integer port) { + this.port = port; + return this; + } + + public Builder enableRequestHandling(boolean enableRequestHandling) { + this.enableRequestHandling = enableRequestHandling; + return this; + } + + public Builder features(@Nullable BitSet features) { + this.features = features; + return this; + } + + public Builder listenAddresses(@Nullable String... listenAddresses) { + this.listenAddresses = listenAddresses; + return this; + } + + /** + * Builds the test server. + * + * @return Test server. + */ + public TestServer build() { + return new TestServer( + idleTimeout, + ignite == null ? new FakeIgnite() : ignite, + shouldDropConnection, + responseDelay, + nodeName, + clusterId != null ? clusterId : UUID.randomUUID(), + securityConfiguration, + port, + enableRequestHandling, + features, + listenAddresses + ); + } + } } diff --git a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java index 37acfcb9f211..0a477c561cef 100644 --- a/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java +++ b/modules/jdbc/src/main/java/org/apache/ignite/jdbc/IgniteJdbcDriver.java @@ -308,7 +308,8 @@ private TcpIgniteClient createIgniteClient( extractAuthenticationConfiguration(connectionProperties), IgniteClientConfiguration.DFLT_OPERATION_TIMEOUT, connectionProperties.getPartitionAwarenessMetadataCacheSize(), - JdbcDatabaseMetadata.DRIVER_NAME + JdbcDatabaseMetadata.DRIVER_NAME, + IgniteClientConfigurationImpl.DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL ); ChannelValidator channelValidator = ctx -> { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java index 53e3ed5bcf07..6686fc6d9282 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientChannelValidatorTest.java @@ -214,7 +214,8 @@ private IgniteClient startClient(long reconnectInterval, ChannelValidator channe null, IgniteClientConfiguration.DFLT_OPERATION_TIMEOUT, IgniteClientConfiguration.DFLT_SQL_PARTITION_AWARENESS_METADATA_CACHE_SIZE, - null + null, + IgniteClientConfiguration.DFLT_BACKGROUND_RE_RESOLVE_ADDRESSES_INTERVAL ); return await(TcpIgniteClient.startAsync( diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java index 45a4b8cdb345..20336aa36956 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientConnectionTest.java @@ -19,6 +19,8 @@ import static org.apache.ignite.lang.ErrorGroups.Table.TABLE_NOT_FOUND_ERR; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -27,6 +29,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.client.IgniteClient; import org.apache.ignite.internal.client.ClientChannel; import org.apache.ignite.internal.client.TcpIgniteClient; @@ -119,9 +123,17 @@ void testHeartbeat() { @Test void testExceptionHasHint() { - IgniteException ex = assertThrows(IgniteException.class, () -> client().sql().execute(null, "select x from bad")); - assertEquals("To see the full stack trace set clientConnector.sendServerExceptionStackTraceToClient:true", - ex.getCause().getCause().getCause().getCause().getMessage()); + // Execute on all nodes to collect all types of exception. + List causes = IntStream.range(0, client().configuration().addresses().length) + .mapToObj(i -> { + IgniteException ex = assertThrows(IgniteException.class, () -> client().sql().execute(null, "select x from bad")); + + return ex.getCause().getCause().getCause().getCause().getMessage(); + }) + .collect(Collectors.toList()); + + assertThat(causes, + hasItem(containsString("To see the full stack trace set clientConnector.sendServerExceptionStackTraceToClient:true"))); } @Test