Skip to content

Commit

Permalink
20367: Fixes issue where loading an entity could fail randomly with a…
Browse files Browse the repository at this point in the history
… deadlock or exception, also code cleanup for garbage collection (#136)
  • Loading branch information
howsohazard authored May 23, 2024
1 parent befe510 commit 51ac4e9
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 105 deletions.
11 changes: 6 additions & 5 deletions src/Amalgam/AmalgamMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,8 @@ PLATFORM_MAIN_CONSOLE
}
else if(run_tracefile)
{
std::istream *trace_stream = new std::ifstream(tracefile);
int ret = RunAmalgamTrace(trace_stream, &std::cout, random_seed);
delete trace_stream;
std::ifstream trace_stream(tracefile);
int ret = RunAmalgamTrace(&trace_stream, &std::cout, random_seed);

if(profile_opcodes || profile_labels)
PerformanceProfiler::PrintProfilingInformation(profile_out_file, profile_count);
Expand All @@ -237,7 +236,9 @@ PLATFORM_MAIN_CONSOLE
//run the standard amlg command line interface
EntityExternalInterface::LoadEntityStatus status;
std::string file_type = "";
Entity *entity = asset_manager.LoadEntityFromResourcePath(amlg_file_to_run, file_type, false, true, false, true, random_seed, status);
Entity *entity = asset_manager.LoadEntityFromResourcePath(amlg_file_to_run, file_type,
false, true, false, true, random_seed, nullptr, status);

if(!status.loaded)
return 1;

Expand Down Expand Up @@ -279,7 +280,7 @@ PLATFORM_MAIN_CONSOLE

//execute the entity
entity->Execute(0, num_steps_executed, 0, num_nodes_allocated, StringInternPool::NOT_A_STRING_ID, call_stack,
false, &write_listeners, print_listener);
false, nullptr, &write_listeners, print_listener);

//clean up the nodes created here
entity->evaluableNodeManager.FreeNodeTree(call_stack);
Expand Down
7 changes: 4 additions & 3 deletions src/Amalgam/AssetManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ bool AssetManager::StoreResourcePathFromProcessedResourcePaths(EvaluableNode *co

Entity *AssetManager::LoadEntityFromResourcePath(std::string &resource_path, std::string &file_type,
bool persistent, bool load_contained_entities, bool escape_filename, bool escape_contained_filenames,
std::string default_random_seed, EntityExternalInterface::LoadEntityStatus &status)
std::string default_random_seed, Interpreter *calling_interpreter, EntityExternalInterface::LoadEntityStatus &status)
{
std::string resource_base_path;
Entity *new_entity = new Entity();
Expand All @@ -176,7 +176,8 @@ Entity *AssetManager::LoadEntityFromResourcePath(std::string &resource_path, std

ExecutionCycleCount max_num_steps = 0, num_steps_executed = 0;
size_t max_num_nodes = 0, num_nodes_allocated = 0;
new_entity->Execute(max_num_steps, num_steps_executed, max_num_nodes, num_nodes_allocated, StringInternPool::NOT_A_STRING_ID, call_stack);
new_entity->Execute(max_num_steps, num_steps_executed, max_num_nodes, num_nodes_allocated,
StringInternPool::NOT_A_STRING_ID, call_stack, false, calling_interpreter);
return new_entity;
}

Expand Down Expand Up @@ -244,7 +245,7 @@ Entity *AssetManager::LoadEntityFromResourcePath(std::string &resource_path, std
std::string default_seed = new_entity->CreateRandomStreamFromStringAndRand(entity_name);
std::string contained_resource_path = resource_base_path + ce_file_base + "." + ce_extension;
Entity *contained_entity = LoadEntityFromResourcePath(contained_resource_path, file_type,
false, true, false, escape_contained_filenames, default_seed, status);
false, true, false, escape_contained_filenames, default_seed, calling_interpreter, status);
if(!status.loaded)
{
delete new_entity;
Expand Down
5 changes: 3 additions & 2 deletions src/Amalgam/AssetManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ class AssetManager
//if file_type is not an empty string, it will use the specified file_type instead of the filename's extension
// if persistent is true, then it will keep the resource updated based on any calls to UpdateEntity
//if the resource does not have a metadata file, will use default_random_seed as its seed
Entity *LoadEntityFromResourcePath(std::string &resource_path, std::string &file_type, bool persistent, bool load_contained_entities,
bool escape_filename, bool escape_contained_filenames, std::string default_random_seed, EntityExternalInterface::LoadEntityStatus &status);
Entity *LoadEntityFromResourcePath(std::string &resource_path, std::string &file_type, bool persistent,
bool load_contained_entities, bool escape_filename, bool escape_contained_filenames,
std::string default_random_seed, Interpreter *calling_interpreter, EntityExternalInterface::LoadEntityStatus &status);

//Stores an entity, including contained entites, etc. from the resource path specified
//if file_type is not an empty string, it will use the specified file_type instead of the filename's extension
Expand Down
1 change: 0 additions & 1 deletion src/Amalgam/amlg_code/full_test.amlg
Original file line number Diff line number Diff line change
Expand Up @@ -2902,7 +2902,6 @@
(query_within_generalized_distance 60 (list "x" "y") (list 0.0 0.0) (null) (null) (null) (null) 0.5 1 (null) "random seed 1234" "radius")
)))

