diff --git a/src/communication/communication.c b/src/communication/communication.c index 9d2103a9e..639337c0b 100644 --- a/src/communication/communication.c +++ b/src/communication/communication.c @@ -97,7 +97,7 @@ void communication_fini(void) { rsfree(slab_lock); } -static msg_hdr_t *get_msg_hdr_from_slab(void) { +msg_hdr_t *get_msg_hdr_from_slab(void) { spin_lock(&slab_lock[local_tid]); msg_hdr_t *msg = (msg_hdr_t *)slab_alloc(&msg_slab[local_tid]); spin_unlock(&slab_lock[local_tid]); diff --git a/src/communication/communication.h b/src/communication/communication.h index 7116ec299..b15ec8e68 100644 --- a/src/communication/communication.h +++ b/src/communication/communication.h @@ -60,7 +60,7 @@ enum _control_msgs { MAX_VALUE_CONTROL }; -#define count_as_white(type) (type < MIN_VALUE_CONTROL || type == RENDEZVOUS_START) +#define is_control_msg(type) ((type) >= MIN_VALUE_CONTROL && (type) != RENDEZVOUS_START) /* argument parenthesized so expression arguments expand safely */ // Message Codes for PVM #define MSG_INIT_MPI 200 @@ -120,6 +120,7 @@ extern void start_ack_timer(void); extern void msg_hdr_release(msg_hdr_t *msg); extern msg_t *get_msg_from_slab(void); +extern msg_hdr_t *get_msg_hdr_from_slab(void); extern void pack_msg(msg_t **msg, GID_t sender, GID_t receiver, int type, simtime_t timestamp, simtime_t send_time, size_t size, void *payload); extern void msg_to_hdr(msg_hdr_t *hdr, msg_t *msg); extern void hdr_to_msg(msg_hdr_t *hdr, msg_t *msg); diff --git a/src/communication/gvt.c b/src/communication/gvt.c index 10ccb5f9a..0b2b97657 100644 --- a/src/communication/gvt.c +++ b/src/communication/gvt.c @@ -317,8 +317,13 @@ simtime_t last_reduced_gvt(void){ void register_outgoing_msg(const msg_t* msg) { - unsigned int dst_kid = GidToKernel(msg->receiver); +#ifdef ECS + if(is_control_msg(msg->type)) + return; +#endif + unsigned int dst_kid = GidToKernel(msg->receiver); + if(dst_kid == kid) return; if(is_red_colour(msg->colour)) { @@ -330,6 +335,11 @@ void register_outgoing_msg(const msg_t* msg) { void register_incoming_msg(const 
msg_t* msg){ +#ifdef ECS + if(is_control_msg(msg->type)) + return; +#endif + unsigned int src_kid = GidToKernel(msg->sender); if(src_kid == kid) return; diff --git a/src/communication/mpi.c b/src/communication/mpi.c index 04ee2b0e4..84d2068ea 100644 --- a/src/communication/mpi.c +++ b/src/communication/mpi.c @@ -44,8 +44,6 @@ spinlock_t mpi_lock; // control access to the message receiving routine static spinlock_t msgs_lock; -MPI_Datatype msg_mpi_t; - // counter of the kernels that have already reached the // termination condition. Must be updated through the collect_termination() function. static unsigned int terminated = 0; @@ -96,27 +94,11 @@ void send_remote_msg(msg_t *msg){ out_msg->msg->colour = threads_phase_colour[local_tid]; unsigned int dest = GidToKernel(msg->receiver); - if(count_as_white(msg->type)) - register_outgoing_msg(out_msg->msg); - - // Check if the message buffer is from the slab. In this case - // we can send it using the msg_mpi_t. On the other hand, we need - // to send a header telling the size of the message to be received - // at the other endpoint. - if(sizeof(msg_t) + msg->size <= SLAB_MSG_SIZE) { - lock_mpi(); - MPI_Isend(out_msg->msg, 1, msg_mpi_t, dest, MSG_EVENT, MPI_COMM_WORLD, &(out_msg->req)); - unlock_mpi(); - } else { - unsigned int size = sizeof(msg_t) + msg->size; - lock_mpi(); - // We don't keep the header in the list, as the following two messages can be considered - // as just one. 
- MPI_Send(&size, 1, MPI_UNSIGNED, dest, MSG_EVENT_LARGER, MPI_COMM_WORLD); - MPI_Isend(out_msg->msg, size, MPI_UNSIGNED_CHAR, dest, MSG_EVENT_LARGER, MPI_COMM_WORLD, &(out_msg->req)); - unlock_mpi(); - } + register_outgoing_msg(out_msg->msg); + lock_mpi(); + MPI_Isend(((char*)out_msg->msg) + MSG_PADDING, MSG_META_SIZE + msg->size, MPI_BYTE, dest, MSG_EVENT, MPI_COMM_WORLD, &out_msg->req); + unlock_mpi(); // Keep the message in the outgoing queue until it will be delivered store_outgoing_msg(out_msg, dest); } @@ -132,73 +114,43 @@ void send_remote_msg(msg_t *msg){ * This function is thread-safe. */ void receive_remote_msgs(void){ - int res = 0; int size; msg_t *msg; MPI_Status status; - bool found_msg = true; + int pending; - /* - `pending_msgs` and `MPI_Recv` need to be in the same critical section. - * I could start an MPI_Recv with an empty incoming queue. - * - `MPI_Recv` and `insert_bottom_half` need to be in the same critical section. - * messages need to be inserted in arrival order into the BH - */ if(!spin_trylock(&msgs_lock)) return; - while(found_msg) { - found_msg = false; + lock_mpi(); + MPI_Iprobe(MPI_ANY_SOURCE, MSG_EVENT, MPI_COMM_WORLD, &pending, &status); + unlock_mpi(); - // Try to receive a fixed-size message - if(pending_msgs(MSG_EVENT)) { - found_msg = true; + if(!pending) + goto out; - // We get the buffer from the slab allocator - msg = get_msg_from_slab(); + MPI_Get_count(&status, MPI_BYTE, &size); - // Receive the message - lock_mpi(); - res = MPI_Recv(msg, 1, msg_mpi_t, MPI_ANY_SOURCE, MSG_EVENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - unlock_mpi(); - if(res != 0) - goto out; + if(MSG_PADDING + size <= SLAB_MSG_SIZE) + msg = get_msg_from_slab(); + else + msg = rsalloc(MSG_PADDING + size); - validate_msg(msg); - } + /* - `MPI_Iprobe` and `MPI_Recv` need to be in the same critical section. + * Otherwise I could start an MPI_Recv with an empty incoming queue. + * - `MPI_Recv` and `insert_bottom_half` need to be in the same critical section. 
+ * messages need to be inserted in arrival order into the BH + */ - // Try to receive a variable-size message - if(pending_msgs(MSG_EVENT_LARGER)) { - found_msg = true; - - // First, get the size and identify the source - lock_mpi(); - res = MPI_Recv(&size, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MSG_EVENT_LARGER, MPI_COMM_WORLD, &status); - unlock_mpi(); - if(res != 0) - goto out; - - // Now get the actual message, matching the source of the header - if(size + sizeof(msg_t) <= SLAB_MSG_SIZE) - msg = get_msg_from_slab(); - else - msg = rsalloc(size); - lock_mpi(); - res = MPI_Recv(msg, size, MPI_UNSIGNED_CHAR, status.MPI_SOURCE, MSG_EVENT_LARGER, MPI_COMM_WORLD, &status); - unlock_mpi(); - if(res != 0) - goto out; - - validate_msg(msg); - } - - if(found_msg) - insert_bottom_half(msg); - } + // Receive the probed message: match its source, since size was computed for it + lock_mpi(); + MPI_Recv(((char*)msg) + MSG_PADDING, size, MPI_BYTE, status.MPI_SOURCE, MSG_EVENT, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + unlock_mpi(); - out: - if(res != 0) - rootsim_error(true, "MPI_Recv did not complete correctly"); + validate_msg(msg); + insert_bottom_half(msg); +out: spin_unlock(&msgs_lock); } @@ -288,57 +240,6 @@ void dist_termination_finalize(void){ } -void mpi_datatype_init(void) { - #define NUM_MEMBERS 7 - - msg_t* msg = NULL; - int base; - unsigned int i; - int type_size; - int header_size = 0; - - MPI_Datatype type[NUM_MEMBERS] = {MPI_UNSIGNED, - MPI_UNSIGNED_CHAR, - MPI_INT, - MPI_DOUBLE, - MPI_UNSIGNED_LONG_LONG, - MPI_INT, - MPI_CHAR}; - - - // The last entry of the array is dynamically populated below - int blocklen[NUM_MEMBERS] = {2, 1, 2, 2, 2, 1, 0}; - for(i = 0; i < (NUM_MEMBERS - 1); i++) { - MPI_Type_size(type[i], &type_size); - type_size *= blocklen[i]; - header_size += type_size; - } - blocklen[NUM_MEMBERS - 1] = SLAB_MSG_SIZE - header_size; - - MPI_Aint disp[NUM_MEMBERS]; - MPI_Get_address(&msg->sender, disp); - MPI_Get_address(&msg->colour, disp + 1); - MPI_Get_address(&msg->type, disp + 2); - 
MPI_Get_address(&msg->timestamp, disp + 3); - MPI_Get_address(&msg->mark, disp + 4); - MPI_Get_address(&msg->size, disp + 5); - MPI_Get_address(&msg->event_content, disp + 6); - base = disp[0]; - for (i = 0; i < sizeof(disp) / sizeof(disp[0]); i++) { - disp[i] = MPI_Aint_diff(disp[i], base); - } - MPI_Type_create_struct(7, blocklen, disp, type, &msg_mpi_t); - MPI_Type_commit(&msg_mpi_t); - - #undef NUM_MEMBERS -} - - -void mpi_datatype_finalize(void){ - MPI_Type_free(&msg_mpi_t); -} - - /* * Syncronize all the kernels: * @@ -388,7 +289,6 @@ void inter_kernel_comm_init(void){ spinlock_init(&msgs_lock); outgoing_window_init(); - mpi_datatype_init(); gvt_comm_init(); dist_termination_init(); } @@ -396,7 +296,6 @@ void inter_kernel_comm_init(void){ void inter_kernel_comm_finalize(void){ dist_termination_finalize(); - mpi_datatype_finalize(); //outgoing_window_finalize(); gvt_comm_finalize(); } diff --git a/src/core/core.h b/src/core/core.h index d71c5dc7f..d52e2aca4 100644 --- a/src/core/core.h +++ b/src/core/core.h @@ -143,6 +143,9 @@ typedef enum {positive, negative, control} message_kind_t; typedef unsigned char phase_colour; #endif +#define MSG_PADDING offsetof(msg_t, sender) +#define MSG_META_SIZE (sizeof(msg_t) - MSG_PADDING) + /** The MPI datatype msg_mpi_t depends on the order of this struct. 
See src/communication/mpi.c for the implementation of the datatype */ /// Message Type definition diff --git a/src/mm/ecs.c b/src/mm/ecs.c index 03186a30a..07b76a520 100644 --- a/src/mm/ecs.c +++ b/src/mm/ecs.c @@ -46,6 +46,7 @@ #include #include #include +#include #include #include @@ -69,10 +70,10 @@ static __thread fault_info_t fault_info; // Declared in ecsstub.S extern void rootsim_cross_state_dependency_handler(void); -GID_t target_gid; // This handler is only called in case of a remote ECS void ecs_secondary(void) { + GID_t target_gid; // target_address is filled by the ROOT-Sim fault handler at kernel level before triggering the signal long long target_address = fault_info.target_address; unsigned char *faulting_insn = (unsigned char *)fault_info.rip; @@ -114,8 +115,9 @@ void ecs_secondary(void) { void ecs_initiate(void) { msg_t *control_msg; - msg_hdr_t msg_hdr; + msg_hdr_t *msg_hdr; + GID_t target_gid; // Generate a unique mark for this ECS current_evt->rendezvous_mark = generate_mark(current_lp); LPS(current_lp)->wait_on_rendezvous = current_evt->rendezvous_mark; @@ -127,8 +129,9 @@ void ecs_initiate(void) { control_msg->mark = generate_mark(current_lp); // This message must be stored in the output queue as well, in case this LP rollbacks - msg_to_hdr(&msg_hdr, control_msg); - list_insert(LPS(current_lp)->queue_out, send_time, &msg_hdr); + msg_hdr = get_msg_hdr_from_slab(); + msg_to_hdr(msg_hdr, control_msg); + list_insert(LPS(current_lp)->queue_out, send_time, msg_hdr); // Block the execution of this LP LPS(current_lp)->state = LP_STATE_WAIT_FOR_SYNCH; @@ -225,7 +228,7 @@ void lp_alloc_schedule(void) { sched_info.ds = pgd_ds; sched_info.count = LPS(current_lp)->ECS_index + 1; // it's a counter - sched_info.objects = LPS(current_lp)->ECS_synch_table; // pgd descriptor range from 0 to number threads - a subset of object ids + sched_info.objects = (unsigned int*) LPS(current_lp)->ECS_synch_table; // pgd descriptor range from 0 to number threads - a subset 
of object ids /* passing into LP mode - here for the pgd_ds-th LP */ ioctl(ioctl_fd,IOCTL_SCHEDULE_ON_PGD, &sched_info); @@ -311,7 +314,6 @@ void unblock_synchronized_objects(LID_t localID) { msg_t *control_msg; for(i = 1; i <= LPS(localID)->ECS_index; i++) { - LPS(localID)->ECS_synch_table[i]; pack_msg(&control_msg, LidToGid(localID), LPS(localID)->ECS_synch_table[i], RENDEZVOUS_UNBLOCK, lvt(localID), lvt(localID), 0, NULL); control_msg->rendezvous_mark = LPS(localID)->wait_on_rendezvous; Send(control_msg); diff --git a/src/scheduler/control.c b/src/scheduler/control.c index 9c73ce8f9..123c4abe1 100644 --- a/src/scheduler/control.c +++ b/src/scheduler/control.c @@ -215,11 +215,13 @@ bool process_control_msg(msg_t *msg) { #ifdef HAVE_CROSS_STATE LID_t lid_receiver = GidToLid(msg->receiver); - + msg_t *copy; switch(msg->type) { case RENDEZVOUS_START: - list_insert(LPS(lid_receiver)->rendezvous_queue, timestamp, msg); + copy = rsalloc(sizeof(msg_t)); + *copy = *msg; + list_insert(LPS(lid_receiver)->rendezvous_queue, timestamp, copy); // Place this into input queue LPS(lid_receiver)->wait_on_rendezvous = msg->rendezvous_mark;