Skip to content

Commit

Permalink
IGNITE-14823 Event abbrevation (#11110)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Dec 19, 2023
1 parent dc68f6b commit a0b9af0
Show file tree
Hide file tree
Showing 24 changed files with 137 additions and 136 deletions.
3 changes: 2 additions & 1 deletion modules/checkstyle/src/main/resources/abbrevations.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ database,db
default,dflt
destination,dest
directory,dir
#event,evt
event,evt
events,evts
#exception,e,e2,ex
#execute,exec
#frequency,freq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,20 @@ public static EventListenerDemultiplexer create(ClientConfiguration cfg) {
if (F.isEmpty(cfg.getEventListeners()))
return NO_OP;

List<RequestEventListener> qryEventListeners = new ArrayList<>();
List<ConnectionEventListener> connEventListeners = new ArrayList<>();
List<RequestEventListener> qryEvtListeners = new ArrayList<>();
List<ConnectionEventListener> connEvtListeners = new ArrayList<>();

for (EventListener l: cfg.getEventListeners()) {
if (l instanceof RequestEventListener)
qryEventListeners.add((RequestEventListener)l);
qryEvtListeners.add((RequestEventListener)l);
else if (l instanceof ConnectionEventListener)
connEventListeners.add((ConnectionEventListener)l);
connEvtListeners.add((ConnectionEventListener)l);
}

if (F.isEmpty(qryEventListeners) && F.isEmpty(connEventListeners))
if (F.isEmpty(qryEvtListeners) && F.isEmpty(connEvtListeners))
return NO_OP;

return new EventListenerDemultiplexer(qryEventListeners, connEventListeners, NullLogger.whenNull(cfg.getLogger()));
return new EventListenerDemultiplexer(qryEvtListeners, connEvtListeners, NullLogger.whenNull(cfg.getLogger()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,10 @@ public IdealAffinityAssignment calculate(
don't belong to affinity for current group (client node or filtered by nodeFilter). */
boolean skipCalculation = true;

for (DiscoveryEvent event : events.events()) {
boolean affNode = CU.affinityNode(event.eventNode(), nodeFilter);
for (DiscoveryEvent evt : events.events()) {
boolean affNode = CU.affinityNode(evt.eventNode(), nodeFilter);

if (affNode || event.type() == EVT_DISCOVERY_CUSTOM_EVT) {
if (affNode || evt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
skipCalculation = false;

break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ private EventAdapter startWaitForLocal(BinaryRawReaderEx reader, IgniteEvents ev

IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null;

int[] eventTypes = readEventTypes(reader);
int[] evtTypes = readEventTypes(reader);

return (EventAdapter)events.waitForLocal(filter, eventTypes);
return (EventAdapter)events.waitForLocal(filter, evtTypes);
}

/**
Expand All @@ -268,9 +268,9 @@ private IgniteFuture<EventAdapter> startWaitForLocalAsync(BinaryRawReaderEx read

IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null;

int[] eventTypes = readEventTypes(reader);
int[] evtTypes = readEventTypes(reader);

return events.waitForLocalAsync(filter, eventTypes);
return events.waitForLocalAsync(filter, evtTypes);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,21 +213,21 @@ protected void init() throws IgniteCheckedException {
else {
assert evtType == EVT_NODE_JOINED || evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED;

final ClusterNode eventNode = evt.eventNode();
final ClusterNode evtNode = evt.eventNode();

final Map<IgniteUuid, ServiceInfo> deployedServices = srvcProc.deployedServices();

if (evtType == EVT_NODE_LEFT || evtType == EVT_NODE_FAILED) {
deployedServices.forEach((srvcId, desc) -> {
if (desc.topologySnapshot().containsKey(eventNode.id()) ||
(desc.cacheName() != null && !eventNode.isClient())) // If affinity service
if (desc.topologySnapshot().containsKey(evtNode.id()) ||
(desc.cacheName() != null && !evtNode.isClient())) // If affinity service
toDeploy.put(srvcId, desc);
});
}
else {
toDeploy.putAll(deployedServices);

toDeploy.putAll(srvcProc.servicesReceivedFromJoin(eventNode.id()));
toDeploy.putAll(srvcProc.servicesReceivedFromJoin(evtNode.id()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,30 @@ public void testQuerySuccessEvents() {

assertEquals(2, evSet.size());

RequestStartEvent startEvent = (RequestStartEvent)evSet.get(RequestStartEvent.class);
RequestStartEvent startEvt = (RequestStartEvent)evSet.get(RequestStartEvent.class);

assertTrue(startEvent.requestId() >= 0);
assertTrue(startEvt.requestId() >= 0);

ConnectionDescription connDesc = startEvent.connectionDescription();
ConnectionDescription connDesc = startEvt.connectionDescription();
assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
assertEquals(ClientOperation.CACHE_GET_NAMES.code(), startEvent.operationCode());
assertEquals(ClientOperation.CACHE_GET_NAMES.name(), startEvent.operationName());
assertEquals(ClientOperation.CACHE_GET_NAMES.code(), startEvt.operationCode());
assertEquals(ClientOperation.CACHE_GET_NAMES.name(), startEvt.operationName());

RequestSuccessEvent successEvent = (RequestSuccessEvent)evSet.get(RequestSuccessEvent.class);
assertEquals(successEvent.requestId(), successEvent.requestId());
RequestSuccessEvent successEvt = (RequestSuccessEvent)evSet.get(RequestSuccessEvent.class);
assertEquals(successEvt.requestId(), successEvt.requestId());

connDesc = startEvent.connectionDescription();
connDesc = startEvt.connectionDescription();
assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
assertEquals(ClientOperation.CACHE_GET_NAMES.code(), startEvent.operationCode());
assertEquals(ClientOperation.CACHE_GET_NAMES.name(), startEvent.operationName());
assertEquals(ClientOperation.CACHE_GET_NAMES.code(), startEvt.operationCode());
assertEquals(ClientOperation.CACHE_GET_NAMES.name(), startEvt.operationName());

assertTrue(System.nanoTime() - startTime >= successEvent.elapsedTime(TimeUnit.NANOSECONDS));
assertTrue(System.nanoTime() - startTime >= successEvt.elapsedTime(TimeUnit.NANOSECONDS));

}
}
Expand All @@ -131,32 +131,32 @@ public void testQueryFailEvents() {
catch (ClientException err) {
assertEquals(2, evSet.size());

RequestStartEvent startEvent = (RequestStartEvent)evSet.get(RequestStartEvent.class);
RequestStartEvent startEvt = (RequestStartEvent)evSet.get(RequestStartEvent.class);

assertTrue(startEvent.requestId() >= 0);
assertTrue(startEvt.requestId() >= 0);

ConnectionDescription connDesc = startEvent.connectionDescription();
ConnectionDescription connDesc = startEvt.connectionDescription();
assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
assertEquals(ClientOperation.CACHE_PUT.code(), startEvent.operationCode());
assertEquals(ClientOperation.CACHE_PUT.name(), startEvent.operationName());
assertEquals(ClientOperation.CACHE_PUT.code(), startEvt.operationCode());
assertEquals(ClientOperation.CACHE_PUT.name(), startEvt.operationName());

RequestFailEvent failEvent = (RequestFailEvent)evSet.get(RequestFailEvent.class);
assertEquals(failEvent.requestId(), failEvent.requestId());
RequestFailEvent failEvt = (RequestFailEvent)evSet.get(RequestFailEvent.class);
assertEquals(failEvt.requestId(), failEvt.requestId());

connDesc = startEvent.connectionDescription();
connDesc = startEvt.connectionDescription();
assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
assertEquals(ClientOperation.CACHE_PUT.code(), startEvent.operationCode());
assertEquals(ClientOperation.CACHE_PUT.name(), startEvent.operationName());
assertEquals(ClientOperation.CACHE_PUT.code(), startEvt.operationCode());
assertEquals(ClientOperation.CACHE_PUT.name(), startEvt.operationName());

assertEquals(err, failEvent.throwable());
assertEquals(err, failEvt.throwable());

assertTrue(System.nanoTime() - startTime >= failEvent.elapsedTime(TimeUnit.NANOSECONDS));
assertTrue(System.nanoTime() - startTime >= failEvt.elapsedTime(TimeUnit.NANOSECONDS));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ public void testInMemoryClusterIdWithClientReconnect() throws Exception {

IgniteEx client0 = startGrid("client0");

AtomicBoolean reconnectEvent = new AtomicBoolean(false);
AtomicBoolean reconnectEvt = new AtomicBoolean(false);

client0.events().localListen((e) -> {
reconnectEvent.set(true);
reconnectEvt.set(true);

return true;
}, EventType.EVT_CLIENT_NODE_RECONNECTED);
Expand All @@ -157,7 +157,7 @@ public void testInMemoryClusterIdWithClientReconnect() throws Exception {
assertNotSame(oldId, cluster0.id());
assertNotSame(oldTag, cluster0.tag());

assertTrue(GridTestUtils.waitForCondition(reconnectEvent::get, 10_000));
assertTrue(GridTestUtils.waitForCondition(reconnectEvt::get, 10_000));

assertEquals("OldID " + oldId, cluster0.id(), client0.cluster().id());
assertEquals(cluster0.tag(), client0.cluster().tag());
Expand Down Expand Up @@ -348,9 +348,9 @@ public void testTagChangedEvent() throws Exception {
UUID clusterId = ig.cluster().id();
String generatedTag = ig.cluster().tag();

AtomicReference<UUID> clusterIdFromEvent = new AtomicReference<>(null);
AtomicReference<String> oldTagFromEvent = new AtomicReference<>(null);
AtomicReference<String> newTagFromEvent = new AtomicReference<>(null);
AtomicReference<UUID> clusterIdFromEvt = new AtomicReference<>(null);
AtomicReference<String> oldTagFromEvt = new AtomicReference<>(null);
AtomicReference<String> newTagFromEvt = new AtomicReference<>(null);

AtomicBoolean evtFired = new AtomicBoolean(false);

Expand All @@ -359,9 +359,9 @@ public void testTagChangedEvent() throws Exception {

ClusterTagUpdatedEvent tagUpdatedEvt = (ClusterTagUpdatedEvent)evt;

clusterIdFromEvent.set(tagUpdatedEvt.clusterId());
oldTagFromEvent.set(tagUpdatedEvt.previousTag());
newTagFromEvent.set(tagUpdatedEvt.newTag());
clusterIdFromEvt.set(tagUpdatedEvt.clusterId());
oldTagFromEvt.set(tagUpdatedEvt.previousTag());
newTagFromEvt.set(tagUpdatedEvt.newTag());

return true;
}, EventType.EVT_CLUSTER_TAG_UPDATED);
Expand All @@ -370,9 +370,9 @@ public void testTagChangedEvent() throws Exception {

assertTrue(GridTestUtils.waitForCondition(evtFired::get, 10_000));

assertEquals(clusterId, clusterIdFromEvent.get());
assertEquals(generatedTag, oldTagFromEvent.get());
assertEquals(CUSTOM_TAG_0, newTagFromEvent.get());
assertEquals(clusterId, clusterIdFromEvt.get());
assertEquals(generatedTag, oldTagFromEvt.get());
assertEquals(CUSTOM_TAG_0, newTagFromEvt.get());
}

/**
Expand All @@ -389,25 +389,25 @@ public void testTagChangedEventMultinodeWithRemoteFilter() throws Exception {
UUID clusterId = ig0.cluster().id();
String generatedTag = ig0.cluster().tag();

AtomicReference<UUID> eventNodeId = new AtomicReference<>(null);
AtomicReference<UUID> evtNodeId = new AtomicReference<>(null);

AtomicReference<UUID> clusterIdFromEvent = new AtomicReference<>(null);
AtomicReference<String> oldTagFromEvent = new AtomicReference<>(null);
AtomicReference<String> newTagFromEvent = new AtomicReference<>(null);
AtomicReference<UUID> clusterIdFromEvt = new AtomicReference<>(null);
AtomicReference<String> oldTagFromEvt = new AtomicReference<>(null);
AtomicReference<String> newTagFromEvt = new AtomicReference<>(null);

AtomicBoolean evtFired = new AtomicBoolean(false);

ig0.events(ig0.cluster().forRemotes()).remoteListen(
(IgniteBiPredicate<UUID, Event>)(uuid, event) -> {
eventNodeId.set(uuid);
evtNodeId.set(uuid);

evtFired.set(true);

ClusterTagUpdatedEvent tagUpdatedEvt = (ClusterTagUpdatedEvent)event;

clusterIdFromEvent.set(tagUpdatedEvt.clusterId());
oldTagFromEvent.set(tagUpdatedEvt.previousTag());
newTagFromEvent.set(tagUpdatedEvt.newTag());
clusterIdFromEvt.set(tagUpdatedEvt.clusterId());
oldTagFromEvt.set(tagUpdatedEvt.previousTag());
newTagFromEvt.set(tagUpdatedEvt.newTag());

return true;
},
Expand All @@ -417,10 +417,10 @@ public void testTagChangedEventMultinodeWithRemoteFilter() throws Exception {

assertTrue(GridTestUtils.waitForCondition(evtFired::get, 10_000));

assertEquals(ig1.localNode().id(), eventNodeId.get());
assertEquals(ig1.localNode().id(), evtNodeId.get());

assertEquals(clusterId, clusterIdFromEvent.get());
assertEquals(generatedTag, oldTagFromEvent.get());
assertEquals(CUSTOM_TAG_0, newTagFromEvent.get());
assertEquals(clusterId, clusterIdFromEvt.get());
assertEquals(generatedTag, oldTagFromEvt.get());
assertEquals(CUSTOM_TAG_0, newTagFromEvt.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
eventSpi.setExpireCount(50);
MemoryEventStorageSpi evtSpi = new MemoryEventStorageSpi();
evtSpi.setExpireCount(50);

cfg.setEventStorageSpi(eventSpi);
cfg.setEventStorageSpi(evtSpi);

return cfg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
cfg.setPeerClassLoadingEnabled(false);
cfg.setTimeServerPortRange(200);

MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
eventSpi.setExpireCount(100);
MemoryEventStorageSpi evtSpi = new MemoryEventStorageSpi();
evtSpi.setExpireCount(100);

cfg.setEventStorageSpi(eventSpi);
cfg.setEventStorageSpi(evtSpi);

((TcpCommunicationSpi)cfg.getCommunicationSpi()).setLocalPortRange(200);
((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,15 +596,15 @@ public void sync() {
},
this.causes);

for (Event event : evts) {
for (Event evt : evts) {
// Events returned from localQuery() are ordered by increasing local ID. Update the sync ID
// within a finally block to avoid applying duplicate events if the delegate listener
// throws an exception while processing the event.
try {
applyInternal(event);
applyInternal(evt);
}
finally {
this.syncedId = event.localOrder();
this.syncedId = evt.localOrder();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,8 +1203,8 @@ && equalOldValue(e, exp)) {
if (dup) {
for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
if (!e.isEmpty()) {
for (CacheEntryEvent<?, ?> event : e)
log.error("Got duplicate event: " + event);
for (CacheEntryEvent<?, ?> evt : e)
log.error("Got duplicate event: " + evt);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ private void executeContinuousQuery(IgniteCache<Object, Object> cache) throws Ex

qry.setLocalListener(
new CacheEntryUpdatedListener<Integer, String>() {
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts)
throws CacheEntryListenerException {
for (CacheEntryEvent<? extends Integer, ? extends String> event : events)
System.out.println("Key = " + event.getKey() + ", Value = " + event.getValue());
for (CacheEntryEvent<? extends Integer, ? extends String> evt : evts)
System.out.println("Key = " + evt.getKey() + ", Value = " + evt.getValue());
}
}
);
Expand Down
Loading

0 comments on commit a0b9af0

Please sign in to comment.