Skip to content

Commit

Permalink
IGNITE-20528 Support cache objects transformation for StandaloneKernal (
Browse files Browse the repository at this point in the history
  • Loading branch information
anton-vinogradov authored Oct 24, 2023
1 parent a1c13de commit 2aeba5c
Show file tree
Hide file tree
Showing 28 changed files with 320 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public StringData(String s) {
/**
*
*/
protected static class CompressionTransformer extends TestCacheObjectTransformerManagerAdapter {
protected static class CompressionTransformer extends TestCacheObjectTransformerProcessorAdapter {
/** Comptession type. */
protected static volatile CompressionType type = CompressionType.defaultType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
Expand Down Expand Up @@ -207,6 +208,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
*/
public MaintenanceRegistry maintenanceRegistry();

/**
* Gets transformation processor.
*
* @return Transformation processor.
*/
public CacheObjectTransformerProcessor transformer();

/**
* Gets system view manager.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.maintenance.MaintenanceProcessor;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
Expand Down Expand Up @@ -332,6 +333,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@GridToStringExclude
private MaintenanceProcessor maintenanceProc;

/** */
@GridToStringExclude
private CacheObjectTransformerProcessor transProc;

/** */
@GridToStringExclude
private List<GridComponent> comps = new LinkedList<>();
Expand Down Expand Up @@ -585,6 +590,8 @@ else if (comp instanceof DurableBackgroundTasksProcessor)
durableBackgroundTasksProcessor = (DurableBackgroundTasksProcessor)comp;
else if (comp instanceof MaintenanceProcessor)
maintenanceProc = (MaintenanceProcessor)comp;
else if (comp instanceof CacheObjectTransformerProcessor)
transProc = (CacheObjectTransformerProcessor)comp;
else if (comp instanceof PerformanceStatisticsProcessor)
perfStatProc = (PerformanceStatisticsProcessor)comp;
else if (comp instanceof IndexProcessor)
Expand Down Expand Up @@ -687,6 +694,11 @@ public void addHelper(Object helper) {
return maintenanceProc;
}

/** {@inheritDoc} */
@Override public CacheObjectTransformerProcessor transformer() {
return transProc;
}

/** {@inheritDoc} */
@Override public GridCacheProcessor cache() {
return cacheProc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.maintenance.MaintenanceProcessor;
Expand Down Expand Up @@ -917,8 +918,7 @@ public void start(
throw new IgniteCheckedException("User attribute has illegal name: '" + name + "'. Note that all names " +
"starting with '" + ATTR_PREFIX + "' are reserved for internal use.");

List<PluginProvider> plugins = cfg.getPluginProviders() != null && cfg.getPluginProviders().length > 0 ?
Arrays.asList(cfg.getPluginProviders()) : U.allPluginProviders();
List<PluginProvider> plugins = U.allPluginProviders(cfg);

// Spin out SPIs & managers.
try {
Expand Down Expand Up @@ -1101,6 +1101,11 @@ public void start(
startProcessor(new DistributedConfigurationProcessor(ctx));
startProcessor(new DurableBackgroundTasksProcessor(ctx));

CacheObjectTransformerProcessor transProc = createComponent(CacheObjectTransformerProcessor.class, ctx);

if (transProc != null)
startProcessor(transProc);

startTimer.finishGlobalStage("Start processors");

// Start plugins.
Expand Down Expand Up @@ -3321,6 +3326,9 @@ private static <T extends GridComponent> T createComponent(Class<T> cls, GridKer
if (cls.equals(IgniteRestProcessor.class))
return (T)new GridRestProcessor(ctx);

if (cls.equals(CacheObjectTransformerProcessor.class))
return null;

Class<T> implCls = null;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

import java.nio.ByteBuffer;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
import org.apache.ignite.internal.processors.GridProcessor;
import org.jetbrains.annotations.Nullable;

/**
* Provides cache object's bytes transformation (eg. encryption, compression, etc).
*/
public interface CacheObjectTransformerManager extends GridCacheSharedManager {
public interface CacheObjectTransformerProcessor extends GridProcessor {
/**
* Transforms the data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ private void consumeSegment(Path segment) {
.log(log)
.binaryMetadataFileStoreDir(binaryMeta)
.marshallerMappingFileStoreDir(marshaller)
.igniteConfigurationModifier((cfg) -> cfg.setPluginProviders(igniteCfg.getPluginProviders()))
.keepBinary(cdcCfg.isKeepBinary())
.filesOrDirs(segment.toFile())
.addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.nio.ByteBuffer;
import org.apache.ignite.events.CacheObjectTransformedEvent;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor;
import org.apache.ignite.internal.util.typedef.internal.U;

import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_TRANSFORMED;
Expand All @@ -28,12 +28,12 @@
/** */
public class CacheObjectTransformerUtils {
/** */
private static CacheObjectTransformerManager transformer(CacheObjectValueContext ctx) {
return ctx.kernalContext().cache().context().transformer();
private static CacheObjectTransformerProcessor transformer(CacheObjectValueContext ctx) {
return ctx.kernalContext().transformer();
}

/**
* Transforms bytes according to {@link CacheObjectTransformerManager} when specified.
* Transforms bytes according to {@link CacheObjectTransformerProcessor} when specified.
* @param bytes Given bytes.
* @param ctx Context.
* @return Transformed bytes.
Expand All @@ -43,15 +43,15 @@ public static byte[] transformIfNecessary(byte[] bytes, CacheObjectValueContext
}

/**
* Transforms bytes according to {@link CacheObjectTransformerManager} when specified.
* Transforms bytes according to {@link CacheObjectTransformerProcessor} when specified.
* @param bytes Given bytes.
* @param ctx Context.
* @return Transformed bytes.
*/
public static byte[] transformIfNecessary(byte[] bytes, int offset, int length, CacheObjectValueContext ctx) {
assert bytes[offset] != TRANSFORMED;

CacheObjectTransformerManager transformer = transformer(ctx);
CacheObjectTransformerProcessor transformer = transformer(ctx);

if (transformer == null)
return bytes;
Expand All @@ -64,7 +64,7 @@ public static byte[] transformIfNecessary(byte[] bytes, int offset, int length,

byte[] res = toArray(transformed);

if (ctx.kernalContext().event().isRecordable(EVT_CACHE_OBJECT_TRANSFORMED)) {
if (recordable(ctx, EVT_CACHE_OBJECT_TRANSFORMED)) {
ctx.kernalContext().event().record(
new CacheObjectTransformedEvent(ctx.kernalContext().discovery().localNode(),
"Object transformed",
Expand All @@ -79,7 +79,7 @@ public static byte[] transformIfNecessary(byte[] bytes, int offset, int length,
else {
byte[] res = detachIfNecessary(bytes, offset, length);

if (ctx.kernalContext().event().isRecordable(EVT_CACHE_OBJECT_TRANSFORMED)) {
if (recordable(ctx, EVT_CACHE_OBJECT_TRANSFORMED)) {
ctx.kernalContext().event().record(
new CacheObjectTransformedEvent(ctx.kernalContext().discovery().localNode(),
"Object transformation was cancelled.",
Expand Down Expand Up @@ -117,14 +117,14 @@ public static byte[] restoreIfNecessary(byte[] bytes, CacheObjectValueContext ct
if (bytes[0] != TRANSFORMED)
return bytes;

CacheObjectTransformerManager transformer = transformer(ctx);
CacheObjectTransformerProcessor transformer = transformer(ctx);

ByteBuffer src = ByteBuffer.wrap(bytes, 1, bytes.length - 1); // Skipping TRANSFORMED.
ByteBuffer restored = transformer.restore(src);

byte[] res = toArray(restored);

if (ctx.kernalContext().event().isRecordable(EVT_CACHE_OBJECT_TRANSFORMED)) {
if (recordable(ctx, EVT_CACHE_OBJECT_TRANSFORMED)) {
ctx.kernalContext().event().record(
new CacheObjectTransformedEvent(ctx.kernalContext().discovery().localNode(),
"Object restored",
Expand Down Expand Up @@ -158,4 +158,13 @@ private static byte[] toArray(ByteBuffer buf) {
return buf.array();
}
}

/**
* @param ctx Context.
* @param type Type.
*/
private static boolean recordable(CacheObjectValueContext ctx, int type) {
return ctx.kernalContext().event() != null // Can be null at external usage (via StandaloneGridKernalContext)
&& ctx.kernalContext().event().isRecordable(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
Expand Down Expand Up @@ -3074,8 +3073,6 @@ private GridCacheSharedContext createSharedContext(
if (snapshotMgr == null)
snapshotMgr = new IgniteSnapshotManager(ctx);

CacheObjectTransformerManager transMgr = ctx.plugins().createComponent(CacheObjectTransformerManager.class);

GridCacheIoManager ioMgr = new GridCacheIoManager();
CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager();
GridCacheSharedTtlCleanupManager ttl = new GridCacheSharedTtlCleanupManager();
Expand Down Expand Up @@ -3106,8 +3103,7 @@ private GridCacheSharedContext createSharedContext(
jta,
storeSesLsnrs,
mvccCachingMgr,
diagnosticMgr,
transMgr
diagnosticMgr
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cache.transform.CacheObjectTransformerManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
Expand Down Expand Up @@ -140,9 +139,6 @@ public class GridCacheSharedContext<K, V> {
/** Mvcc caching manager. */
private MvccCachingManager mvccCachingMgr;

/** Cache objects transformation manager. */
private CacheObjectTransformerManager transMgr;

/** Cache contexts map. */
private final ConcurrentHashMap<Integer, GridCacheContext<K, V>> ctxMap;

Expand Down Expand Up @@ -229,8 +225,7 @@ public GridCacheSharedContext(
CacheJtaManagerAdapter jtaMgr,
Collection<CacheStoreSessionListener> storeSesLsnrs,
MvccCachingManager mvccCachingMgr,
CacheDiagnosticManager diagnosticMgr,
CacheObjectTransformerManager transMgr
CacheDiagnosticManager diagnosticMgr
) {
this.kernalCtx = kernalCtx;

Expand All @@ -253,8 +248,7 @@ public GridCacheSharedContext(
ttlMgr,
evictMgr,
mvccCachingMgr,
diagnosticMgr,
transMgr
diagnosticMgr
);

this.storeSesLsnrs = storeSesLsnrs;
Expand Down Expand Up @@ -432,8 +426,7 @@ void onReconnected(boolean active) throws IgniteCheckedException {
ttlMgr,
evictMgr,
mvccCachingMgr,
diagnosticMgr,
transMgr
diagnosticMgr
);

this.mgrs = mgrs;
Expand Down Expand Up @@ -482,8 +475,7 @@ private void setManagers(
GridCacheSharedTtlCleanupManager ttlMgr,
PartitionsEvictManager evictMgr,
MvccCachingManager mvccCachingMgr,
CacheDiagnosticManager diagnosticMgr,
CacheObjectTransformerManager transMgr
CacheDiagnosticManager diagnosticMgr
) {
this.diagnosticMgr = add(mgrs, diagnosticMgr);
this.mvccMgr = add(mgrs, mvccMgr);
Expand All @@ -506,7 +498,6 @@ private void setManagers(
this.ttlMgr = add(mgrs, ttlMgr);
this.evictMgr = add(mgrs, evictMgr);
this.mvccCachingMgr = add(mgrs, mvccCachingMgr);
this.transMgr = add(mgrs, transMgr);
}

/**
Expand Down Expand Up @@ -876,13 +867,6 @@ public CacheDiagnosticManager diagnostic() {
return diagnosticMgr;
}

/**
* @return Cache objects transformation manager.
*/
public CacheObjectTransformerManager transformer() {
return transMgr;
}

/**
* @return Node ID.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2645,7 +2645,7 @@ public GridCloseableIterator<CacheDataRow> partitionRowIterator(GridKernalContex
GridCacheSharedContext<?, ?> sctx = new GridCacheSharedContext<>(ctx, null, null, null,
null, null, null, null, null,
null, null, null, null, null,
null, null, null, null, null, null);
null, null, null, null, null);

return new DataPageIterator(sctx, coctx, pageStore, partId);
}
Expand Down
Loading

0 comments on commit 2aeba5c

Please sign in to comment.