Skip to content
This repository has been archived by the owner on Jun 25, 2022. It is now read-only.

Commit

Permalink
Merge branch 'models' of github.com:HPDCS/ROOT-Sim into models
Browse files Browse the repository at this point in the history
  • Loading branch information
alessandropellegrini committed Jan 29, 2018
2 parents d83686b + 8d7d6ec commit 0b3dcfd
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 139 deletions.
2 changes: 1 addition & 1 deletion src/communication/communication.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void communication_fini(void) {
rsfree(slab_lock);
}

static msg_hdr_t *get_msg_hdr_from_slab(void) {
msg_hdr_t *get_msg_hdr_from_slab(void) {
spin_lock(&slab_lock[local_tid]);
msg_hdr_t *msg = (msg_hdr_t *)slab_alloc(&msg_slab[local_tid]);
spin_unlock(&slab_lock[local_tid]);
Expand Down
3 changes: 2 additions & 1 deletion src/communication/communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ enum _control_msgs {
MAX_VALUE_CONTROL
};

/*
 * Message-type classification helpers.
 *
 * count_as_white(type): true when `type` is below MIN_VALUE_CONTROL or is
 *                       exactly RENDEZVOUS_START.
 * is_control_msg(type): the logical negation of the above, i.e. true when
 *                       `type` is at least MIN_VALUE_CONTROL and is not
 *                       RENDEZVOUS_START.
 *
 * The argument is parenthesized so that expressions containing operators of
 * lower precedence than the comparisons (e.g. `a | b`, `c ? x : y`) are
 * classified correctly. Note that `type` is evaluated twice: do not pass an
 * expression with side effects.
 */
#define count_as_white(type) ((type) < MIN_VALUE_CONTROL || (type) == RENDEZVOUS_START)
#define is_control_msg(type) ((type) >= MIN_VALUE_CONTROL && (type) != RENDEZVOUS_START)

// Message Codes for PVM
#define MSG_INIT_MPI 200
Expand Down Expand Up @@ -120,6 +120,7 @@ extern void start_ack_timer(void);

extern void msg_hdr_release(msg_hdr_t *msg);
extern msg_t *get_msg_from_slab(void);
extern msg_hdr_t *get_msg_hdr_from_slab(void);
extern void pack_msg(msg_t **msg, GID_t sender, GID_t receiver, int type, simtime_t timestamp, simtime_t send_time, size_t size, void *payload);
extern void msg_to_hdr(msg_hdr_t *hdr, msg_t *msg);
extern void hdr_to_msg(msg_hdr_t *hdr, msg_t *msg);
Expand Down
12 changes: 11 additions & 1 deletion src/communication/gvt.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,13 @@ simtime_t last_reduced_gvt(void){


void register_outgoing_msg(const msg_t* msg) {
unsigned int dst_kid = GidToKernel(msg->receiver);
#ifdef ECS
if(is_control_msg(msg->type))
return;
#endif

unsigned int dst_kid = GidToKernel(msg->receiver);

if(dst_kid == kid) return;

if(is_red_colour(msg->colour)) {
Expand All @@ -330,6 +335,11 @@ void register_outgoing_msg(const msg_t* msg) {


void register_incoming_msg(const msg_t* msg){
#ifdef ECS
if(is_control_msg(msg->type))
return;
#endif

unsigned int src_kid = GidToKernel(msg->sender);

if(src_kid == kid) return;
Expand Down
155 changes: 27 additions & 128 deletions src/communication/mpi.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ spinlock_t mpi_lock;
// control access to the message receiving routine
static spinlock_t msgs_lock;

MPI_Datatype msg_mpi_t;

// counter of the kernels that have already reached the
// termination condition. Must be updated through the collect_termination() function.
static unsigned int terminated = 0;
Expand Down Expand Up @@ -96,27 +94,11 @@ void send_remote_msg(msg_t *msg){
out_msg->msg->colour = threads_phase_colour[local_tid];
unsigned int dest = GidToKernel(msg->receiver);

if(count_as_white(msg->type))
register_outgoing_msg(out_msg->msg);

// Check if the message buffer is from the slab. In this case
// we can send it using the msg_mpi_t. On the other hand, we need
// to send a header telling the size of the message to be received
// at the other endpoint.
if(sizeof(msg_t) + msg->size <= SLAB_MSG_SIZE) {
lock_mpi();
MPI_Isend(out_msg->msg, 1, msg_mpi_t, dest, MSG_EVENT, MPI_COMM_WORLD, &(out_msg->req));
unlock_mpi();
} else {
unsigned int size = sizeof(msg_t) + msg->size;
lock_mpi();
// We don't keep the header in the list, as the following two messages can be considered
// as just one.
MPI_Send(&size, 1, MPI_UNSIGNED, dest, MSG_EVENT_LARGER, MPI_COMM_WORLD);
MPI_Isend(out_msg->msg, size, MPI_UNSIGNED_CHAR, dest, MSG_EVENT_LARGER, MPI_COMM_WORLD, &(out_msg->req));
unlock_mpi();
}
register_outgoing_msg(out_msg->msg);

lock_mpi();
MPI_Isend(((char*)out_msg->msg) + MSG_PADDING, MSG_META_SIZE + msg->size, MPI_BYTE, dest, MSG_EVENT, MPI_COMM_WORLD, &out_msg->req);
unlock_mpi();
// Keep the message in the outgoing queue until it will be delivered
store_outgoing_msg(out_msg, dest);
}
Expand All @@ -132,73 +114,43 @@ void send_remote_msg(msg_t *msg){
* This function is thread-safe.
*/
void receive_remote_msgs(void){
int res = 0;
int size;
msg_t *msg;
MPI_Status status;
bool found_msg = true;
int pending;

/* - `pending_msgs` and `MPI_Recv` need to be in the same critical section.
* I could start an MPI_Recv with an empty incoming queue.
* - `MPI_Recv` and `insert_bottom_half` need to be in the same critical section.
* messages need to be inserted in arrival order into the BH
*/
if(!spin_trylock(&msgs_lock))
return;

while(found_msg) {
found_msg = false;
lock_mpi();
MPI_Iprobe(MPI_ANY_SOURCE, MSG_EVENT, MPI_COMM_WORLD, &pending, &status);
unlock_mpi();

// Try to receive a fixed-size message
if(pending_msgs(MSG_EVENT)) {
found_msg = true;
if(!pending)
goto out;

// We get the buffer from the slab allocator
msg = get_msg_from_slab();
MPI_Get_count(&status, MPI_BYTE, &size);

// Receive the message
lock_mpi();
res = MPI_Recv(msg, 1, msg_mpi_t, MPI_ANY_SOURCE, MSG_EVENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
unlock_mpi();
if(res != 0)
goto out;
if(MSG_PADDING + size <= SLAB_MSG_SIZE)
msg = get_msg_from_slab();
else
msg = rsalloc(MSG_PADDING + size);

validate_msg(msg);
}
/* - `pending_msgs` and `MPI_Recv` need to be in the same critical section.
* I could start an MPI_Recv with an empty incoming queue.
* - `MPI_Recv` and `insert_bottom_half` need to be in the same critical section.
* messages need to be inserted in arrival order into the BH
*/

// Try to receive a variable-size message
if(pending_msgs(MSG_EVENT_LARGER)) {
found_msg = true;

// First, get the size and identify the source
lock_mpi();
res = MPI_Recv(&size, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MSG_EVENT_LARGER, MPI_COMM_WORLD, &status);
unlock_mpi();
if(res != 0)
goto out;

// Now get the actual message, matching the source of the header
if(size + sizeof(msg_t) <= SLAB_MSG_SIZE)
msg = get_msg_from_slab();
else
msg = rsalloc(size);
lock_mpi();
res = MPI_Recv(msg, size, MPI_UNSIGNED_CHAR, status.MPI_SOURCE, MSG_EVENT_LARGER, MPI_COMM_WORLD, &status);
unlock_mpi();
if(res != 0)
goto out;

validate_msg(msg);
}

if(found_msg)
insert_bottom_half(msg);
}
// Receive the message
lock_mpi();
MPI_Recv(((char*)msg) + MSG_PADDING, size, MPI_BYTE, MPI_ANY_SOURCE, MSG_EVENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
unlock_mpi();

out:
if(res != 0)
rootsim_error(true, "MPI_Recv did not complete correctly");
validate_msg(msg);
insert_bottom_half(msg);

out:
spin_unlock(&msgs_lock);
}

Expand Down Expand Up @@ -288,57 +240,6 @@ void dist_termination_finalize(void){
}


/**
 * Build and commit the MPI struct datatype `msg_mpi_t` describing a msg_t.
 *
 * The datatype is made of NUM_MEMBERS blocks. The first six describe header
 * fields, starting at `sender`, `colour`, `type`, `timestamp`, `mark` and
 * `size`; the last one is a run of MPI_CHAR starting at `event_content`.
 * The length of that last block is whatever remains of SLAB_MSG_SIZE after
 * the summed sizes of the six header blocks.
 *
 * All displacements are expressed relative to the `sender` field.
 *
 * NOTE(review): header_size adds up the raw MPI type sizes only, so any
 * compiler padding between the msg_t fields is not accounted for when sizing
 * the payload block -- confirm against the msg_t layout.
 */
void mpi_datatype_init(void) {
	#define NUM_MEMBERS 7

	// Never dereferenced: only used to take the addresses of the fields
	msg_t* msg = NULL;
	// Must be an MPI_Aint: an int could truncate the address returned by MPI_Get_address
	MPI_Aint base;
	unsigned int i;
	int type_size;
	int header_size = 0;

	MPI_Datatype type[NUM_MEMBERS] = {MPI_UNSIGNED,
					  MPI_UNSIGNED_CHAR,
					  MPI_INT,
					  MPI_DOUBLE,
					  MPI_UNSIGNED_LONG_LONG,
					  MPI_INT,
					  MPI_CHAR};

	// The last entry of the array is dynamically populated below
	int blocklen[NUM_MEMBERS] = {2, 1, 2, 2, 2, 1, 0};

	// Sum the sizes of every header block (all the entries but the last one)
	for(i = 0; i < (NUM_MEMBERS - 1); i++) {
		MPI_Type_size(type[i], &type_size);
		type_size *= blocklen[i];
		header_size += type_size;
	}
	// The payload block takes what is left of a slab-sized message
	blocklen[NUM_MEMBERS - 1] = SLAB_MSG_SIZE - header_size;

	MPI_Aint disp[NUM_MEMBERS];
	MPI_Get_address(&msg->sender, disp);
	MPI_Get_address(&msg->colour, disp + 1);
	MPI_Get_address(&msg->type, disp + 2);
	MPI_Get_address(&msg->timestamp, disp + 3);
	MPI_Get_address(&msg->mark, disp + 4);
	MPI_Get_address(&msg->size, disp + 5);
	MPI_Get_address(&msg->event_content, disp + 6);

	// Rebase every displacement on the first described field
	base = disp[0];
	for(i = 0; i < NUM_MEMBERS; i++) {
		disp[i] = MPI_Aint_diff(disp[i], base);
	}

	MPI_Type_create_struct(NUM_MEMBERS, blocklen, disp, type, &msg_mpi_t);
	MPI_Type_commit(&msg_mpi_t);

	#undef NUM_MEMBERS
}


/**
 * Release the `msg_mpi_t` MPI datatype handle by handing it to
 * MPI_Type_free().
 */
void mpi_datatype_finalize(void)
{
	MPI_Type_free(&msg_mpi_t);
}


/*
* Synchronize all the kernels:
*
Expand Down Expand Up @@ -388,15 +289,13 @@ void inter_kernel_comm_init(void){
spinlock_init(&msgs_lock);

outgoing_window_init();
mpi_datatype_init();
gvt_comm_init();
dist_termination_init();
}


/*
 * Tear down the inter-kernel communication layer.
 *
 * Calls, in this order, the finalizers for distributed termination
 * detection, the MPI message datatype and the GVT communication subsystem.
 */
void inter_kernel_comm_finalize(void){
dist_termination_finalize();
mpi_datatype_finalize();
// NOTE(review): the outgoing window finalizer is disabled; the reason is not visible here -- confirm
//outgoing_window_finalize();
gvt_comm_finalize();
}
Expand Down
3 changes: 3 additions & 0 deletions src/core/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ typedef enum {positive, negative, control} message_kind_t;
typedef unsigned char phase_colour;
#endif

/// Byte offset of the `sender` field within msg_t, i.e. the number of leading bytes of a msg_t that precede it
#define MSG_PADDING offsetof(msg_t, sender)
/// Number of bytes of msg_t from the `sender` field up to the end of the struct
#define MSG_META_SIZE (sizeof(msg_t) - MSG_PADDING)

/** The MPI datatype msg_mpi_t depends on the order of this struct.
See src/communication/mpi.c for the implementation of the datatype */
/// Message Type definition
Expand Down
14 changes: 8 additions & 6 deletions src/mm/ecs.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <mm/mm.h>
#include <scheduler/scheduler.h>
#include <scheduler/process.h>
#include <communication/communication.h>
#include <arch/ult.h>
#include <arch/x86.h>

Expand All @@ -69,10 +70,10 @@ static __thread fault_info_t fault_info;
// Declared in ecsstub.S
extern void rootsim_cross_state_dependency_handler(void);

GID_t target_gid;
// This handler is only called in case of a remote ECS
void ecs_secondary(void) {

GID_t target_gid;
// target_address is filled by the ROOT-Sim fault handler at kernel level before triggering the signal
long long target_address = fault_info.target_address;
unsigned char *faulting_insn = (unsigned char *)fault_info.rip;
Expand Down Expand Up @@ -114,8 +115,9 @@ void ecs_secondary(void) {

void ecs_initiate(void) {
msg_t *control_msg;
msg_hdr_t msg_hdr;
msg_hdr_t *msg_hdr;

GID_t target_gid;
// Generate a unique mark for this ECS
current_evt->rendezvous_mark = generate_mark(current_lp);
LPS(current_lp)->wait_on_rendezvous = current_evt->rendezvous_mark;
Expand All @@ -127,8 +129,9 @@ void ecs_initiate(void) {
control_msg->mark = generate_mark(current_lp);

// This message must be stored in the output queue as well, in case this LP rolls back
msg_to_hdr(&msg_hdr, control_msg);
list_insert(LPS(current_lp)->queue_out, send_time, &msg_hdr);
msg_hdr = get_msg_hdr_from_slab();
msg_to_hdr(msg_hdr, control_msg);
list_insert(LPS(current_lp)->queue_out, send_time, msg_hdr);

// Block the execution of this LP
LPS(current_lp)->state = LP_STATE_WAIT_FOR_SYNCH;
Expand Down Expand Up @@ -225,7 +228,7 @@ void lp_alloc_schedule(void) {

sched_info.ds = pgd_ds;
sched_info.count = LPS(current_lp)->ECS_index + 1; // it's a counter
sched_info.objects = LPS(current_lp)->ECS_synch_table; // pgd descriptor range from 0 to number threads - a subset of object ids
sched_info.objects = (unsigned int*) LPS(current_lp)->ECS_synch_table; // pgd descriptor range from 0 to number threads - a subset of object ids

/* passing into LP mode - here for the pgd_ds-th LP */
ioctl(ioctl_fd,IOCTL_SCHEDULE_ON_PGD, &sched_info);
Expand Down Expand Up @@ -311,7 +314,6 @@ void unblock_synchronized_objects(LID_t localID) {
msg_t *control_msg;

for(i = 1; i <= LPS(localID)->ECS_index; i++) {
LPS(localID)->ECS_synch_table[i];
pack_msg(&control_msg, LidToGid(localID), LPS(localID)->ECS_synch_table[i], RENDEZVOUS_UNBLOCK, lvt(localID), lvt(localID), 0, NULL);
control_msg->rendezvous_mark = LPS(localID)->wait_on_rendezvous;
Send(control_msg);
Expand Down
6 changes: 4 additions & 2 deletions src/scheduler/control.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,13 @@ bool process_control_msg(msg_t *msg) {

#ifdef HAVE_CROSS_STATE
LID_t lid_receiver = GidToLid(msg->receiver);

msg_t *copy;
switch(msg->type) {

case RENDEZVOUS_START:
list_insert(LPS(lid_receiver)->rendezvous_queue, timestamp, msg);
copy = rsalloc(sizeof(msg_t));
*copy = *msg;
list_insert(LPS(lid_receiver)->rendezvous_queue, timestamp, copy);
// Place this into input queue
LPS(lid_receiver)->wait_on_rendezvous = msg->rendezvous_mark;

Expand Down

0 comments on commit 0b3dcfd

Please sign in to comment.