Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventLoop: store Timers in min Pairing Heap [fixup #14996] #15206

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions spec/std/crystal/evented/timers_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ describe Crystal::Evented::Timers do
event0.wake_at = -1.minute
timers.add(pointerof(event0)).should be_true # added new head (next ready)

events = [] of Crystal::Evented::Event*
timers.each { |event| events << event }
events.should eq([
pointerof(event0),
pointerof(event1),
pointerof(event3),
pointerof(event2),
])
# TODO: forcefully dequeue the next ready event, then check the order

# events = [] of Crystal::Evented::Event*
# timers.each { |event| events << event }
# events.should eq([
# pointerof(event0),
# pointerof(event1),
# pointerof(event3),
# pointerof(event2),
# ])
timers.empty?.should be_false
end

Expand Down
150 changes: 150 additions & 0 deletions spec/std/crystal/pointer_pairing_heap_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require "spec"
require "../../../src/crystal/pointer_pairing_heap"

private struct Node
getter key : Int32

include Crystal::PointerPairingHeap::Node

def initialize(@key : Int32)
end

def heap_compare(other : Pointer(self)) : Bool
key < other.value.key
end

def inspect(io : IO, indent = 0) : Nil
prv = @heap_previous
nxt = @heap_next
chd = @heap_child

indent.times { io << ' ' }
io << "Node value=" << key
io << " prv=" << prv.try(&.value.key)
io << " nxt=" << nxt.try(&.value.key)
io << " chd=" << chd.try(&.value.key)
io.puts

node = heap_child?
while node
node.value.inspect(io, indent + 2)
node = node.value.heap_next?
end
end
end

describe Crystal::PointerPairingHeap do
it "#add" do
heap = Crystal::PointerPairingHeap(Node).new
node1 = Node.new(1)
node2 = Node.new(2)
node2b = Node.new(2)
node3 = Node.new(3)

# can add distinct nodes
heap.add(pointerof(node3))
heap.add(pointerof(node1))
heap.add(pointerof(node2))

# can add duplicate key (different nodes)
heap.add(pointerof(node2b))

# can't add same node twice
expect_raises(ArgumentError) { heap.add(pointerof(node1)) }

# can re-add removed nodes
heap.delete(pointerof(node3))
heap.add(pointerof(node3))

heap.shift?.should eq(pointerof(node1))
heap.add(pointerof(node1))
end

it "#shift?" do
heap = Crystal::PointerPairingHeap(Node).new
nodes = StaticArray(Node, 10).new { |i| Node.new(i) }

# insert in random order
(0..9).to_a.shuffle.each do |i|
heap.add nodes.to_unsafe + i
end

# removes in ascending order
10.times do |i|
node = heap.shift?
node.should eq(nodes.to_unsafe + i)
end
end

it "#delete" do
heap = Crystal::PointerPairingHeap(Node).new
nodes = StaticArray(Node, 10).new { |i| Node.new(i) }

# insert in random order
(0..9).to_a.shuffle.each do |i|
heap.add nodes.to_unsafe + i
end

# remove some values
heap.delete(nodes.to_unsafe + 3)
heap.delete(nodes.to_unsafe + 7)
heap.delete(nodes.to_unsafe + 1)

# remove tail
heap.delete(nodes.to_unsafe + 9)

# remove head
heap.delete(nodes.to_unsafe + 0)

# repeatedly delete min
[2, 4, 5, 6, 8].each do |i|
heap.shift?.should eq(nodes.to_unsafe + i)
end
heap.shift?.should be_nil
end

it "adds 1000 nodes then shifts them in order" do
heap = Crystal::PointerPairingHeap(Node).new

nodes = StaticArray(Node, 1000).new { |i| Node.new(i) }
(0..999).to_a.shuffle.each { |i| heap.add(nodes.to_unsafe + i) }

i = 0
while node = heap.shift?
node.value.key.should eq(i)
i += 1
end
i.should eq(1000)

heap.shift?.should be_nil
end

it "randomly shift while we add nodes" do
heap = Crystal::PointerPairingHeap(Node).new

nodes = uninitialized StaticArray(Node, 1000)
(0..999).to_a.shuffle.each_with_index { |i, j| nodes[j] = Node.new(i) }

i = 0
removed = 0

# regularly calls delete-min while we insert
loop do
if rand(0..5) == 0
removed += 1 if heap.shift?
else
heap.add(nodes.to_unsafe + i)
break if (i += 1) == 1000
end
end

# exhaust the heap
while heap.shift?
removed += 1
end

# we must have added and removed all nodes _once_
i.should eq(1000)
removed.should eq(1000)
end
end
158 changes: 158 additions & 0 deletions src/crystal/pointer_pairing_heap.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# :nodoc:
#
# Tree of `T` structs referenced as pointers.
# `T` must include `Crystal::PointerPairingHeap::Node`.
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
class Crystal::PointerPairingHeap(T)
module Node
macro included
property? heap_previous : Pointer(self)?
property? heap_next : Pointer(self)?
property? heap_child : Pointer(self)?
end

# Compare self with other. For example:
#
# Use `<` to create a min heap.
# Use `>` to create a max heap.
abstract def heap_compare(other : Pointer(self)) : Bool
end

@head : Pointer(T)?

private def head=(head)
@head = head
head.value.heap_previous = nil if head
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
head
end

def empty?
@head.nil?
end

def first? : Pointer(T)?
@head
end

def shift? : Pointer(T)?
if node = @head
self.head = merge_pairs(node.value.heap_child?)
node.value.heap_child = nil
node
end
end

def add(node : Pointer(T)) : Nil
if node.value.heap_previous? || node.value.heap_next? || node.value.heap_child?
raise ArgumentError.new("The node is already in a Pairing Heap tree")
end
self.head = meld(@head, node)
end

def delete(node : Pointer(T)) : Nil
if previous_node = node.value.heap_previous?
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
next_sibling = node.value.heap_next?

if previous_node.value.heap_next? == node
previous_node.value.heap_next = next_sibling
else
previous_node.value.heap_child = next_sibling
end

if next_sibling
next_sibling.value.heap_previous = previous_node
end

subtree = merge_pairs(node.value.heap_child?)
clear(node)
self.head = meld(@head, subtree)
else
# removing head
self.head = merge_pairs(node.value.heap_child?)
node.value.heap_child = nil
end
end

def clear : Nil
if node = @head
clear_recursive(node)
@head = nil
end
end

private def clear_recursive(node)
child = node.value.heap_child?
while child
clear_recursive(child)
child = child.value.heap_next?
end
clear(node)
end

private def meld(a : Pointer(T), b : Pointer(T)) : Pointer(T)
if a.value.heap_compare(b)
add_child(a, b)
else
add_child(b, a)
end
end

private def meld(a : Pointer(T), b : Nil) : Pointer(T)
a
end

private def meld(a : Nil, b : Pointer(T)) : Pointer(T)
b
end

private def meld(a : Nil, b : Nil) : Nil
end

private def add_child(parent : Pointer(T), node : Pointer(T)) : Pointer(T)
first_child = parent.value.heap_child?
parent.value.heap_child = node

first_child.value.heap_previous = node if first_child
node.value.heap_previous = parent
node.value.heap_next = first_child

parent
end

private def merge_pairs(node : Pointer(T)?) : Pointer(T)?
return unless node

# 1st pass: meld children into pairs (left to right)
tail = nil

while a = node
if b = a.value.heap_next?
node = b.value.heap_next?
root = meld(a, b)
root.value.heap_previous = tail
tail = root
else
a.value.heap_previous = tail
tail = a
break
end
end

# 2nd pass: meld the pairs back into a single tree (right to left)
root = nil

while tail
node = tail.value.heap_previous?
root = meld(root, tail)
tail = node
end

root.value.heap_next = nil if root
root
end

private def clear(node) : Nil
node.value.heap_previous = nil
node.value.heap_next = nil
node.value.heap_child = nil
end
end
8 changes: 8 additions & 0 deletions src/crystal/system/unix/evented/event.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "crystal/pointer_linked_list"
require "crystal/pointer_pairing_heap"

# Information about the event that a `Fiber` is waiting on.
#
Expand Down Expand Up @@ -35,6 +36,9 @@ struct Crystal::Evented::Event
# The event can be added to `Waiters` lists.
include PointerLinkedList::Node

# The event can be added to the `Timers` list.
include PointerPairingHeap::Node

def initialize(@type : Type, @fiber, @index = nil, timeout : Time::Span? = nil)
if timeout
seconds, nanoseconds = System::Time.monotonic
Expand All @@ -55,4 +59,8 @@ struct Crystal::Evented::Event
# NOTE: musn't be changed after registering the event into `Timers`!
def wake_at=(@wake_at)
end

def heap_compare(other : Pointer(self)) : Bool
wake_at < other.value.wake_at
end
end
Loading
Loading