Skip to content

Commit

Permalink
Merge pull request #75 from mattcolegate/zReuseSemaphores
Browse files Browse the repository at this point in the history
Allow semaphore reuse for z/OS
  • Loading branch information
tobespc authored Dec 3, 2018
2 parents 0559a24 + 13e6932 commit 6a72c0a
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 75 deletions.
15 changes: 9 additions & 6 deletions src/ibmras/common/port/Semaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ namespace port {
/* class to provide semaphore semantics */
class Semaphore {
public:
Semaphore(uint32 initial, uint32 max); /* semaphore initial and max count */
void inc(); /* increase the semaphore count */
bool wait(uint32 timeout); /* decrement the semaphore count */
~Semaphore(); /* OS cleanup of semaphore */
Semaphore(uint32 initial, uint32 max, const char *sourceName); /* semaphore initial and max count, source name */
void inc(); /* increase the semaphore count */
bool wait(uint32 timeout); /* decrement the semaphore count */
~Semaphore(); /* OS cleanup of semaphore */
#if defined(_ZOS)
int open(int* semid); /* Either create a new semaphore or open an existing one*/
#endif
private:
void* handle; /* opaque handle to platform data structure */
#if defined __MACH__
void* handle; /* opaque handle to platform data structure */
#if defined __MACH__ || defined(_ZOS)
std::string name;
#endif
};
Expand Down
4 changes: 2 additions & 2 deletions src/ibmras/common/port/aix/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,10 @@ void stopAllThreads() {
pthread_mutex_unlock(&threadMapMux);
}

Semaphore::Semaphore(uint32 initial, uint32 max) {
Semaphore::Semaphore(uint32 initial, uint32 max, const char* sourceName) {
if (!stopping) {
handle = new sem_t;
IBMRAS_DEBUG(fine,"in thread.cpp creating CreateSemaphoreA");
IBMRAS_DEBUG_1(fine,"in thread.cpp creating semaphore for source %s", sourceName);
int result;
result = sem_init(reinterpret_cast<sem_t*>(handle), 0, initial);
if (result) {
Expand Down
4 changes: 2 additions & 2 deletions src/ibmras/common/port/linux/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ void stopAllThreads() {
pthread_mutex_unlock(&threadMapMux);
}

Semaphore::Semaphore(uint32 initial, uint32 max) {
Semaphore::Semaphore(uint32 initial, uint32 max, const char* sourceName) {
if (!stopping) {
handle = new sem_t;
IBMRAS_DEBUG(fine,"in thread.cpp creating CreateSemaphoreA");
IBMRAS_DEBUG_1(fine,"in thread.cpp creating semaphore for source %s", sourceName);
int result;
result = sem_init(reinterpret_cast<sem_t*>(handle), 0, initial);
if (result) {
Expand Down
4 changes: 2 additions & 2 deletions src/ibmras/common/port/osx/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ void stopAllThreads() {
pthread_mutex_unlock(&threadMapMux);
}

Semaphore::Semaphore(uint32 initial, uint32 max) {
Semaphore::Semaphore(uint32 initial, uint32 max, const char* sourceName) {
if (!stopping) {
name = "/hc/";
name.append(ibmras::common::itoa(getpid()));
name.append("/");
name.append(ibmras::common::itoa(pthread_self()));
handle = new sem_t;
IBMRAS_DEBUG_1(fine, "in thread.cpp creating semaphore %s", name.c_str());
IBMRAS_DEBUG_2(fine, "in thread.cpp creating semaphore %s for %s", name.c_str(), sourceName);

handle = sem_open(name.c_str(), O_CREAT | O_EXCL, S_IRWXU | S_IRWXG | S_IRWXO, initial);
int i=0;
Expand Down
4 changes: 2 additions & 2 deletions src/ibmras/common/port/windows/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ void stopAllThreads() {
IBMRAS_DEBUG(fine,"in thread.cpp->stopAllThreads");
}

Semaphore::Semaphore(uint32 initial, uint32 max) {
Semaphore::Semaphore(uint32 initial, uint32 max, const char* sourceName) {
handle = new HANDLE;
IBMRAS_DEBUG(fine, "in thread.cpp creating CreateSemaphoreA");
IBMRAS_DEBUG_1(fine,"in thread.cpp creating semaphore for source %s", sourceName);
handle = CreateSemaphoreA(NULL, initial, max, NULL);
if(handle == NULL) {
IBMRAS_DEBUG_1(warning, "Failed to create semaphore : error code %d", GetLastError());
Expand Down
174 changes: 145 additions & 29 deletions src/ibmras/common/port/zos/Thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
#include <time.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include <sys/sem.h>
#include <sys/ipc.h>
#include <sys/modes.h>
#include <limits.h>
#include <sys/time.h>
#include <sys/resource.h>
Expand All @@ -44,11 +46,22 @@
#include "ibmras/common/port/ThreadData.h"
#include "ibmras/common/port/Semaphore.h"
#include "ibmras/common/logging.h"
#include "ibmras/common/util/FileUtils.h"
#include "ibmras/monitoring/agent/Agent.h"

namespace ibmras {
namespace common {
namespace port {

#define SEM_STORE_DIR ".com_ibm_tools_hc"
#define SEM_SUFFIX "_notifier"
#define SEMFLAGS_OPEN (S_IRUSR | S_IWUSR)
#define SEMFLAGS_CREATE (IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR)
#define SEM_CREATED 1
#define SEM_OPENED 2
#define SEM_OPEN_FAILED 0
#define FILE_SEPARATOR "/"

IBMRAS_DEFINE_LOGGER("Port");

extern "C" void* wrapper(void *params) {
Expand Down Expand Up @@ -164,48 +177,151 @@ int sem_timedwait(int *semid, struct timespec *t) {
return 0;
}

Semaphore::Semaphore(uint32 initial, uint32 max) {
IBMRAS_DEBUG(fine,"in thread.cpp creating CreateSemaphoreA");
handle = new int*;
int result;
result = sem_init(reinterpret_cast<int*>(handle), 0, initial);
if (result) {
IBMRAS_DEBUG_1(warning, "Failed to create semaphore : error code %d", result);
handle = NULL;
}
bool getIPCKey(key_t* handle, std::string name) {
// get the semaphore temp dir set in the loader
std::string tempDir = ibmras::monitoring::agent::Agent::getInstance()->getProperty("platform_tempdir");
if ("" == tempDir) {
IBMRAS_DEBUG(debug, "Defaulting tempDir to /tmp");
tempDir = "/tmp";
}
std::string semRecDir = tempDir + FILE_SEPARATOR + SEM_STORE_DIR;
// Create the semaphore resource directory if it doesn't exist
int directoryExists = ibmras::common::util::createDirectory(semRecDir);
IBMRAS_DEBUG_2(debug,"ibmras::common::util::createDirectory(%s) = %d", semRecDir.c_str(), directoryExists);
if (directoryExists) {
std::string baseFile = semRecDir + FILE_SEPARATOR + name + SEM_SUFFIX;
IBMRAS_DEBUG_1(debug, "baseFile = %s", baseFile.c_str());
int fileExists = ibmras::common::util::createFile(baseFile);
IBMRAS_DEBUG_2(debug,"ibmras::common::util::createFile(%s) = %d", baseFile.c_str(), fileExists);
if (fileExists) {
uint8_t hc_id = 0x8c; // unique prefix to identify semaphore IDs as Health Centerol
/* Generate the key for creating the semaphore*/
*handle = ftok(baseFile.c_str(), hc_id);
IBMRAS_DEBUG_2(info, "IPCkey for %s is %x", baseFile.c_str(), *handle);
if (-1 == *handle) {
IBMRAS_LOG_1(warning, "Unable to obtain semaphore IPC key: %s", strerror(errno));
return false;
} else {
return true;
}
} else {
IBMRAS_LOG_1(warning, "Failed to create file %s; semaphore key not created", baseFile.c_str());
return false;
}
} else {
IBMRAS_LOG_1(warning, "Failed to create directory %s; semaphore key not created", semRecDir.c_str());
return false;
}
}

Semaphore::Semaphore(uint32 initial, uint32 max, const char* sourceName) {
name = sourceName;
// handle will store the IPC key needed to open a semaphore
handle = new key_t*;
if (!getIPCKey((reinterpret_cast<key_t*>(handle)), name)) {
handle = NULL;
}
}

int Semaphore::open(int* semid) {
IBMRAS_DEBUG_1(fine,"in thread.cpp creating semaphore for source %s", name.c_str());
if (handle) {
int semflags_create;
int semflags_open;
uint32 permissions = 0666;
/* trim the permissions down to 9 least significant bits */
permissions &= 0777;
semflags_open = SEMFLAGS_OPEN | permissions;
semflags_create = SEMFLAGS_CREATE | permissions;
// attempt to create semaphore
*semid = semget(*(reinterpret_cast<key_t*>(handle)), 1, semflags_create);
if (-1 == *semid) {
if (EEXIST == errno) {
IBMRAS_DEBUG(debug, "Semaphore already exists; attempt to open");
*semid = semget(*(reinterpret_cast<key_t*>(handle)), 1, semflags_open);
}
if (-1 == *semid) {
IBMRAS_LOG_1(warning, "Unable to obtain semaphore: ", strerror(errno));
return SEM_OPEN_FAILED;
} else {
IBMRAS_DEBUG_1(debug, "Semaphore %d opened", *semid);
return SEM_OPENED;
}
} else {
IBMRAS_DEBUG_1(debug, "Semaphore %d created", *semid);
return SEM_CREATED;
}
} else {
IBMRAS_LOG(warning, "Unable to obtain semaphore: invalid key");
// attempt to generate a new key for next time
handle = new key_t*;
if (!getIPCKey((reinterpret_cast<key_t*>(handle)), name)) {
handle = NULL;
}
return SEM_OPEN_FAILED;
}
}

void Semaphore::inc() {
IBMRAS_DEBUG(finest, "Incrementing semaphore ticket count");
if (handle) {
sem_post(reinterpret_cast<int*>(handle));
int semid;
int result;
result = Semaphore::open(&semid);
if (result) {
if (SEM_CREATED == result) {
// shouldn't be incrementing a semaphore that doesn't already exist - probably shutting down
sem_destroy(&semid);
} else {
IBMRAS_DEBUG_2(finest, "Incrementing semaphore %d (%s)", semid, name.c_str());
sem_post(&semid);
}
}
}

bool Semaphore::wait(uint32 timeout) {
int semid;
int result;
struct timespec t;

while (!handle) {
sleep(timeout); /* wait for the semaphore to be established */
}

t.tv_sec = timeout; /* configure the sleep interval */
t.tv_nsec = 0;

result = sem_timedwait(reinterpret_cast<int*>(handle), &t);
if (!result) {
IBMRAS_DEBUG(finest, "semaphore posted");
return true;
}

IBMRAS_DEBUG(finest, "possible semaphore timeout");
return (errno != EAGAIN);
result = Semaphore::open(&semid);
if (result) {
t.tv_sec = timeout; /* configure the sleep interval */
t.tv_nsec = 0;

result = sem_timedwait(&semid, &t);
if (!result) {
IBMRAS_DEBUG_2(finest, "Process %s waiting for semaphore %d", name.c_str(), semid);
return true;
}
IBMRAS_DEBUG_1(finest, "possible timeout for semaphore %d", semid);
return (errno != EAGAIN);
} else {
IBMRAS_LOG_1(warning, "Unable to obtain semaphore to wait on: %s", strerror(errno));
return false;
}
}

Semaphore::~Semaphore() {
sem_destroy(reinterpret_cast<int*>(handle));
delete (int*) handle;
int semid;
int result;
int n_count;
int z_count;

result = Semaphore::open(&semid);
if (result) {
// how many processes are waiting for the semaphore?
n_count = semctl(semid, 0, GETNCNT);
z_count = semctl(semid, 0, GETZCNT);
if (-1 == n_count || -1 == z_count) {
IBMRAS_LOG_1(warning, "Unable to access semaphore info: %s", strerror(errno));
} else {
// if we're the last semaphore users, no one should be waiting
if (0 == (n_count + z_count)) {
IBMRAS_DEBUG_2(debug, "Destroying semaphore %d for %s", semid, name.c_str());
sem_destroy(&semid);
}
}
}
delete handle;
}

}
Expand Down
Loading

0 comments on commit 6a72c0a

Please sign in to comment.