diff --git a/source/AdvertiseHashMessage.lua b/source/AdvertiseHashMessage.lua index 3669ba7..d3de11f 100644 --- a/source/AdvertiseHashMessage.lua +++ b/source/AdvertiseHashMessage.lua @@ -8,9 +8,13 @@ local Message = LibStub("EventSourcing/Message") local AdvertiseHashMessage = Message:extend('AHM') -function AdvertiseHashMessage:new() +function AdvertiseHashMessage:new(firstWeek, entryCount, stateHash, lag) local o = Message.new(self) o.hashes = {} + o.firstWeek = firstWeek + o.totalEntryCount = entryCount + o.stateHash = stateHash + o.lag = lag return o end @@ -21,8 +25,8 @@ function AdvertiseHashMessage:hashCount() return #self.hashes end -function Factory.create() - return AdvertiseHashMessage:new() +function Factory.create(firstWeek, entryCount, stateHash, lag) + return AdvertiseHashMessage:new(firstWeek, entryCount, stateHash, lag) end function Factory.type() diff --git a/source/ListSync.lua b/source/ListSync.lua index d6b2e14..4e94cc8 100644 --- a/source/ListSync.lua +++ b/source/ListSync.lua @@ -16,6 +16,7 @@ local BulkDataMessage = LibStub("EventSourcing/Message/BulkData") local Message = LibStub("EventSourcing/Message") +local ADVERTISEMENT_TIMEOUT = 30 @@ -103,23 +104,36 @@ local function requestWeekInhibitorCheck(listSync, week) end local function handleAdvertiseMessage(message, sender, distribution, stateManager, listSync) - for _, weekHashCount in ipairs(message.hashes) do + -- This is the number of entries we expect to have after all data from advertisements in this message have been synced + local projectedEntries = stateManager:getSortedList():length() + -- First we check every week's hash + for _, whc in ipairs(message.hashes) do + local week, hash, count = unpack(whc) -- If sender has priority over us we remove our advertisement, this will prevent us from sending data. if sender < listSync.playerName then - listSync.advertisedWeeks[weekHashCount[1]] = nil + listSync.advertisedWeeks[week] = nil end - local hash, count = weekHash(listSync, weekHashCount[1]) - advertiseWeekHashInhibitorSet(listSync, weekHashCount[1]) - if hash == weekHashCount[2] and count == weekHashCount[3] then - listSync.logger:Info("Received week %s hash from %s, we are in sync", weekHashCount[1], sender) - elseif requestWeekInhibitorCheck(listSync, weekHashCount[1]) then - listSync.logger:Info("Requesting data for week %s", weekHashCount[1]) - requestWeekInhibitorSet(listSync, weekHashCount[1]) - send(listSync, RequestWeekMessage.create(weekHashCount[1]), "GUILD") + local localHash, localCount = weekHash(listSync, week) + advertiseWeekHashInhibitorSet(listSync, week) + if localHash == hash and localCount == count then + listSync.logger:Info("Received week %s hash from %s, we are in sync", week, sender) + else + projectedEntries = projectedEntries + count + if requestWeekInhibitorCheck(listSync, week) then + listSync.logger:Info("Requesting data for week %s", week) + requestWeekInhibitorSet(listSync, week) + send(listSync, RequestWeekMessage.create(week), "GUILD") + end end end + -- Then we check data set properties to decide if we might be far behind. + if projectedEntries < message.totalEntryCount then + -- We have fewer entries than the sender + end + + end local function handleWeekDataMessage(message, sender, distribution, stateManager, listSync) @@ -171,20 +185,20 @@ local function handleRequestWeekMessage(message, sender, distribution, stateMana end -local function handleMessage(self, message, distribution, sender) - if sender == self.playerName then +local function handleMessage(listSync, message, distribution, sender) + if sender == listSync.playerName then return end if not Message.cast(message) then - self.logger:Warning("Ignoring invalid message from %s", sender) + listSync.logger:Warning("Ignoring invalid message from %s", sender) return end -- We use pairs() because we don't care about order. -- This allows us to insert handlers with a key (and to easily remove them later) - for _, handler in pairs(self.messageHandlers[message.type] or {}) do - handler(message, sender, distribution, self._stateManager, self) + for _, handler in pairs(listSync.messageHandlers[message.type] or {}) do + handler(message, sender, distribution, listSync._stateManager, listSync) end end @@ -217,6 +231,7 @@ function ListSync:new(stateManager, sendMessage, registerReceiveHandler, authori o.advertiseTicker = nil o.advertiseCount = 4 + o.advertiseRollingOffset = 0 o.send = sendMessage o.sendLargeMessage = sendLargeMessage or sendMessage o.authorizationHandler = authorizationHandler @@ -317,16 +332,39 @@ function ListSync:enableSending() -- Get week hash for the last weeks. local now = Util.time() + local firstWeek = self._stateManager:getSortedList():head():weekNumber() local currentWeek = Util.WeekNumber(now) - self.logger:Info("Announcing hashes of last %d weeks + 1 random week", self.advertiseCount) - local message = AdvertiseHashMessage.create() + self.logger:Info("Announcing hashes of last %d weeks + %d rolling weeks starting at %d", self.advertiseCount, self.advertiseCount, self.advertiseRollingOffset) + local message = AdvertiseHashMessage.create( + firstWeek, + self._stateManager:getSortedList():length(), + self._stateManager:stateHash(), + self._stateManager:lag() + ) + -- recent weeks for i = 0, self.advertiseCount - 1 do if (advertiseWeekHashInhibitorCheckOrSet(self, currentWeek - i)) then local hash, count = weekHash(self, currentWeek - i) message:addHash(currentWeek - i, hash, count) - self.advertisedWeeks[currentWeek - i] = now + 30 + self.advertisedWeeks[currentWeek - i] = now + ADVERTISEMENT_TIMEOUT end end + + -- historical rolling weeks + self.advertiseRollingOffset = self.advertiseRollingOffset + self.advertiseCount + if self.advertiseRollingOffset > currentWeek then + self.advertiseRollingOffset = 0 + end + + for i = 0, self.advertiseCount - 1 do + local checkWeek = currentWeek - self.advertiseRollingOffset - i + if (advertiseWeekHashInhibitorCheckOrSet(self, checkWeek)) then + local hash, count = weekHash(self, checkWeek) + message:addHash(checkWeek, hash, count) + self.advertisedWeeks[checkWeek] = now + ADVERTISEMENT_TIMEOUT + end + end + if (message:hashCount() > 0) then self.logger:Trace("Sending hashes for %d weeks", message:hashCount()) send(self, message, "GUILD") diff --git a/source/SortedList.lua b/source/SortedList.lua index 4ef341f..dcb8504 100644 --- a/source/SortedList.lua +++ b/source/SortedList.lua @@ -83,6 +83,12 @@ function SortedList:uniqueInsert(element) return true end +function SortedList:head() + if #self._entries < 1 then + error("List is empty") + end + return self._entries[1] +end function SortedList:wipe() for i, _ in ipairs(self._entries) do self._entries[i] = nil