;TODO 17631: add unit tests for sparse deviation matrices
(print "--query_nearest_generalized_distance--\n")
(print (contained_entities "TestContainerExec" (list
(query_nearest_generalized_distance 2 (list "x" "y") (list 0.0 0.0) (null) (null) (null) (null) 0.5 1 (null) "random seed 1234" "radius")
Expand Down
21 changes: 10 additions & 11 deletions src/Amalgam/entity/Entity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,20 +422,19 @@ std::pair<bool, bool> Entity::SetValuesAtLabels(EvaluableNodeReference new_label

EvaluableNodeReference Entity::Execute(ExecutionCycleCount max_num_steps, ExecutionCycleCount &num_steps_executed,
size_t max_num_nodes, size_t &num_nodes_allocated,
StringInternPool::StringID label_sid, EvaluableNode *call_stack, bool on_self,
std::vector<EntityWriteListener *> *write_listeners, PrintListener *print_listener,
StringInternPool::StringID label_sid, EvaluableNode *call_stack, bool on_self, Interpreter *calling_interpreter,
std::vector<EntityWriteListener *> *write_listeners, PrintListener *print_listener
#ifdef MULTITHREAD_SUPPORT
Concurrency::ReadLock *locked_memory_modification_lock,
Concurrency::ReadLock *entity_read_lock,
, Concurrency::ReadLock *entity_read_lock
#endif
Interpreter *calling_interpreter)
)
{
if(!on_self && IsLabelPrivate(label_sid))
return EvaluableNodeReference(nullptr, true);

#ifdef MULTITHREAD_SUPPORT
if(locked_memory_modification_lock != nullptr)
locked_memory_modification_lock->unlock();
if(calling_interpreter != nullptr)
calling_interpreter->memoryModificationLock.unlock();
#endif

EvaluableNode *node_to_execute = nullptr;
Expand All @@ -454,8 +453,8 @@ EvaluableNodeReference Entity::Execute(ExecutionCycleCount max_num_steps, Execut
{
#ifdef MULTITHREAD_SUPPORT
//put lock back in place
if(locked_memory_modification_lock != nullptr)
locked_memory_modification_lock->lock();
if(calling_interpreter != nullptr)
calling_interpreter->memoryModificationLock.lock();
#endif
return EvaluableNodeReference::Null();
}
Expand All @@ -476,8 +475,8 @@ EvaluableNodeReference Entity::Execute(ExecutionCycleCount max_num_steps, Execut

#ifdef MULTITHREAD_SUPPORT
//make sure have lock before copy into destination_temp_enm
if(locked_memory_modification_lock != nullptr)
locked_memory_modification_lock->lock();
if(calling_interpreter != nullptr)
calling_interpreter->memoryModificationLock.lock();
#endif

//find difference in entity size
Expand Down
27 changes: 12 additions & 15 deletions src/Amalgam/entity/Entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,36 +187,33 @@ class Entity
// Uses max_num_steps as the maximum number of operations that can be executed by this and any subordinate operations called. If max_num_steps is 0, then it will execute unlimeted steps
// Uses max_num_nodes as the maximum number of nodes that can be allocated in memory by this and any subordinate operations called. If max_num_nodes is 0, then it will allow unlimited allocations
// If on_self is true, then it will be allowed to access private variables
// If locked_memory_modification_lock is specified, then it will unlock it prior to the execution, but lock it again before
// If entity_read_lock is specified, then it will unlock prior to execution after locked_memory_modification_lock is locked
// If entity_read_lock is specified, then it will unlock prior to execution after the interpreter's memoryModificationLock is locked
// potentially writing anything out to destination_temp_enm
EvaluableNodeReference Execute(ExecutionCycleCount max_num_steps, ExecutionCycleCount &num_steps_executed, size_t max_num_nodes, size_t &num_nodes_allocated,
StringInternPool::StringID label_sid, EvaluableNode *call_stack, bool on_self = false,
std::vector<EntityWriteListener *> *write_listeners = nullptr, PrintListener *print_listener = nullptr,
StringInternPool::StringID label_sid, EvaluableNode *call_stack, bool on_self = false, Interpreter *calling_interpreter = nullptr,
std::vector<EntityWriteListener *> *write_listeners = nullptr, PrintListener *print_listener = nullptr
#ifdef MULTITHREAD_SUPPORT
Concurrency::ReadLock *locked_memory_modification_lock = nullptr,
Concurrency::ReadLock *entity_read_lock = nullptr,
, Concurrency::ReadLock *entity_read_lock = nullptr
#endif
Interpreter *calling_interpreter = nullptr);
);

//same as Execute but accepts a string for label name
inline EvaluableNodeReference Execute(ExecutionCycleCount max_num_steps, ExecutionCycleCount &num_steps_executed,
size_t max_num_nodes, size_t &num_nodes_allocated,
std::string &label_name, EvaluableNode *call_stack, bool on_self = false,
std::vector<EntityWriteListener *> *write_listeners = nullptr, PrintListener *print_listener = nullptr,
std::string &label_name, EvaluableNode *call_stack, bool on_self = false, Interpreter *calling_interpreter = nullptr,
std::vector<EntityWriteListener *> *write_listeners = nullptr, PrintListener *print_listener = nullptr
#ifdef MULTITHREAD_SUPPORT
Concurrency::ReadLock *locked_memory_modification_lock = nullptr,
Concurrency::ReadLock *entity_read_lock = nullptr,
, Concurrency::ReadLock *entity_read_lock = nullptr
#endif
Interpreter *calling_interpreter = nullptr)
)
{
StringInternPool::StringID label_sid = string_intern_pool.GetIDFromString(label_name);
return Execute(max_num_steps, num_steps_executed, max_num_nodes, num_nodes_allocated,
label_sid, call_stack, on_self, write_listeners, print_listener,
label_sid, call_stack, on_self, calling_interpreter, write_listeners, print_listener
#ifdef MULTITHREAD_SUPPORT
locked_memory_modification_lock, entity_read_lock,
, entity_read_lock
#endif
calling_interpreter);
);
}

//returns true if the entity or any of its contained entities are currently being executed, either because of multiple threads executing on it
Expand Down
16 changes: 5 additions & 11 deletions src/Amalgam/entity/EntityExternalInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ EntityExternalInterface::LoadEntityStatus EntityExternalInterface::LoadEntity(st
}

std::string file_type = "";
Entity *entity = asset_manager.LoadEntityFromResourcePath(path, file_type, persistent, load_contained_entities, escape_filename, escape_contained_filenames, rand_seed, status);
Entity *entity = asset_manager.LoadEntityFromResourcePath(path, file_type, persistent, load_contained_entities,
escape_filename, escape_contained_filenames, rand_seed, nullptr, status);

if(!status.loaded)
return status;

Expand Down Expand Up @@ -132,11 +134,7 @@ void EntityExternalInterface::ExecuteEntity(std::string &handle, std::string &la
ExecutionCycleCount max_num_steps = 0, num_steps_executed = 0;
size_t max_num_nodes = 0, num_nodes_allocated = 0;
bundle->entity->Execute(max_num_steps, num_steps_executed, max_num_nodes, num_nodes_allocated,
label, nullptr, false, &bundle->writeListeners, bundle->printListener
#ifdef MULTITHREAD_SUPPORT
, nullptr, nullptr
#endif
);
label, nullptr, false, nullptr, &bundle->writeListeners, bundle->printListener);
}

void EntityExternalInterface::DestroyEntity(std::string &handle)
Expand Down Expand Up @@ -614,11 +612,7 @@ std::string EntityExternalInterface::ExecuteEntityJSON(std::string &handle, std:
ExecutionCycleCount max_num_steps = 0, num_steps_executed = 0;
size_t max_num_nodes = 0, num_nodes_allocated = 0;
EvaluableNodeReference returned_value = bundle->entity->Execute(max_num_steps, num_steps_executed, max_num_nodes,
num_nodes_allocated, label, call_stack, false, &bundle->writeListeners, bundle->printListener
#ifdef MULTITHREAD_SUPPORT
, nullptr, nullptr
#endif
);
num_nodes_allocated, label, call_stack, false, nullptr, &bundle->writeListeners, bundle->printListener);

//ConvertArgsToCallStack always adds an outer list that is safe to free
enm.FreeNode(call_stack);
Expand Down
8 changes: 4 additions & 4 deletions src/Amalgam/entity/EntityQueryCaches.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ bool EntityQueryCaches::DoesCachedConditionMatch(EntityQueryCondition *cond, boo
if(qt == ENT_QUERY_NEAREST_GENERALIZED_DISTANCE || qt == ENT_QUERY_WITHIN_GENERALIZED_DISTANCE || qt == ENT_COMPUTE_ENTITY_CONVICTIONS
|| qt == ENT_COMPUTE_ENTITY_GROUP_KL_DIVERGENCE || qt == ENT_COMPUTE_ENTITY_DISTANCE_CONTRIBUTIONS || qt == ENT_COMPUTE_ENTITY_KL_DIVERGENCES)
{
//TODO 4948: sbfds does not fully support p0 acceleration; it requires templating and calling logs of differences, then performing an inverse transform at the end
//accelerating a p of 0 with the current caches would be a large effort, as everything would have to be
// transformed via logarithms and then pValue = 1 applied
//however, because other transforms, like surprisal_to_prob already transform the data to log space,
//accelerating this edge case does not seem worthwhile
if(cond->distEvaluator.pValue == 0)
return false;

return true;
}

return true;
Expand All @@ -47,7 +48,6 @@ static bool CanUseQueryCaches(std::vector<EntityQueryCondition> &conditions)
return true;
}


#if defined(MULTITHREAD_SUPPORT) || defined(MULTITHREAD_INTERFACE)
void EntityQueryCaches::EnsureLabelsAreCached(EntityQueryCondition *cond, Concurrency::ReadLock &lock)
#else
Expand Down
67 changes: 25 additions & 42 deletions src/Amalgam/evaluablenode/EvaluableNodeManagement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,40 +192,38 @@ void EvaluableNodeManager::CollectGarbage()
//keep trying to acquire write lock to see if this thread wins the race to collect garbage
Concurrency::WriteLock write_lock(memoryModificationMutex, std::defer_lock);

while(!write_lock.try_lock())
//wait for either the lock or no longer need garbage collecting
while(!write_lock.try_lock() && RecommendGarbageCollection())
{ }

//if owns lock, double-check still needs collection,
// and not that another thread collected it since acquiring the lock
if(write_lock.owns_lock())
{
if(!RecommendGarbageCollection())
if(RecommendGarbageCollection())
{
if(memory_modification_lock != nullptr)
memory_modification_lock->lock();
#endif
size_t cur_first_unused_node_index = firstUnusedNodeIndex;
//clear firstUnusedNodeIndex to signal to other threads that they won't need to do garbage collection
firstUnusedNodeIndex = 0;

if(PerformanceProfiler::IsProfilingEnabled())
PerformanceProfiler::EndOperation(GetNumberOfUsedNodes());
//if any group of nodes on the top are ready to be cleaned up cheaply, do so first
while(cur_first_unused_node_index > 0 && nodes[cur_first_unused_node_index - 1] != nullptr
&& nodes[cur_first_unused_node_index - 1]->GetType() == ENT_DEALLOCATED)
cur_first_unused_node_index--;

return;
}
}

//double-check still needs collection, and not that another thread collected it
if(!RecommendGarbageCollection())
{
write_lock.unlock();
if(memory_modification_lock != nullptr)
memory_modification_lock->lock();
//set to contain everything that is referenced
MarkAllReferencedNodesInUse(cur_first_unused_node_index);

if(PerformanceProfiler::IsProfilingEnabled())
PerformanceProfiler::EndOperation(GetNumberOfUsedNodes());
FreeAllNodesExceptReferencedNodes(cur_first_unused_node_index);

return;
}
#endif
#ifdef MULTITHREAD_SUPPORT
}

//perform garbage collection
FreeAllNodesExceptReferencedNodes();
//free the unique lock and reacquire the shared lock
write_lock.unlock();
}

