Skip to content

Commit

Permalink
IGNITE-20656 Java thin: Service awareness (#11003)
Browse files Browse the repository at this point in the history
Co-authored-by: Pavel Tupitsyn <ptupitsyn@apache.org>
(cherry picked from commit 7bcf232)
  • Loading branch information
Vladsz83 authored and NSAmelchev committed Nov 10, 2023
1 parent 4075297 commit dedcf99
Show file tree
Hide file tree
Showing 20 changed files with 951 additions and 23 deletions.
16 changes: 16 additions & 0 deletions docs/_docs/services/services.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ tab:C++[]
//== Accessing Services from Compute Tasks
// TODO the @ServiceResource annotation

== Service Awareness [[service_awareness]]
For link:../thin-clients/java-thin-client.adoc#java_thin_client[Java Thin Client] you can activate Service Awareness.
To do that, enable link:../thin-clients/java-thin-client.adoc#partition_awareness[Partition Awareness].

Without Service Awareness, the invocation requests are sent to a random node. If it has no service
instance deployed, the request is redirected to a different node. This additional network hop adds overhead.

With Service Awareness, the thin client knows where service instances are deployed and sends the request to the correct node.

[NOTE]
====
The service topology is updated asynchronously starting with the first service invocation.
Thus, some invocation redirects are still possible.
====


== Un-deploying Services

To undeploy a service, use the `IgniteServices.cancel(serviceName)` or `IgniteServices.cancelAll()` methods.
Expand Down
9 changes: 7 additions & 2 deletions docs/_docs/thin-clients/java-thin-client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Java Thin Client
= Java Thin Client [[java_thin_client]]

:sourceCodeFile: {javaCodeDir}/JavaThinClient.java
== Overview
Expand Down Expand Up @@ -75,7 +75,7 @@ include::{sourceCodeFile}[tag=connect-to-many-nodes,indent=0]

Note that the code above provides a failover mechanism in case of server node failures. Refer to the <<Handling Node Failures>> section for more information.

== Partition Awareness
== Partition Awareness [partition_awareness]

include::includes/partition-awareness.adoc[]

Expand Down Expand Up @@ -116,6 +116,11 @@ The code snippet shows how an example implementation might look like if you want

Also, you can check a link:https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/client/ClientKubernetesPutGetExample.java#L50[real example] of the interface implementation. `ThinClientKubernetesAddressFinder` is created to handle scalable Kubernetes environment.

[NOTE]
====
Partition Awareness also enables link:../services/services.adoc#service_awareness[Service Awareness]
====

== Using Key-Value API

The Java thin client supports most of the key-value operations available in the thick client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2053,14 +2053,14 @@ public void testBaselineAddOnNotActiveCluster() throws Exception {
String testOutStr = testOut.toString();

// Ignite instase 1 can be logged only in arguments list.
boolean isInstanse1Found = Arrays.stream(testOutStr.split("\n"))
boolean isInstance1Found = Arrays.stream(testOutStr.split("\n"))
.filter(s -> s.contains("Arguments:"))
.noneMatch(s -> s.contains(getTestIgniteInstanceName() + "1"));

assertTrue(testOutStr, testOutStr.contains("Node not found for consistent ID:"));

if (commandHandler.equals(CLI_CMD_HND))
assertFalse(testOutStr, isInstanse1Found);
assertFalse(testOutStr, isInstance1Found);
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@ private TopologyNodes(AffinityTopologyVersion topVer, UUID nodeId) {
public Iterable<UUID> nodes() {
return Collections.unmodifiableCollection(nodes);
}

/**
* @return Topology version.
*/
public AffinityTopologyVersion version() {
return topVer;
}
}

/** Holder of a mapper factory and cacheName. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ public enum ClientOperation {
/** Get service descriptors. */
SERVICE_GET_DESCRIPTOR(7002),

/** Get service topology. */
SERVICE_GET_TOPOLOGY(7003),

/** Get or create an AtomicLong by name. */
ATOMIC_LONG_CREATE(9000),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,25 @@
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.client.ClientClusterGroup;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.ClientServiceDescriptor;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.service.ServiceCallContextImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.platform.PlatformServiceMethod;
import org.apache.ignite.platform.PlatformType;
import org.apache.ignite.services.ServiceCallContext;
Expand All @@ -45,6 +52,9 @@
* Implementation of {@link ClientServices}.
*/
class ClientServicesImpl implements ClientServices {
/** Max service topology update period in mills. */
static final int SRV_TOP_UPDATE_PERIOD = 10_000;

/** Channel. */
private final ReliableChannel ch;

Expand All @@ -57,13 +67,23 @@ class ClientServicesImpl implements ClientServices {
/** Cluster group. */
private final ClientClusterGroupImpl grp;

/** Logger. */
private final IgniteLogger log;

/** Services topology. {@code Null} if partition awareness is not enabled. */
private final Map<String, ServiceTopology> servicesTopologies;

/** Constructor. */
ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl grp) {
ClientServicesImpl(ReliableChannel ch, ClientBinaryMarshaller marsh, ClientClusterGroupImpl grp, IgniteLogger log) {
this.ch = ch;
this.marsh = marsh;
this.grp = grp;

utils = new ClientUtils(marsh);

this.log = log;

servicesTopologies = ch.partitionAwarenessEnabled ? new ConcurrentHashMap<>() : Collections.emptyMap();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -165,7 +185,91 @@ private ClientServiceDescriptorImpl readServiceDescriptor(BinaryReaderExImpl rea
ClientServices withClusterGroup(ClientClusterGroupImpl grp) {
A.notNull(grp, "grp");

return new ClientServicesImpl(ch, marsh, grp);
return new ClientServicesImpl(ch, marsh, grp, log);
}

/**
* Keeps topology of certain service and its update progress meta.
*/
private final class ServiceTopology {
/** The service name. */
private final String srvcName;

/** If {@code true}, topology update of current service is in progress. */
private final AtomicBoolean updateInProgress = new AtomicBoolean();

/** Time of the last received topology. */
private volatile long lastUpdateRequestTime;

/** UUID of the nodes with at least one service instance. */
private volatile List<UUID> nodes = Collections.emptyList();

/** Last cluster topology version when current service topology was actual. */
private volatile AffinityTopologyVersion lastAffTop;

/** */
private ServiceTopology(String name) {
srvcName = name;
}

/**
* @return {@code True} if update of the service topology is required. {@code False} otherwise.
*/
private boolean isUpdateRequired(AffinityTopologyVersion curAffTop) {
return lastAffTop == null || curAffTop.topologyVersion() > lastAffTop.topologyVersion()
|| U.nanosToMillis(System.nanoTime() - lastUpdateRequestTime) >= SRV_TOP_UPDATE_PERIOD;
}

/**
* Asynchronously updates the service topology.
*/
private void updateTopologyAsync() {
AffinityTopologyVersion curAffTop = ch.affinityContext().lastTopology().version();

if (!updateInProgress.compareAndSet(false, true))
return;

ch.serviceAsync(
ClientOperation.SERVICE_GET_TOPOLOGY,
req -> utils.writeObject(req.out(), srvcName),
resp -> {
int cnt = resp.in().readInt();

List<UUID> res = new ArrayList<>(cnt);

for (int i = 0; i < cnt; ++i)
res.add(new UUID(resp.in().readLong(), resp.in().readLong()));

return res;
}).whenComplete((nodes, err) -> {
if (err == null) {
this.nodes = nodes;
lastAffTop = curAffTop;
lastUpdateRequestTime = System.nanoTime();

if (log.isDebugEnabled()) {
log.debug("Topology of service '" + srvcName + "' has been updated. The " +
"service instance nodes: " + nodes);
}
}
else
log.error("Failed to update topology of the service '" + srvcName + "'.", err);

updateInProgress.set(false);
});
}

/**
* Provides last known service topology and asynchronously updates it if required.
*
* @return Last known service topology.
*/
public List<UUID> getAndUpdate() {
if (isUpdateRequired(ch.affinityContext().lastTopology().version()))
updateTopologyAsync();

return nodes;
}
}

/**
Expand Down Expand Up @@ -215,14 +319,27 @@ private ServiceInvocationHandler(

return ch.service(ClientOperation.SERVICE_INVOKE,
req -> writeServiceInvokeRequest(req, nodeIds, method, args),
res -> utils.readObject(res.in(), false, method.getReturnType())
res -> utils.readObject(res.in(), false, method.getReturnType()),
serviceTopology()
);
}
catch (ClientError e) {
throw new ClientException(e);
}
}

/**
* @return Actual known service topology or empty list if: service topology is not enabled, not supported or
* not received yet.
*/
private List<UUID> serviceTopology() {
if (!ch.partitionAwarenessEnabled
|| !ch.applyOnDefaultChannel(c -> c.protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.SERVICE_TOPOLOGY), null))
return Collections.emptyList();

return servicesTopologies.computeIfAbsent(name, ServiceTopology::new).getAndUpdate();
}

/**
* @param ch Payload output channel.
* @param nodeIds Node IDs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ public enum ProtocolBitmaskFeature {
INDEX_QUERY(14),

/** IndexQuery limit. */
INDEX_QUERY_LIMIT(15);
INDEX_QUERY_LIMIT(15),

/** Service topology. */
SERVICE_TOPOLOGY(16);

/** */
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -74,7 +75,7 @@ final class ReliableChannel implements AutoCloseable {
private volatile int curChIdx = -1;

/** Partition awareness enabled. */
private final boolean partitionAwarenessEnabled;
final boolean partitionAwarenessEnabled;

/** Cache partition awareness context. */
private final ClientCacheAffinityContext affinityCtx;
Expand Down Expand Up @@ -182,7 +183,32 @@ public <T> T service(
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException, ClientError {
return applyOnDefaultChannel(channel -> channel.service(op, payloadWriter, payloadReader), op);
return service(op, payloadWriter, payloadReader, Collections.emptyList());
}

/**
* Send request to one of the passed nodes and handle response.
*
* @throws ClientException Thrown by {@code payloadWriter} or {@code payloadReader}.
* @throws ClientAuthenticationException When user name or password is invalid.
* @throws ClientAuthorizationException When user has no permission to perform operation.
* @throws ClientProtocolError When failed to handshake with server.
* @throws ClientServerError When failed to process request on server.
*/
public <T> T service(
ClientOperation op,
Consumer<PayloadOutputChannel> payloadWriter,
Function<PayloadInputChannel, T> payloadReader,
List<UUID> targetNodes
) throws ClientException, ClientError {
if (F.isEmpty(targetNodes))
return applyOnDefaultChannel(channel -> channel.service(op, payloadWriter, payloadReader), op);

return applyOnNodeChannelWithFallback(
targetNodes.get(ThreadLocalRandom.current().nextInt(targetNodes.size())),
channel -> channel.service(op, payloadWriter, payloadReader),
op
);
}

/**
Expand Down Expand Up @@ -943,7 +969,7 @@ class ClientChannelHolder {
/** Channel. */
private volatile ClientChannel ch;

/** ID of the last server node that {@link ch} is or was connected to. */
/** ID of the last server node that {@link #ch} is or was connected to. */
private volatile UUID serverNodeId;

/** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public class TcpIgniteClient implements IgniteClient {
/** Serializer/deserializer. */
private final ClientUtils serDes;

/** Logger. */
private final IgniteLogger log;

/**
* Private constructor. Use {@link TcpIgniteClient#start(ClientConfiguration)} to create an instance of
* {@code TcpIgniteClient}.
Expand All @@ -122,6 +125,8 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration cfg
) throws ClientException {
log = NullLogger.whenNull(cfg.getLogger());

final ClientBinaryMetadataHandler metadataHnd = new ClientBinaryMetadataHandler();

ClientMarshallerContext marshCtx = new ClientMarshallerContext();
Expand Down Expand Up @@ -159,7 +164,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {

compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());

services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup(), log);

lsnrsRegistry = new ClientCacheEntryListenersRegistry();
}
Expand Down Expand Up @@ -475,8 +480,6 @@ private void retrieveBinaryConfiguration(ClientConfiguration cfg) {
if (clusterCfg == null)
return;

IgniteLogger log = NullLogger.whenNull(cfg.getLogger());

if (log.isDebugEnabled())
log.debug("Cluster binary configuration retrieved: " + clusterCfg);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
INDEX_QUERY(14),

/** IndexQuery limit. */
INDEX_QUERY_LIMIT(15);
INDEX_QUERY_LIMIT(15),

/** Service topology. */
SERVICE_TOPOLOGY(16);

/** */
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
Expand Down
Loading

0 comments on commit dedcf99

Please sign in to comment.