IGNITE-14823 Affinity abbrevation (apache#11083)
nizhikov authored and Якимова Анастасия Николаевна committed Dec 18, 2023
1 parent 46d51c1 commit d7f88d6
Showing 52 changed files with 235 additions and 235 deletions.
@@ -56,8 +56,8 @@ public void cacheSharedContext(GridCacheSharedContext<?, ?> cacheSharedContext)
if (cacheId == CU.UNDEFINED_CACHE_ID)
return k -> k == null ? 0 : U.safeAbs(k.hashCode());

- AffinityFunction affinity = cacheSharedContext.cacheContext(cacheId).group().affinityFunction();
+ AffinityFunction aff = cacheSharedContext.cacheContext(cacheId).group().affinityFunction();

- return affinity::partition;
+ return aff::partition;
}
}
@@ -261,10 +261,10 @@ private static final class HashDistribution extends DistributionFunction {
assert F.isEmpty(assignment) || assignment.size() == 1;
}

- AffinityAdapter<Row> affinity = new AffinityAdapter<>(affSrvc.affinity(CU.UNDEFINED_CACHE_ID), k.toIntArray(),
+ AffinityAdapter<Row> aff = new AffinityAdapter<>(affSrvc.affinity(CU.UNDEFINED_CACHE_ID), k.toIntArray(),
ctx.rowHandler());

- return new Partitioned<>(assignments, affinity);
+ return new Partitioned<>(assignments, aff);
}
}

@@ -312,9 +312,9 @@ public AffinityDistribution(int cacheId, Object identity) {
assert F.isEmpty(assignment) || assignment.size() == 1;
}

- AffinityAdapter<Row> affinity = new AffinityAdapter<>(affSrvc.affinity(cacheId), k.toIntArray(), ctx.rowHandler());
+ AffinityAdapter<Row> aff = new AffinityAdapter<>(affSrvc.affinity(cacheId), k.toIntArray(), ctx.rowHandler());

- return new Partitioned<>(assignments, affinity);
+ return new Partitioned<>(assignments, aff);
}

/** {@inheritDoc} */
modules/checkstyle/src/main/resources/abbrevations.csv (2 changes: 1 addition & 1 deletion)
@@ -1,7 +1,7 @@
# First word is wrong full term, other is correct abbrevations to build helpfull error message.
address,addr
addresses,addrs
- #affinity,aff
+ affinity,aff
#argument,arg
array,arr
#attribute,attr
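
The rule behind this file: each uncommented row bans a full term in identifiers and names the abbreviation that must be used instead, so uncommenting the affinity,aff row is what forces every affinity*/affinityCache variable in the rest of this commit to become aff*/affCache. As a rough illustration only (this is not Ignite's actual checkstyle module; the class name and wiring below are assumptions), a check driven by this CSV could look like:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    /** Hypothetical sketch of an abbreviation rule driven by abbrevations.csv (not Ignite's real check). */
    public class AbbreviationRuleSketch {
        /** Maps a banned full term (e.g. "affinity") to the abbreviation that must be used instead (e.g. "aff"). */
        private final Map<String, String> rules = new HashMap<>();

        /** Loads the rules, skipping blank lines and '#'-prefixed comments/disabled rows. */
        public AbbreviationRuleSketch(String csvPath) throws IOException {
            for (String line : Files.readAllLines(Paths.get(csvPath))) {
                line = line.trim();

                if (line.isEmpty() || line.startsWith("#"))
                    continue;

                String[] parts = line.split(",");

                rules.put(parts[0].toLowerCase(Locale.US), parts[1]);
            }
        }

        /** Returns an error message if the identifier contains a banned full term, or null if it passes. */
        public String check(String identifier) {
            String lower = identifier.toLowerCase(Locale.US);

            for (Map.Entry<String, String> rule : rules.entrySet()) {
                if (lower.contains(rule.getKey()))
                    return "Use '" + rule.getValue() + "' instead of '" + rule.getKey() + "' in '" + identifier + "'";
            }

            return null;
        }
    }

With the affinity row active, check("affinityCache") would flag the name and suggest affCache, which is exactly the rename applied throughout the rest of this commit.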
@@ -424,12 +424,12 @@ public void testQueryFailover() throws Exception {
stmt.execute(sql);
stmt.execute(sql);

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

- Integer part = ((PartitionSingleNode)affinityCache.partitionResult(
+ Integer part = ((PartitionSingleNode)affCache.partitionResult(
new QualifiedSQLQuery("PUBLIC", sql)).partitionResult().tree()).value();

- UUID nodeId = affinityCache.cacheDistribution(GridCacheUtils.cacheId(cacheName))[part];
+ UUID nodeId = affCache.cacheDistribution(GridCacheUtils.cacheId(cacheName))[part];

int gridIdx = new Integer(Ignition.ignite(nodeId).name().substring(getTestIgniteInstanceName().length()));
stopGrid(gridIdx);
@@ -368,15 +368,15 @@ public void testChangeTopologyDetectionWithinPartitionDistributionResponse() thr

stmt.executeQuery(sqlQry);

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

startGrid(3);

stmt.executeQuery(sqlQry);

- AffinityCache recreatedAffinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache recreatedAffCache = GridTestUtils.getFieldValue(conn, "affinityCache");

- assertTrue(recreatedAffinityCache.version().compareTo(affinityCache.version()) > 0);
+ assertTrue(recreatedAffCache.version().compareTo(affCache.version()) > 0);
}

/**
@@ -392,15 +392,15 @@ public void testChangeTopologyDetectionWithinQueryExecutionResponse() throws Exc
stmt.executeQuery(sqlQry);
stmt.executeQuery(sqlQry);

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

startGrid(4);

stmt.executeQuery("select * from Person where _key = 2");

- AffinityCache recreatedAffinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache recreatedAffCache = GridTestUtils.getFieldValue(conn, "affinityCache");

- assertTrue(recreatedAffinityCache.version().compareTo(affinityCache.version()) > 0);
+ assertTrue(recreatedAffCache.version().compareTo(affCache.version()) > 0);
}

/**
@@ -415,15 +415,15 @@ public void testChangeTopologyDetectionWithinPartitionAwarenessUnrelatedQuery()

ResultSet rs = stmt.executeQuery(sqlQry);

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

startGrid(5);

rs.getMetaData();

- AffinityCache recreatedAffinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache recreatedAffCache = GridTestUtils.getFieldValue(conn, "affinityCache");

- assertTrue(recreatedAffinityCache.version().compareTo(affinityCache.version()) > 0);
+ assertTrue(recreatedAffCache.version().compareTo(affCache.version()) > 0);
}

/**
@@ -445,9 +445,9 @@ public void testPartitionAwarenessIsSkippedIfItIsSwitchedOff() throws Exception

stmt.executeQuery("select * from \"" + cacheName + "\".Person where _key = 1");

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

- assertNull("Affinity cache is not null.", affinityCache);
+ assertNull("Affinity cache is not null.", affCache);
}
}

@@ -470,9 +470,9 @@ public void testPartitionAwarenessIsSkippedByDefault() throws Exception {

stmt.executeQuery("select * from \"" + cacheName + "\".Person where _key = 1");

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

- assertNull("Affinity cache is not null.", affinityCache);
+ assertNull("Affinity cache is not null.", affCache);
}
}

@@ -500,10 +500,10 @@ public void testAffinityCacheStoresSchemaBindedQueries() throws Exception {

stmt.execute("select * from \"" + cacheName.toUpperCase() + "\".Person where _key = 1");

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

GridBoundedLinkedHashMap<QualifiedSQLQuery, JdbcThinPartitionResultDescriptor> sqlCache =
- GridTestUtils.getFieldValue(affinityCache, "sqlCache");
+ GridTestUtils.getFieldValue(affCache, "sqlCache");

Set<String> schemas = sqlCache.keySet().stream().map(QualifiedSQLQuery::schemaName).collect(Collectors.toSet());

@@ -534,13 +534,13 @@ public void testAffinityCacheCompactsPartitionDistributions() throws Exception {
stmt.execute("select * from \"" + cacheName + "\".Person where _key = 2");
stmt.execute("select * from \"" + cacheName + "\".Person where _key = 2");

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

GridBoundedLinkedHashMap<QualifiedSQLQuery, JdbcThinPartitionResultDescriptor> sqlCache =
- GridTestUtils.getFieldValue(affinityCache, "sqlCache");
+ GridTestUtils.getFieldValue(affCache, "sqlCache");

GridBoundedLinkedHashMap<Integer, UUID[]> cachePartitionsDistribution =
- GridTestUtils.getFieldValue(affinityCache, "cachePartitionsDistribution");
+ GridTestUtils.getFieldValue(affCache, "cachePartitionsDistribution");

assertEquals("Sql sub-cache of affinity cache has unexpected number of elements.",
2, sqlCache.size());
@@ -588,13 +588,13 @@ public void testPartitionAwarenessLimitedCacheSize() throws Exception {
stmt.executeQuery("select * from \"" + cacheName2 + "\".Person where _key = 1");
stmt.executeQuery("select * from \"" + cacheName2 + "\".Person where _key = 1");

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

GridBoundedLinkedHashMap<Integer, UUID[]> partitionsDistributionCache =
- GridTestUtils.getFieldValue(affinityCache, "cachePartitionsDistribution");
+ GridTestUtils.getFieldValue(affCache, "cachePartitionsDistribution");

GridBoundedLinkedHashMap<QualifiedSQLQuery, JdbcThinPartitionResultDescriptor> sqlCache =
- GridTestUtils.getFieldValue(affinityCache, "sqlCache");
+ GridTestUtils.getFieldValue(affCache, "sqlCache");

assertEquals("Unexpected count of partitions distributions.", 1,
partitionsDistributionCache.size());
@@ -650,9 +650,9 @@ protected void verifyPartitionResultIsNull(String sqlQry, int expRowsCnt) throws

assertEquals("Rows counter doesn't match expected value.", expRowsCnt, rowCntr);

- AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache");
+ AffinityCache affCache = GridTestUtils.getFieldValue(conn, "affinityCache");

- PartitionResult gotPartRes = affinityCache.partitionResult(
+ PartitionResult gotPartRes = affCache.partitionResult(
new QualifiedSQLQuery("default", sqlQry)).partitionResult();

assertNull("Partition result descriptor is not null.", gotPartRes);
@@ -178,10 +178,10 @@ protected <R> GridClientFuture<R> withReconnectHandling(ClientProjectionClosure<
*/
protected <R> GridClientFuture<R> withReconnectHandling(ClientProjectionClosure<R> c, String cacheName,
@Nullable Object affKey) {
- GridClientDataAffinity affinity = client.affinity(cacheName);
+ GridClientDataAffinity aff = client.affinity(cacheName);

// If pinned (fixed-nodes) or no affinity provided use balancer.
- if (nodes != null || affinity == null || affKey == null)
+ if (nodes != null || aff == null || affKey == null)
return withReconnectHandling(c);

try {
@@ -191,7 +191,7 @@ protected <R> GridClientFuture<R> withReconnectHandling(ClientProjectionClosure<
throw new GridServerUnreachableException("Failed to get affinity node (no nodes in topology were " +
"accepted by the filter): " + filter);

- GridClientNode node = affinity.node(affKey, prjNodes);
+ GridClientNode node = aff.node(affKey, prjNodes);

for (int i = 0; i < RETRY_CNT; i++) {
GridClientConnection conn = null;
@@ -259,9 +259,9 @@ public class GridClientDataImpl extends GridClientAbstractProjection<GridClientD
@Override public <K> UUID affinity(K key) throws GridClientException {
A.notNull(key, "key");

- GridClientDataAffinity affinity = client.affinity(cacheName);
+ GridClientDataAffinity aff = client.affinity(cacheName);

- if (affinity == null)
+ if (aff == null)
return null;

Collection<? extends GridClientNode> prj = projectionNodes();
@@ -270,7 +270,7 @@ public class GridClientDataImpl extends GridClientAbstractProjection<GridClientD
throw new GridClientException("Failed to get affinity node (projection node set for cache is empty): " +
cacheName());

- GridClientNode node = affinity.node(key, prj);
+ GridClientNode node = aff.node(key, prj);

assert node != null;

@@ -510,9 +510,9 @@ private void tryInit() throws GridClientException, InterruptedException {
overallCaches.putAll(node.caches());

for (Map.Entry<String, GridClientCacheMode> entry : overallCaches.entrySet()) {
- GridClientDataAffinity affinity = affinity(entry.getKey());
+ GridClientDataAffinity aff = affinity(entry.getKey());

- if (affinity instanceof GridClientPartitionAffinity && entry.getValue() !=
+ if (aff instanceof GridClientPartitionAffinity && entry.getValue() !=
GridClientCacheMode.PARTITIONED)
log.warning(GridClientPartitionAffinity.class.getSimpleName() + " is used for a cache configured " +
"for non-partitioned mode [cacheName=" + entry.getKey() + ", cacheMode=" + entry.getValue() + ']');
@@ -84,17 +84,17 @@ public Collection<Integer> cacheIds() {
* @return Affinity node id or {@code null} if affinity node can't be determined for given cache and key.
*/
public UUID affinityNode(IgniteBinary binary, int cacheId, Object key) {
- CacheAffinityInfo affinityInfo = cacheAffinity.get(cacheId);
+ CacheAffinityInfo affInfo = cacheAffinity.get(cacheId);

- if (affinityInfo == null || affinityInfo == NOT_APPLICABLE_CACHE_AFFINITY_INFO)
+ if (affInfo == null || affInfo == NOT_APPLICABLE_CACHE_AFFINITY_INFO)
return null;

Object binaryKey = binary.toBinary(key);

- if (!affinityInfo.keyCfg.isEmpty()) {
+ if (!affInfo.keyCfg.isEmpty()) {
int typeId = binary.typeId(key.getClass().getName());

- Integer fieldId = affinityInfo.keyCfg.get(typeId);
+ Integer fieldId = affInfo.keyCfg.get(typeId);

if (fieldId != null) {
if (binaryKey instanceof BinaryObjectExImpl)
@@ -104,7 +104,7 @@ public UUID affinityNode(IgniteBinary binary, int cacheId, Object key) {
}
}

- return affinityInfo.nodeForKey(binaryKey);
+ return affInfo.nodeForKey(binaryKey);
}

/**
@@ -140,9 +140,9 @@ public ClientIgniteSetImpl(
};

if (colocated) {
- Object affinityKey = name.hashCode();
+ Object affKey = name.hashCode();

- return ch.affinityService(cacheId, affinityKey, ClientOperation.OP_SET_ITERATOR_START, payloadWriter, payloadReader);
+ return ch.affinityService(cacheId, affKey, ClientOperation.OP_SET_ITERATOR_START, payloadWriter, payloadReader);
}

return ch.service(ClientOperation.OP_SET_ITERATOR_START, payloadWriter, payloadReader);
@@ -1162,15 +1162,15 @@ private UUID[] retrieveCacheDistribution(int cacheId, int partCnt) throws IOExce

assert res.status() == ClientListenerResponse.STATUS_SUCCESS;

- AffinityTopologyVersion resAffinityVer = res.affinityVersion();
+ AffinityTopologyVersion resAffVer = res.affinityVersion();

- if (affinityCache.version().compareTo(resAffinityVer) < 0) {
+ if (affinityCache.version().compareTo(resAffVer) < 0) {
affinityCache = new AffinityCache(
- resAffinityVer,
+ resAffVer,
connProps.getPartitionAwarenessPartitionDistributionsCacheSize(),
connProps.getPartitionAwarenessSqlCacheSize());
}
- else if (affinityCache.version().compareTo(resAffinityVer) > 0) {
+ else if (affinityCache.version().compareTo(resAffVer) > 0) {
// Jdbc thin affinity cache is binded to the newer affinity topology version, so we should ignore retrieved
// partition distribution. Given situation might occur in case of concurrent race and is not
// possible in single-threaded jdbc thin client, so it's a reserve for the future.
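
The hunk above encodes a simple rule: the JDBC thin client only replaces its affinity cache when the response was built for a newer affinity topology version, and silently drops a distribution built for an older one. A minimal self-contained sketch of that keep-the-newest rule follows; the topology version is reduced to a plain long and CacheStub is a stand-in for the real AffinityCache (which compares AffinityTopologyVersion via compareTo), so none of these names are the actual JDBC thin internals.

    /**
     * Minimal sketch of the refresh rule above: keep whichever affinity cache was built for the
     * newer topology version. CacheStub stands in for the real AffinityCache.
     */
    public class AffinityCacheRefreshSketch {
        /** Stand-in for the JDBC thin AffinityCache: only remembers the version it was built for. */
        static final class CacheStub {
            final long topVer;

            CacheStub(long topVer) {
                this.topVer = topVer;
            }
        }

        /** Applies the compare-and-rebuild rule shown in the diff. */
        static CacheStub onDistributionResponse(CacheStub cached, long resVer) {
            if (cached.topVer < resVer)
                return new CacheStub(resVer); // Response is newer: rebuild the cache for the new topology.

            // Cached version is the same or newer (concurrent race): ignore the retrieved distribution.
            return cached;
        }

        public static void main(String[] args) {
            CacheStub cache = new CacheStub(1);

            cache = onDistributionResponse(cache, 2); // Rebuilt for version 2.
            cache = onDistributionResponse(cache, 1); // Ignored; still built for version 2.

            System.out.println(cache.topVer); // Prints 2.
        }
    }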
@@ -192,7 +192,7 @@ private void printCachesConfig(
private static Map<String, Object> mapToPairs(CacheConfiguration cfg) {
Map<String, Object> params = new LinkedHashMap<>();

- CacheAffinityConfiguration affinityCfg = cfg.getAffinityConfiguration();
+ CacheAffinityConfiguration affCfg = cfg.getAffinityConfiguration();
CacheNearConfiguration nearCfg = cfg.getNearConfiguration();
CacheRebalanceConfiguration rebalanceCfg = cfg.getRebalanceConfiguration();
CacheEvictionConfiguration evictCfg = cfg.getEvictionConfiguration();
@@ -225,11 +225,11 @@ private static Map<String, Object> mapToPairs(CacheConfiguration cfg) {
params.put("Write Synchronization Mode", cfg.getWriteSynchronizationMode());
params.put("Invalidate", cfg.isInvalidate());

params.put("Affinity Function", affinityCfg.getFunction());
params.put("Affinity Backups", affinityCfg.getPartitionedBackups());
params.put("Affinity Partitions", affinityCfg.getPartitions());
params.put("Affinity Exclude Neighbors", affinityCfg.isExcludeNeighbors());
params.put("Affinity Mapper", affinityCfg.getMapper());
params.put("Affinity Function", affCfg.getFunction());
params.put("Affinity Backups", affCfg.getPartitionedBackups());
params.put("Affinity Partitions", affCfg.getPartitions());
params.put("Affinity Exclude Neighbors", affCfg.isExcludeNeighbors());
params.put("Affinity Mapper", affCfg.getMapper());

params.put("Rebalance Mode", rebalanceCfg.getMode());
params.put("Rebalance Batch Size", rebalanceCfg.getBatchSize());
@@ -367,9 +367,9 @@ public IdealAffinityAssignment calculate(
boolean skipCalculation = true;

for (DiscoveryEvent event : events.events()) {
- boolean affinityNode = CU.affinityNode(event.eventNode(), nodeFilter);
+ boolean affNode = CU.affinityNode(event.eventNode(), nodeFilter);

- if (affinityNode || event.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+ if (affNode || event.type() == EVT_DISCOVERY_CUSTOM_EVT) {
skipCalculation = false;

break;
@@ -465,17 +465,17 @@ private void recalculateBaselineAssignment(
List<ClusterNode> sorted,
BaselineTopology blt
) {
- List<ClusterNode> baselineAffinityNodes = blt.createBaselineView(sorted, nodeFilter);
+ List<ClusterNode> baselineAffNodes = blt.createBaselineView(sorted, nodeFilter);

List<List<ClusterNode>> calculated = aff.assignPartitions(new GridAffinityFunctionContextImpl(
- baselineAffinityNodes,
+ baselineAffNodes,
prevAssignment != null ? prevAssignment.assignment() : null,
events != null ? events.lastEvent() : null,
topVer,
backups
));

- baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffinityNodes, calculated);
+ baselineAssignment = IdealAffinityAssignment.create(topVer, baselineAffNodes, calculated);
}

/**
@@ -99,10 +99,10 @@ public static Map<Object, Set<Integer>> calculatePrimaries(
Map<Object, Set<Integer>> primaryPartitions = U.newHashMap(nodesSize);

for (int size = assignment.size(), p = 0; p < size; p++) {
- List<ClusterNode> affinityNodes = assignment.get(p);
+ List<ClusterNode> affNodes = assignment.get(p);

- if (!affinityNodes.isEmpty()) {
- ClusterNode primary = affinityNodes.get(0);
+ if (!affNodes.isEmpty()) {
+ ClusterNode primary = affNodes.get(0);

primaryPartitions.computeIfAbsent(primary.consistentId(),
id -> new HashSet<>(U.capacity(size / nodesSize * 2))).add(p);
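
This last hunk relies on the usual affinity-assignment convention: for each partition the assignment holds an ordered node list whose first element is the primary and the remaining elements are backups. Below is a small standalone sketch of the same primary-collection logic, with plain strings standing in for ClusterNode/consistentId and the capacity hint omitted; it is an illustration, not the production method.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    /**
     * Standalone sketch of the primary-collection logic above, assuming index 0 of each
     * per-partition node list is the primary and the rest are backups.
     */
    public class PrimaryPartitionsSketch {
        /** Returns, for every node id, the set of partitions it owns as primary. */
        static Map<String, Set<Integer>> calculatePrimaries(List<List<String>> assignment) {
            Map<String, Set<Integer>> primaries = new HashMap<>();

            for (int p = 0; p < assignment.size(); p++) {
                List<String> affNodes = assignment.get(p);

                if (!affNodes.isEmpty())
                    primaries.computeIfAbsent(affNodes.get(0), id -> new HashSet<>()).add(p);
            }

            return primaries;
        }

        public static void main(String[] args) {
            // Two partitions: node A is primary for 0 (B is backup), node B is primary for 1 (A is backup).
            Map<String, Set<Integer>> primaries = calculatePrimaries(
                List.of(List.of("A", "B"), List.of("B", "A")));

            System.out.println(primaries); // {A=[0], B=[1]} (map order may vary).
        }
    }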