#ifdef MULTITHREAD_SUPPORT
//free the unique lock and reacquire the shared lock
write_lock.unlock();
if(memory_modification_lock != nullptr)
memory_modification_lock->lock();
#endif
Expand Down Expand Up @@ -323,23 +321,8 @@ EvaluableNode *EvaluableNodeManager::AllocUninitializedNode()
return nodes[firstUnusedNodeIndex++];
}

void EvaluableNodeManager::FreeAllNodesExceptReferencedNodes()
void EvaluableNodeManager::FreeAllNodesExceptReferencedNodes(size_t cur_first_unused_node_index)
{
if(nodes.size() == 0)
return;

size_t cur_first_unused_node_index = firstUnusedNodeIndex;
//clear firstUnusedNodeIndex to signal to other threads that they won't need to do garbage collection
firstUnusedNodeIndex = 0;

//if any group of nodes on the top are ready to be cleaned up cheaply, do so first
while(cur_first_unused_node_index > 0 && nodes[cur_first_unused_node_index - 1] != nullptr
&& nodes[cur_first_unused_node_index - 1]->GetType() == ENT_DEALLOCATED)
cur_first_unused_node_index--;

//set to contain everything that is referenced
MarkAllReferencedNodesInUse(cur_first_unused_node_index);

//create a temporary variable for multithreading as to not use the atomic variable to slow things down
size_t first_unused_node_index_temp = 0;

Expand Down
6 changes: 5 additions & 1 deletion src/Amalgam/evaluablenode/EvaluableNodeManagement.h
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,11 @@ class EvaluableNodeManager
EvaluableNode *AllocUninitializedNode();

//frees everything execpt those nodes referenced by nodesCurrentlyReferenced
void FreeAllNodesExceptReferencedNodes();
//cur_first_unused_node_index represents the first unused index and will set firstUnusedNodeIndex
//to the reduced value
//note that this method does not read from firstUnusedNodeIndex, as it may be cleared to indicate threads
//to stop spinlocks
void FreeAllNodesExceptReferencedNodes(size_t cur_first_unused_node_index);

//support for FreeNodeTree, but requires that tree not be nullptr
void FreeNodeTreeRecurse(EvaluableNode *tree);
Expand Down
8 changes: 5 additions & 3 deletions src/Amalgam/interpreter/Interpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ class Interpreter
__forceinline void CollectGarbage()
{
if(evaluableNodeManager->RecommendGarbageCollection())
evaluableNodeManager->CollectGarbage(
{
#ifdef MULTITHREAD_SUPPORT
&memoryModificationLock
evaluableNodeManager->CollectGarbage(&memoryModificationLock);
#else
evaluableNodeManager->CollectGarbage();
#endif
);
}
}

//pushes new_context on the stack; new_context should be a unique associative array,
Expand Down
12 changes: 6 additions & 6 deletions src/Amalgam/interpreter/InterpreterOpcodesEntityAccess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,11 @@ EvaluableNodeReference Interpreter::InterpretNode_ENT_CALL_ENTITY_and_CALL_ENTIT
size_t num_nodes_allocated = 0;
EvaluableNodeReference retval = called_entity->Execute(num_steps_allowed, num_steps_executed,
num_nodes_allowed, num_nodes_allocated,
entity_label_sid, call_stack, called_entity == curEntity, cur_write_listeners, printListener,
entity_label_sid, call_stack, called_entity == curEntity, this, cur_write_listeners, printListener
#ifdef MULTITHREAD_SUPPORT
&memoryModificationLock, &called_entity.lock,
, &called_entity.lock
#endif
this);
);

