Skip to content

Commit

Permalink
feat: add historical data sync
Browse files Browse the repository at this point in the history
  • Loading branch information
SamMousa committed May 3, 2021
1 parent 7eeb33f commit 7c17f55
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 21 deletions.
10 changes: 7 additions & 3 deletions source/AdvertiseHashMessage.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ local Message = LibStub("EventSourcing/Message")

local AdvertiseHashMessage = Message:extend('AHM')

function AdvertiseHashMessage:new()
function AdvertiseHashMessage:new(firstWeek, entryCount, stateHash, lag)
local o = Message.new(self)
o.hashes = {}
o.firstWeek = firstWeek
o.totalEntryCount = entryCount
o.stateHash = stateHash
o.lag = lag
return o
end

Expand All @@ -21,8 +25,8 @@ function AdvertiseHashMessage:hashCount()
return #self.hashes
end

function Factory.create()
return AdvertiseHashMessage:new()
function Factory.create(firstWeek, entryCount, stateHash, lag)
return AdvertiseHashMessage:new(firstWeek, entryCount, stateHash, lag)
end

function Factory.type()
Expand Down
74 changes: 56 additions & 18 deletions source/ListSync.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ local BulkDataMessage = LibStub("EventSourcing/Message/BulkData")
local Message = LibStub("EventSourcing/Message")


local ADVERTISEMENT_TIMEOUT = 30



Expand Down Expand Up @@ -103,23 +104,36 @@ local function requestWeekInhibitorCheck(listSync, week)
end

local function handleAdvertiseMessage(message, sender, distribution, stateManager, listSync)
for _, weekHashCount in ipairs(message.hashes) do
-- This is the number of entries we expect to have after all data from advertisements in this message have been synced
local projectedEntries = stateManager:getSortedList():length()
-- First we check every week's hash
for _, whc in ipairs(message.hashes) do
local week, hash, count = unpack(whc)

-- If sender has priority over us we remove our advertisement, this will prevent us from sending data.
if sender < listSync.playerName then
listSync.advertisedWeeks[weekHashCount[1]] = nil
listSync.advertisedWeeks[week] = nil
end

local hash, count = weekHash(listSync, weekHashCount[1])
advertiseWeekHashInhibitorSet(listSync, weekHashCount[1])
if hash == weekHashCount[2] and count == weekHashCount[3] then
listSync.logger:Info("Received week %s hash from %s, we are in sync", weekHashCount[1], sender)
elseif requestWeekInhibitorCheck(listSync, weekHashCount[1]) then
listSync.logger:Info("Requesting data for week %s", weekHashCount[1])
requestWeekInhibitorSet(listSync, weekHashCount[1])
send(listSync, RequestWeekMessage.create(weekHashCount[1]), "GUILD")
local localHash, localCount = weekHash(listSync, week)
advertiseWeekHashInhibitorSet(listSync, week)
if localHash == hash and localCount == count then
listSync.logger:Info("Received week %s hash from %s, we are in sync", week, sender)
else
projectedEntries = projectedEntries + count
if requestWeekInhibitorCheck(listSync, week) then
listSync.logger:Info("Requesting data for week %s", week)
requestWeekInhibitorSet(listSync, week)
send(listSync, RequestWeekMessage.create(week), "GUILD")
end
end
end
-- Then we check data set properties to decide if we might be far behind.
if projectedEntries < message.totalEntryCount then
-- We have fewer entries than the sender
end


end

local function handleWeekDataMessage(message, sender, distribution, stateManager, listSync)
Expand Down Expand Up @@ -171,20 +185,20 @@ local function handleRequestWeekMessage(message, sender, distribution, stateMana
end


local function handleMessage(self, message, distribution, sender)
if sender == self.playerName then
local function handleMessage(listSync, message, distribution, sender)
if sender == listSync.playerName then
return
end

if not Message.cast(message) then
self.logger:Warning("Ignoring invalid message from %s", sender)
listSync.logger:Warning("Ignoring invalid message from %s", sender)
return
end

-- We use pairs() because we don't care about order.
-- This allows us to insert handlers with a key (and to easily remove them later)
for _, handler in pairs(self.messageHandlers[message.type] or {}) do
handler(message, sender, distribution, self._stateManager, self)
for _, handler in pairs(listSync.messageHandlers[message.type] or {}) do
handler(message, sender, distribution, listSync._stateManager, listSync)
end
end

Expand Down Expand Up @@ -217,6 +231,7 @@ function ListSync:new(stateManager, sendMessage, registerReceiveHandler, authori

o.advertiseTicker = nil
o.advertiseCount = 4
o.advertiseRollingOffset = 0
o.send = sendMessage
o.sendLargeMessage = sendLargeMessage or sendMessage
o.authorizationHandler = authorizationHandler
Expand Down Expand Up @@ -317,16 +332,39 @@ function ListSync:enableSending()

-- Get week hash for the last weeks.
local now = Util.time()
local firstWeek = self._stateManager:getSortedList():head():weekNumber()
local currentWeek = Util.WeekNumber(now)
self.logger:Info("Announcing hashes of last %d weeks + 1 random week", self.advertiseCount)
local message = AdvertiseHashMessage.create()
self.logger:Info("Announcing hashes of last %d weeks + %d rolling weeks starting at %d", self.advertiseCount, self.advertiseCount, self.advertiseRollingOffset)
local message = AdvertiseHashMessage.create(
firstWeek,
self._stateManager:getSortedList():length(),
self._stateManager:stateHash(),
self._stateManager:lag()
)
-- recent weeks
for i = 0, self.advertiseCount - 1 do
if (advertiseWeekHashInhibitorCheckOrSet(self, currentWeek - i)) then
local hash, count = weekHash(self, currentWeek - i)
message:addHash(currentWeek - i, hash, count)
self.advertisedWeeks[currentWeek - i] = now + 30
self.advertisedWeeks[currentWeek - i] = now + ADVERTISEMENT_TIMEOUT
end
end

-- historical rolling weeks
self.advertiseRollingOffset = self.advertiseRollingOffset + self.advertiseCount
if self.advertiseRollingOffset > currentWeek then
self.advertiseRollingOffset = 0
end

for i = 0, self.advertiseCount - 1 do
local checkWeek = currentWeek - self.advertiseRollingOffset - i
if (advertiseWeekHashInhibitorCheckOrSet(self, checkWeek)) then
local hash, count = weekHash(self, checkWeek)
message:addHash(checkWeek, hash, count)
self.advertisedWeeks[checkWeek] = now + ADVERTISEMENT_TIMEOUT
end
end

if (message:hashCount() > 0) then
self.logger:Trace("Sending hashes for %d weeks", message:hashCount())
send(self, message, "GUILD")
Expand Down
6 changes: 6 additions & 0 deletions source/SortedList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ function SortedList:uniqueInsert(element)
return true
end

function SortedList:head()
if #self._entries < 1 then
error("List is empty")
end
return self._entries[1]
end
function SortedList:wipe()
for i, _ in ipairs(self._entries) do
self._entries[i] = nil
Expand Down

0 comments on commit 7c17f55

Please sign in to comment.