Skip to content

Commit

Permalink
Merge pull request #25 from yuriy-glotanov/2020.1beta
Browse files Browse the repository at this point in the history
fix indexframe rsync bug
  • Loading branch information
interference-project authored Mar 5, 2020
2 parents 466f507 + 2ea00b7 commit 750f20e
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 33 deletions.
Binary file removed interference-2019.3.jar
Binary file not shown.
1 change: 0 additions & 1 deletion interference-2019.3.jar.md5

This file was deleted.

Binary file modified interference-2020.1.jar
Binary file not shown.
1 change: 1 addition & 0 deletions interference-2020.1.jar.md5
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
c790f85b766e557a220bfbc57ba76047 *interference-2020.1.jar
4 changes: 4 additions & 0 deletions src/main/java/su/interference/persistent/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,18 @@ public DataFile(int type, int nodeId, int remoteId) {
//normal order of access - lock datafile, then lock table<framedata>
//but, in case of allocate undo space this may looks as:
//lock datafile<undo> - lock table<framedata> - try lock datafile
//todo deprecated started param
public synchronized FrameData createNewFrame(FrameData frame, int frameType, long allocId, boolean started, boolean external, DataObject t, Session s, LLT llt) throws Exception {
//deadlock bug fix
//instead this.allocateFrame we lock Table<FrameData> first
final FrameData bd = t.allocateFrame(this, t, s, llt);
final boolean setcurrenable = external?false:t.getName().equals("su.interference.persistent.UndoChunk")?false:true;
//allocated for rframe
if (allocId>0) { bd.setAllocId(allocId); }

//todo deprecated
if (started) { bd.setStarted(1); }

final Frame db = frameType == 0 ? new DataFrame(bd, t) : new IndexFrame(bd, frameType, t);
db.setObjectId(t.getObjectId());
bd.setFrame(db);
Expand Down
30 changes: 22 additions & 8 deletions src/main/java/su/interference/persistent/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ this software and associated documentation files (the "Software"), to deal in
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -102,6 +103,8 @@ public class Table implements DataObject, ResultSet {
@Column
private AtomicLong incValue;

@Transient
private Map<Integer, Long> ixstartfs = new HashMap<>();
@Transient
private AtomicLong idValue2;
@Transient
Expand Down Expand Up @@ -1212,6 +1215,7 @@ protected void delete (final Object o, final Session s, LLT extllt) throws Excep
if (extllt == null) { llt.commit(); }
}

//todo deprecated started param
public synchronized FrameData createNewFrame(final FrameData frame, final int fileId, final int frameType, final long allocId, final boolean started, final boolean setlbs, final boolean external, final Session s, final LLT llt) throws Exception {
final DataFile df = Storage.getStorage().getDataFileById(fileId);
final FrameData bd = df.createNewFrame(frame, frameType, allocId, started, external, this, s, llt);
Expand Down Expand Up @@ -1354,7 +1358,7 @@ public Boolean call() throws Exception {
List<FrameData> bb = Instance.getInstance().getTableById(getObjectId()).getFrames();

for (FrameData b : bb) {
if (b.getStarted()==1) {
if (b.getStarted() > 0) {
startframes.add(b.getFrameId());
}
}
Expand All @@ -1368,7 +1372,7 @@ public Boolean call() throws Exception {
throw new InternalException();
}
//frame must be local or remote chain started (RCS)
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() != 1) {
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() == 0) {
throw new InternalException();
}
IndexFrame el = bd.getIndexFrame();
Expand Down Expand Up @@ -1705,14 +1709,24 @@ public synchronized void remove (ValueSet key, Object o, Session s, LLT llt) thr
removeObjects(key, o, s, llt);
}

public synchronized void storeFrames(List<SyncFrame> frames, int sourceNodeId, LLT llt, Session s) throws Exception {
for (SyncFrame b : frames) {
if (b.isStarted()) {
ixstartfs.put(sourceNodeId, b.getBd().getFrameId());
b.getBd().setStarted(sourceNodeId);
}
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), b.getBd().getFrame().getFrame(), llt, s);
}
}

@Deprecated
public synchronized List<Chunk> getContent(Session s) throws IOException, InternalException, NoSuchMethodException, InvocationTargetException, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException {
ArrayList<Chunk> res = new ArrayList<Chunk>();
res.addAll(getLocalContent(this.fileStart+this.frameStart, s));
//todo need performance optimizing
List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
for (FrameData b : bb) {
if (b.getStarted()==1) {
if (b.getStarted() > 0) {
res.addAll(getLocalContent(b.getFrameId(), s));
}
}
Expand All @@ -1729,7 +1743,7 @@ private synchronized List<Chunk> getLocalContent(long start, Session s) throws I
return res;
}
//frame must be local or remote chain started (RCS)
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() != 1) {
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() == 0) {
return res;
}
IndexFrame el = bd.getIndexFrame();
Expand Down Expand Up @@ -1770,7 +1784,7 @@ private synchronized ArrayList<FrameData> getLeafFrames (Session s) throws IOExc
//todo need performance optimizing
List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
for (FrameData b : bb) {
if (b.getStarted()==1) {
if (b.getStarted() > 0) {
res.addAll(getLocalLeafFrames(b.getFrameId(), s));
}
}
Expand All @@ -1783,7 +1797,7 @@ private synchronized ArrayList<FrameData> getLocalLeafFrames (long start, Sessio
boolean cnue = true;
FrameData bd = Instance.getInstance().getFrameById(start);
//frame must be local or remote chain started (RCS)
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() != 1) {
if (bd.getFrameId() != bd.getAllocId() && bd.getStarted() == 0) {
return res;
}
IndexFrame el = bd.getIndexFrame();
Expand Down Expand Up @@ -1866,7 +1880,7 @@ public synchronized DataChunk getObjectByKey (ValueSet key) throws IOException,
//todo need performance optimizing
final List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
for (FrameData b : bb) {
if (b.getStarted() == 1) {
if (b.getStarted() > 0) {
final DataChunk dc_ = getLocalObjectByKey(b.getFrameId(), key);
if (dc_ != null) {
return dc_;
Expand All @@ -1883,7 +1897,7 @@ public synchronized List<DataChunk> getObjectsByKey (ValueSet key) throws IOExce
//todo need performance optimizing
final List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
for (FrameData b : bb) {
if (b.getStarted() == 1) {
if (b.getStarted() > 0) {
r.addAll(getLocalObjectsByKey(b.getFrameId(), key));
}
}
Expand Down
34 changes: 10 additions & 24 deletions src/main/java/su/interference/transport/SyncFrameEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
Session s = Session.getDntmSession();
HashMap<Long, Long> hmap = new HashMap<Long, Long>();
HashMap<Long, Long> hmap2 = new HashMap<Long, Long>();
//ArrayList<TransFrame> tframes = new ArrayList<>();
LLT llt = LLT.getLLT();
for (SyncFrame b : sb) {
if (b.isAllowR()) {
Expand All @@ -84,19 +83,14 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
for (DataFile f : dfs) {
final int order = (f.getFileId() % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
if (order == allocOrder) {
//final LLT llt = LLT.getLLT();
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), b.isStarted(), false, true, s, llt);
//llt.commit();
//bd.setAllocId(b.getAllocId());
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt);
bd.setFrame(null);
b.setDf(f);
}
}
logger.info("create replicated frame with allocId "+b.getAllocId()+" ptr "+bd.getPtr());
} else {
if (b.getObjectId() == bd.getObjectId()) {
bd.setStarted(b.isStarted()?1:0);
//s.persist(bd, llt);
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
logger.info("rframe bd found with allocId=" + b.getAllocId());
} else {
Expand All @@ -106,9 +100,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
s.delete(bd);
} else {
bd.setStarted(b.isStarted()?1:0);
bd.setObjectId(b.getObjectId());
//s.persist(bd, llt);
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
}
}
Expand All @@ -117,7 +109,6 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
b.setBd(bd);
hmap.put(b.getAllocId(), bd.getFrameId());
hmap2.put(b.getFrameId(), bd.getFrameId());
//tframes.addAll(b.getTframes());
}
}
//updateTransFrames(tframes, hmap2, s);
Expand Down Expand Up @@ -187,8 +178,8 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
//b.getBd().setFrame(null);
}

List<SyncFrame> nframes = new ArrayList<>();
List<SyncFrame> lframes = new ArrayList<>();
final Map<Integer, List<SyncFrame>> storemap = new HashMap<>();

for (SyncFrame b : sb) {
try {
if (b.isAllowR()) {
Expand All @@ -209,29 +200,24 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
frame.setRes05(lcF);
frame.setRes06(parentB);
frame.setRes07(lcB);
//b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
b.getBd().setFrame(frame);
if (b.getFrameType() == 1) {
lframes.add(b);
}
if (b.getFrameType() == 2) {
nframes.add(b);
}
logger.info("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
}
}
if (storemap.get(t.getObjectId()) == null) {
storemap.put(t.getObjectId(), new ArrayList<>());
}
storemap.get(t.getObjectId()).add(b);
}
} catch (Exception e) {
e.printStackTrace();
}
//b.getBd().setFrame(null);
}

for (SyncFrame b : lframes) {
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), b.getBd().getFrame().getFrame(), llt, s);
}
for (SyncFrame b : nframes) {
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), b.getBd().getFrame().getFrame(), llt, s);
for (Map.Entry<Integer, List<SyncFrame>> entry : storemap.entrySet()) {
final Table t = Instance.getInstance().getTableById(entry.getKey());
t.storeFrames(entry.getValue(), this.getCallbackNodeId(), llt, s);
}

llt.commit();
Expand Down

0 comments on commit 750f20e

Please sign in to comment.