Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
SamMousa committed Mar 7, 2021
1 parent c227c0c commit b93993e
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 65 deletions.
5 changes: 4 additions & 1 deletion LedgerFactory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ LedgerFactory.createLedger = function(table, send, registerReceiveHandler, autho
local listSync = ListSync:new(stateManager, send, registerReceiveHandler, authorizationHandler)

stateManager:setUpdateInterval(500)
stateManager:setBatchSize(500)
stateManager:setBatchSize(5)

return {
getListSync = function()
Expand All @@ -40,6 +40,9 @@ LedgerFactory.createLedger = function(table, send, registerReceiveHandler, autho
submitEntry = function(entry)
return sortedList:uniqueInsert(entry)
end,
reset = function()
stateManager:reset()
end,
addStateChangedListener = function(callback)
-- We hide the state manager from this callback
--
Expand Down
172 changes: 145 additions & 27 deletions ListSync.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ local ListSync, _ = LibStub:NewLibrary("EventSourcing/ListSync", 1)
if not ListSync then
return end

local MESSAGE = {
WEEKHASH = 'weekhash',
REQUESTWEEK = "requestweek"
}
local StateManager = LibStub("EventSourcing/StateManager")
local LogEntry = LibStub("EventSourcing/LogEntry")
local Util = LibStub("EventSourcing/Util")
Expand All @@ -14,9 +18,6 @@ function ListSync:new(stateManager, sendAddonMessage, registerReceiveHandler, au
if getmetatable(stateManager) ~= StateManager then
error("stateManager must be an instance of StateManager")
end



o = {}
setmetatable(o, self)
self.__index = self
Expand All @@ -26,10 +27,27 @@ function ListSync:new(stateManager, sendAddonMessage, registerReceiveHandler, au
self.authorizationHandler = authorizationHandler

self._stateManager = stateManager
self._weekHashCache = {
-- numeric counter for checking if the list has changed
state = stateManager:getSortedList():state(),
entries = {}
}

local playerName = UnitName("player")
registerReceiveHandler(function(message, distribution, sender)
if type(message) ~= 'table' then
print(string.format("Received message that is not a table from %s", sender))
print(message)

registerReceiveHandler(function(_, message, distribution, sender)
return
end

if sender == playerName then
print("Ignoring message from self [[DEBUG: NOT IGNORING]]")
-- return
end
-- our messages have a type, this way we can use 1 prefix for all communication
if message.type == "fullSync" then
if message.type == "bulkSync" then
-- handle full sync
local count = 0
for i, v in ipairs(message.data) do
Expand All @@ -51,7 +69,42 @@ function ListSync:new(stateManager, sendAddonMessage, registerReceiveHandler, au
else
print(string.format("Dropping event from sender %s", sender))
end
elseif message.type == MESSAGE.WEEKHASH then
-- We received an announce
local hash, count = self:weekHash(message.week)
if hash == message.hash and count == message.count then
print(string.format("Received week hash from %s, we are in sync", sender))
else
print(string.format("Received week hash from %s, we are NOT in sync", sender))
print("Requesting this week")
self:send({
type = MESSAGE.REQUESTWEEK,
week = message.week,
hash = message.hash
})
end
elseif self:isSendingEnabled() and message.type == MESSAGE.REQUESTWEEK then
-- If we don't have the same week hash we ignore the request
local hash, count = self:weekHash(message.week)
if hash ~= message.hash or count ~= message.count then
print(string.format("Ignoring week request for week %d with hash %d from %s, we are not in sync",
message.week, message.hash))
return
end

if distribution == "WHISPER" then
self:weekSyncViaWhisper(sender, message.week)
elseif distribution == "GUILD" then
-- We need to prevent hammering the guild comms with updates.
-- Every agent has an upper limit on sending a week twice (ie send at most once every minute)
--
-- temp fix: always respond via whisper.
self:weekSyncViaWhisper(sender, message.week)
end
elseif message.type ~= nil then
print(string.format("Received unknown message type %s from %s", message.type, sender))
else
print("Received message in unknown table format")
end

end)
Expand All @@ -64,10 +117,10 @@ end
]]--
function ListSync:transmitViaGuild(entry)
if self.authorizationHandler(entry, UnitName("player")) then
self.sendAddonMessage({
self:send({
type = "singleEntry",
data = entry:toList()
}, "GUILD", nil, "BULK")
}, "GUILD")
end
end

Expand All @@ -90,6 +143,46 @@ function ListSync:fullSyncViaWhisper(target)
type = "fullSync",
data = data
}
self:send(message, "WHISPER", target)

end


function ListSync:send(message, distribution, target)
self.sendAddonMessage(message, distribution, target, "BULK", function(_, sent, total)
print(string.format("Sent %d of %d", sent, total))
end)
end

function ListSync:weekSyncViaWhisper(target, week)
local data = {}

for entry in self:weekEntryIterator(week) do
local list = v:toList()
table.insert(list, v:class())
table.insert(data, list)
end
local message = {
type = "bulkSync",
data = data
}
self.sendAddonMessage(message, "WHISPER", target, "BULK", function(_, sent, total)
print(string.format("Sent %d of %d", sent, total))
end)
end

function ListSync:fullSyncViaWhisper(target)
local data = {}
for _, v in ipairs(self._stateManager:getSortedList():entries()) do
self._stateManager:castLogEntry(v)
local list = v:toList()
table.insert(list, v:class())
table.insert(data, list)
end
local message = {
type = "bulkSync",
data = data
}
self.sendAddonMessage(message, "WHISPER", target, "BULK", function(_, sent, total)
print(string.format("Sent %d of %d", sent, total))
end)
Expand All @@ -105,11 +198,17 @@ function ListSync:enableSending()
-- Get week hash for the last 4 weeks.
local currentWeek = Util.WeekNumber(Util.time())
local hashes = {}
print("Announcing hashes of last 4 weeks")
for i = 0, 3 do
hashes[currentWeek - i] = self:weekHash(currentWeek - i)
local hash, count = self:weekHash(currentWeek - i)
local message = {
type = MESSAGE.WEEKHASH,
hash = hash,
count = count,
week = currentWeek - i
}
self:send(message, "GUILD")
end

Util.DumpTable(hashes)
end)
end

Expand All @@ -120,34 +219,53 @@ function ListSync:disableSending()
end
end

function ListSync:weekHash(week)
function ListSync:weekEntryIterator(week)
local sortedList = self._stateManager:getSortedList()

local position = sortedList:searchGreaterThanOrEqual({t = Util.WeekStart(week) })
if (position == nil) then
return 0
local stateManager = self._stateManager
local entries = sortedList:entries()

return function()
while position ~= nil and position <= #entries do
local entry = entries[position]
stateManager:castLogEntry(entry)
position = position + 1
if entry:weekNumber() == week then
return entry
else
return nil
end
end
end
end

--[[
Get the hash and number of events in a week.
Result is cached using the sortedList state.
]]--
function ListSync:weekHash(week)
local adler32 = Util.IntegerChecksumCoroutine()
local stateManager = self._stateManager
local entries = sortedList:entries()

local result, hash
while position <= #entries do
local entry = entries[position]
if entry == nil then
print("Entry is nil", position, #entries)
error("exit listsync:139")
end
stateManager:castLogEntry(entry)
if entry:weekNumber() == week then
local count = 0

local state = self._stateManager:getSortedList():state()
if (self._weekHashCache.state ~= state) then
self._weekHashCache = {
state = state,
entries = {}
}
end
if self._weekHashCache.entries[week] == nil then
for entry in self:weekEntryIterator(week) do
result, hash = coroutine.resume(adler32, LogEntry.time(entry))
count = count + 1
if not result then
error(hash)
end
else
break
end
position = position + 1
self._weekHashCache.entries[week] = {hash or 0, count }
end
return hash or 0
return self._weekHashCache.entries[week][1], self._weekHashCache.entries[week][2]
end
13 changes: 12 additions & 1 deletion SortedList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ function SortedList:new(data, compare, unique)
end
o._compare = compare
o._unique = unique or false
o._state = 1
return o
end

Expand All @@ -38,10 +39,14 @@ function SortedList:length()
return #self._entries
end

function SortedList:state()
return self._state
end
function SortedList:insert(element)
if (self._unique) then
error("This list only supports uniqueInsert")
end
self._state = self._state + 1
-- since we expect elements to be mostly appended, we do a shortcut check.
if (#self._entries == 0 or self._compare(self._entries[#self._entries], element) == -1) then
table.insert(self._entries, element)
Expand All @@ -63,6 +68,7 @@ end
@returns bool indicating whether a new element was inserted
]]--
function SortedList:uniqueInsert(element)
self._state = self._state + 1
if (#self._entries == 0 or self._compare(self._entries[#self._entries], element) == -1) then
table.insert(self._entries, element)
return true
Expand All @@ -79,7 +85,12 @@ function SortedList:uniqueInsert(element)
return true
end


function SortedList:wipe()
for i, _ in ipairs(self._entries) do
self._entries[i] = nil
end
self._state = self._state + 1
end
-- We don't return a value since we are change the table, this makes it clear for consuming code
--function SortedList:cast(table, compare)
-- if (table._entries == nil) then
Expand Down
11 changes: 11 additions & 0 deletions StateManager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ end
function StateManager:updateState()
local entries = self.list:entries()
local applied = 0
if self.lastAppliedIndex > #entries then
self.lastAppliedIndex = 0
end
while applied < self.batchSize and self.lastAppliedIndex < #entries do
local entry = entries[self.lastAppliedIndex + 1]
self:castLogEntry(entry)
Expand All @@ -181,6 +184,14 @@ function StateManager:lag()
return #self.list:entries() - self.lastAppliedIndex, #self.uncommittedEntries
end

function StateManager:logSize()
return #self.list:entries()
end

function StateManager:reset()
self.list:wipe()
self.lastAppliedIndex = 0
end

function StateManager:trigger(event)
for _, callback in ipairs(self.listeners[event] or {}) do
Expand Down
Loading

0 comments on commit b93993e

Please sign in to comment.