//accumulate costs of execution
curExecutionStep += num_steps_executed;
Expand Down Expand Up @@ -632,11 +632,11 @@ EvaluableNodeReference Interpreter::InterpretNode_ENT_CALL_CONTAINER(EvaluableNo
ExecutionCycleCount num_steps_executed = 0;
size_t num_nodes_allocated = 0;
EvaluableNodeReference retval = container->Execute(num_steps_allowed, num_steps_executed, num_nodes_allowed, num_nodes_allocated,
container_label_sid, call_stack, false, writeListeners, printListener,
container_label_sid, call_stack, false, this, writeListeners, printListener
#ifdef MULTITHREAD_SUPPORT
&memoryModificationLock, &container.lock,
, &container.lock
#endif
this);
);

//accumulate costs of execution
curExecutionStep += num_steps_executed;
Expand Down
3 changes: 2 additions & 1 deletion src/Amalgam/interpreter/InterpreterOpcodesEntityControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,9 @@ EvaluableNodeReference Interpreter::InterpretNode_ENT_LOAD_ENTITY_and_LOAD_PERSI

EntityExternalInterface::LoadEntityStatus status;
std::string random_seed = destination_entity_parent->CreateRandomStreamFromStringAndRand(resource_name);

Entity *loaded_entity = asset_manager.LoadEntityFromResourcePath(resource_name, file_type,
persistent, true, escape_filename, escape_contained_filenames, random_seed, status);
persistent, true, escape_filename, escape_contained_filenames, random_seed, this, status);

//handle errors
if(!status.loaded)
Expand Down

0 comments on commit 51ac4e9

Please sign in to comment.