From c2b22d7bc9332c6ae0399d20cdd32201b8f3ed99 Mon Sep 17 00:00:00 2001 From: "Jorge Chamorro Bieling / @jorgechamorro" Date: Sat, 31 Dec 2016 00:47:44 +0100 Subject: [PATCH] v0.1.13 --- LICENSE | 4 +- README.md | 21 +- package.json | 4 +- src/threads_a_gogo.cc | 1230 ++++++++++------- test/02_events_storm.js | 17 +- test/all.js | 2 +- test/test22_create_destroy_loop.js | 18 +- ...st95_eval_to_emit_to_callback_benchmark.js | 2 +- ...t96_destroy_loop_using_destroy_callbaks.js | 6 +- 9 files changed, 781 insertions(+), 523 deletions(-) diff --git a/LICENSE b/LICENSE index c0c9ee1..6b1835a 100644 --- a/LICENSE +++ b/LICENSE @@ -2,8 +2,8 @@ Threads_a_gogo license follows: ==== -Copyright 2011 Proyectos Equis Ka, s.l., Jorge Chamorro Bieling and other -contributors. See the AUTHORS file. All rights reserved. +Copyright 2011 Proyectos Equis Ka, s.l., Jorge Chamorro Bieling. +All rights reserved. ==== diff --git a/README.md b/README.md index 4536e49..2ddd900 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![build status](https://travis-ci.org/xk/node-threads-a-gogo.svg?branch=master)](https://travis-ci.org/xk/node-threads-a-gogo) +[![build status](https://travis-ci.org/xk/node-threads-a-gogo.svg?branch=SYNC)](https://travis-ci.org/xk/node-threads-a-gogo) [![npm version](https://badge.fury.io/js/threads_a_gogo.svg)](https://www.npmjs.com/package/threads_a_gogo) *** @@ -19,12 +19,15 @@ From source: cd node-threads-a-gogo # One of node-gyp rebuild + node test/all.js # or npm install + node test/all.js # or - node-waf configure build install + node-waf configure build install test # Depending of what wersion of node you've got. - # THREADS_A_GOGO CURRENTLY (v0.1.12) RUNS ON NODES v0.5.1 TO v0.10.48 + # + # THREADS_A_GOGO CURRENTLY (v0.1.13) RUNS ON NODES v0.5.1 TO v6.9.2 Basic functionality test: @@ -42,7 +45,7 @@ Basic functionality test: 27.OK.WAITING FOR DESTROY CB 28.OK.29.DESTROY CB OK END - THREADS_A_GOGO v0.1.12 BASIC FUNCTIONALITY TEST: OK, IT WORKS! + THREADS_A_GOGO v0.1.13 BASIC FUNCTIONALITY TEST: OK, IT WORKS! To include the module in your project: @@ -50,7 +53,7 @@ To include the module in your project: **You need a node with a v8 >= 3.2.4 to run this module. Any node >= 0.5.1 comes with a v8 >= 3.2.4.** -The module **runs fine, though, in any node >= 0.2.0** as long as you build it with a v8 >= 3.2.4. To do that you simply have to replace /node/deps/v8 with a newer version of v8 and recompile it (node). To get any version of node goto http://nodejs.org/dist/, and for v8 goto http://github.com/v8/v8, click on "branch", select the proper tag (>= 3.2.4), and download the .zip. +The module **runs fine, though, in any node >= 0.1.13** as long as you build it with a v8 >= 3.2.4, [see here](https://nodejs.org/en/download/releases/). To do that you simply have to replace /node/deps/v8 with a newer version of v8 and recompile it (node). To get any version of node goto http://nodejs.org/dist/, and for v8 goto http://github.com/v8/v8, click on "branch", select the proper tag (>= 3.2.4), and download the .zip. ## Intro @@ -263,7 +266,7 @@ tagg= require('threads_a_gogo') -> tagg object { create: [Function], createPool: [Function: createPool], - version: '0.1.12' } + version: '0.1.13' } ``` ### .create() @@ -286,7 +289,7 @@ thread= tagg.create() -> thread object emit: [Function: emit], destroy: [Function: destroy], id: 0, - version: '0.1.12', + version: '0.1.13', on: [Function: on], once: [Function: once], _on: {}, @@ -348,7 +351,7 @@ Inside every thread .create()d by threads_a_gogo, there's a global `thread` obje thread (a global) -> { id: 0, - version: '0.1.12', + version: '0.1.13', on: [Function: on], once: [Function: once], emit: [Function: emit], @@ -438,7 +441,7 @@ pool= tagg.createPool( numbreOfThreads ) -> emit: [Function: emit], destroy: [Function: destroy], id: 0, - version: '0.1.12', + version: '0.1.13', on: [Function: on], once: [Function: once], _on: {}, diff --git a/package.json b/package.json index 37e1231..fbd8861 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "threads_a_gogo", - "version": "0.1.12", + "version": "0.1.13", "main": "build/Release/threads_a_gogo.node", "description": "██ Simple and fast JavaScript threads for Node.js ██", "keywords": [ @@ -31,7 +31,7 @@ }, "os": ["macos", "linux", "darwin"], "engines": { - "node": ">=0.5.1 <0.11" + "node": ">=0.5.1 <=6.9.2" }, "devDependencies": { "test": ">=0.1.8" } } diff --git a/src/threads_a_gogo.cc b/src/threads_a_gogo.cc index 1bf92b2..ae75a33 100644 --- a/src/threads_a_gogo.cc +++ b/src/threads_a_gogo.cc @@ -1,17 +1,16 @@ -//2011-11 Proyectos Equis Ka, s.l., jorge@jorgechamorro.com +//2011-11, 2016-12 Proyectos Equis Ka, s.l., jorge@jorgechamorro.com //threads_a_gogo.cc +#include #include #include -#include -#include +#include #include #include -#include -#include #include -#include #include +#include +#include //using namespace node; //using namespace v8; @@ -31,15 +30,34 @@ #define WAKEUP_NODE_EVENT_LOOP ev_async_send(EV_DEFAULT_UC_ &thread->async_watcher); #endif +#define TAGG_USE_NEW_API +#if (NODE_MAJOR_VERSION == 0) && (NODE_MINOR_VERSION < 12) + #undef TAGG_USE_NEW_API +#endif + +#ifdef TAGG_USE_NEW_API + #define TAGG_USE_ALLOCATOR + #if (NODE_MAJOR_VERSION < 3) + #undef TAGG_USE_ALLOCATOR + #endif +#endif + +#ifdef TAGG_USE_ALLOCATOR + #define TAGG_USE_DEFAULT_ALLOCATOR + #if (NODE_MAJOR_VERSION < 7) + #undef TAGG_USE_DEFAULT_ALLOCATOR + #endif +#endif + //Macros END //Type definitions BEGIN typedef enum eventTypes { - eventTypeNone = 0, - eventTypeEmit, - eventTypeEval, - eventTypeLoad + kEventTypeEmpty = 0, + kEventTypeEmit, + kEventTypeEval, + kEventTypeLoad } eventTypes; struct emitStruct { @@ -67,7 +85,7 @@ struct eventsQueueItem { int eventType; eventsQueueItem* next; unsigned long serial; - v8::Persistent callback; + v8::Persistent callback; union { emitStruct emit; evalStruct eval; @@ -77,11 +95,8 @@ struct eventsQueueItem { struct eventsQueue { eventsQueueItem* first; - eventsQueueItem* pullPtr; - union { - eventsQueueItem* pushPtr; - eventsQueueItem* last; - }; + eventsQueueItem* last; + pthread_mutex_t mutex; }; typedef enum killTypes { @@ -99,10 +114,10 @@ typedef struct typeThread { long int id; pthread_t thread; - int IDLE; - int ended; - int sigkill; - int destroyed; + volatile int IDLE; + volatile int ended; + volatile int sigkill; + int destroyed; int hasDestroyCallback; int hasIdleEventsListener; unsigned long threadMagicCookie; @@ -114,9 +129,9 @@ typedef struct typeThread { pthread_mutex_t idle_mutex; v8::Isolate* isolate; - v8::Persistent nodeObject; - v8::Persistent nodeDispatchEvents; - v8::Persistent destroyCallback; + v8::Persistent nodeObject; + v8::Persistent destroyCallback; + v8::Persistent nodeDispatchEvents; } typeThread; @@ -127,28 +142,41 @@ typedef struct typeThread { static inline void beep (void); static inline void qPush (eventsQueueItem*, eventsQueue*); static inline eventsQueueItem* qPull (eventsQueue*); -static inline eventsQueueItem* qUsed (eventsQueue*); -static inline eventsQueueItem* nuQitem (eventsQueue*); +static inline eventsQueueItem* nuQitem (); static eventsQueue* nuQueue (void); -static void qitemStorePush (eventsQueueItem*); -static eventsQueueItem* qitemStorePull (void); -static eventsQueue* qitemStoreInit (void); -static void destroyQueue (eventsQueue*); +static eventsQueue* destroyQueue (eventsQueue*); static inline typeThread* isAThread (v8::Handle); static inline void wakeUpThread (typeThread*, int); -static v8::Handle Puts (const v8::Arguments &); static void* threadBootProc (void*); static inline char* o2cstr (v8::Handle); static void eventLoop (typeThread*); static void notifyIdle (typeThread*); +static void cleanUpAfterThreadUVCallback (uv_handle_t*); static void cleanUpAfterThread (typeThread*); static void Callback ( #ifdef TAGG_USE_LIBUV uv_async_t* + #if defined(UV_VERSION_MAJOR) && (UV_VERSION_MAJOR == 0) + , int + #endif #else - EV_P_ ev_async* + EV_P_ ev_async*, int #endif - , int); +); + +#ifdef TAGG_USE_NEW_API +static void Puts (const v8::FunctionCallbackInfo&); +static void NOP (const v8::FunctionCallbackInfo&); +static void Destroy (const v8::FunctionCallbackInfo&); +static void Eval (const v8::FunctionCallbackInfo&); +static void Load (const v8::FunctionCallbackInfo&); +static inline void pushEmitEvent (eventsQueue*, const v8::FunctionCallbackInfo&); +static void processEmit (const v8::FunctionCallbackInfo&); +static void threadEmit (const v8::FunctionCallbackInfo&); +static void Create (const v8::FunctionCallbackInfo&); +#else +static v8::Handle Puts (const v8::Arguments &); +static v8::Handle NOP (const v8::Arguments &); static v8::Handle Destroy (const v8::Arguments &); static v8::Handle Eval (const v8::Arguments &); static v8::Handle Load (const v8::Arguments &); @@ -156,6 +184,8 @@ static inline void pushEmitEvent (eventsQueue*, const v8::Arguments &); static v8::Handle processEmit (const v8::Arguments &); static v8::Handle threadEmit (const v8::Arguments &); static v8::Handle Create (const v8::Arguments &); +#endif + void Init (v8::Handle); //Prototypes END @@ -163,16 +193,14 @@ void Init (v8::Handle); //Globals BEGIN -const char* k_TAGG_VERSION= "0.1.12"; +const char* k_TAGG_VERSION= "0.1.13"; static int TAGG_DEBUG= 0; static bool useLocker; static long int threadsCtr= 0; -static v8::Persistent boot_js; -static v8::Persistent id_symbol; -static v8::Persistent version_symbol; +static eventsQueue* qitemsStore= NULL; +static v8::Persistent boot_js; static v8::Persistent threadTemplate; -static eventsQueue* qitemStore; static unsigned long serial= 0; #include "boot.js.c" @@ -202,7 +230,7 @@ cat ../../../src/nextTick.js | ./minify kNextTick_js > ../../../src/kNextTick_js //jejeje static inline void beep (void) { - printf("\a"), fflush (stdout); // que es lo mismo que \x07 + printf("\x07"), fflush (stdout); // que es lo mismo que \a } @@ -211,14 +239,21 @@ static inline void beep (void) { -//Se puede usar en cualquier thread pero solo si pasas la cola apropiada + static inline void qPush (eventsQueueItem* qitem, eventsQueue* queue) { TAGG_DEBUG && printf("Q_PUSH\n"); + pthread_mutex_lock(&queue->mutex); qitem->next= NULL; - assert(queue->pushPtr != NULL); - assert(queue->pushPtr->next == NULL); - queue->pushPtr->next= qitem; - queue->pushPtr= qitem; + if (queue->last) { + assert(queue->last->next == NULL); + queue->last->next= qitem; + } + else { + assert(queue->first == NULL); + queue->first= qitem; + } + queue->last= qitem; + pthread_mutex_unlock(&queue->mutex); } @@ -227,40 +262,23 @@ static inline void qPush (eventsQueueItem* qitem, eventsQueue* queue) { -//Se puede usar en cualquier thread pero solo si pasas la cola apropiada + static eventsQueueItem* qPull (eventsQueue* queue) { TAGG_DEBUG && printf("Q_PULL\n"); - eventsQueueItem* qitem= queue->pullPtr; - assert(qitem != NULL); - while ((qitem->eventType == eventTypeNone) && qitem->next) { - qitem= qitem->next; - queue->pullPtr= qitem; - } - if (qitem->eventType == eventTypeNone) - return NULL; - else - return qitem; -} - - - - - - - -//Se puede usar en cualquier thread pero solo si pasas la cola apropiada -static inline eventsQueueItem* qUsed (eventsQueue* queue) { - TAGG_DEBUG && printf("Q_USED\n"); + pthread_mutex_lock(&queue->mutex); eventsQueueItem* qitem= NULL; - assert(queue->first != NULL); - assert(queue->pullPtr != NULL); - if (queue->first != queue->pullPtr) { + if (queue->first != NULL) { qitem= queue->first; - assert(qitem->next != NULL); - assert(queue->first != queue->pullPtr); queue->first= qitem->next; + if (queue->last == qitem) { + queue->last= qitem->next; + } qitem->next= NULL; } + else { + assert(queue->last == NULL); + } + pthread_mutex_unlock(&queue->mutex); return qitem; } @@ -270,18 +288,15 @@ static inline eventsQueueItem* qUsed (eventsQueue* queue) { -//Se puede usar en cualquier thread pero solo si pasas la cola apropiada -static inline eventsQueueItem* nuQitem (eventsQueue* queue) { + +static inline eventsQueueItem* nuQitem () { TAGG_DEBUG && printf("Q_NU_Q_ITEM\n"); - eventsQueueItem* qitem= NULL; - if (queue) qitem= qUsed(queue); + eventsQueueItem* qitem= qPull(qitemsStore); if (!qitem) { qitem= (eventsQueueItem*) calloc(1, sizeof(eventsQueueItem)); - //beep(); + beep(); } qitem->serial= serial++; - qitem->eventType= eventTypeNone; - qitem->next= NULL; return qitem; } @@ -291,22 +306,12 @@ static inline eventsQueueItem* nuQitem (eventsQueue* queue) { -//Sólo se debe usar en main/node's thread ! + static eventsQueue* nuQueue (void) { TAGG_DEBUG && printf("Q_NU_QUEUE\n"); eventsQueue* queue= (eventsQueue*) calloc(1, sizeof(eventsQueue)); - eventsQueueItem* qitem= qitemStorePull(); - if (!qitem) qitem= nuQitem(NULL); - queue->first= qitem; - qitem->eventType= eventTypeNone; - int i= 96; - while (--i) { - qitem->next= qitemStorePull(); - if (!qitem->next) qitem->next= nuQitem(NULL); - (qitem= qitem->next)->eventType= eventTypeNone; - } - qitem->next= NULL; - queue->pullPtr= queue->pushPtr= qitem; + queue->first= queue->last= NULL; + pthread_mutex_init(&(queue->mutex), NULL); return queue; } @@ -316,73 +321,13 @@ static eventsQueue* nuQueue (void) { -//Sólo se debe usar en main/node's thread ! -static void qitemStorePush (eventsQueueItem* qitem) { - TAGG_DEBUG && printf("Q_ITEM_STORE_PUSH\n"); - qitem->next= NULL; - assert(qitemStore->last != NULL); - assert(qitemStore->last->next == NULL); - qitemStore->last->next= qitem; - qitemStore->last= qitem; -} - - - - - - - -//Sólo se debe usar en main/node's thread ! -static eventsQueueItem* qitemStorePull (void) { - TAGG_DEBUG && printf("Q_ITEM_STORE_PULL\n"); - eventsQueueItem* qitem= NULL; - assert(qitemStore->first != NULL); - assert(qitemStore->last != NULL); - if (qitemStore->first != qitemStore->last) { - qitem= qitemStore->first; - assert(qitem->next != NULL); - qitemStore->first= qitem->next; - } - return qitem; -} - - - - - - -//Sólo se debe usar en main/node's thread ! -static eventsQueue* qitemStoreInit (void) { - TAGG_DEBUG && printf("Q_ITEM_STORE_INIT\n"); - eventsQueue* queue= (eventsQueue*) calloc(1, sizeof(eventsQueue)); - eventsQueueItem* qitem= queue->first= (eventsQueueItem*) calloc(1, sizeof(eventsQueueItem)); - int i= 2048; - while (i--) { - qitem->next= (eventsQueueItem*) calloc(1, sizeof(eventsQueueItem)); - qitem= qitem->next; - } - queue->last= qitem; - return queue; -} - - - - - - - -//Sólo se debe usar en main/node's thread ! -static void destroyQueue (eventsQueue* queue) { - TAGG_DEBUG && printf("Q_DESTROY_QUEUE\n"); +static eventsQueue* destroyQueue (eventsQueue* queue) { eventsQueueItem* qitem; - assert(queue->first != NULL); - while (queue->first) { - qitem= queue->first; - queue->first= qitem->next; - qitemStorePush(qitem); - } + while ((qitem= qPull(queue))) qPush(qitem, qitemsStore); + pthread_mutex_destroy(&(queue->mutex)); free(queue); + return NULL; } @@ -393,15 +338,18 @@ static void destroyQueue (eventsQueue* queue) { //Llamar a un método de la thread con el 'this' (receiver) mal puesto es bombazo seguro, por eso esto. static typeThread* isAThread (v8::Handle receiver) { - typeThread* thread; + typeThread* thread= NULL; if (receiver->IsObject()) { - if (receiver->InternalFieldCount() == 1) { +#ifdef TAGG_USE_NEW_API + v8::Local ptr= receiver->GetHiddenValue(v8::String::NewFromUtf8(v8::Isolate::GetCurrent(), "ptr")); + thread= (typeThread*) ((uintptr_t) ptr->ToNumber()->Value()); +#else + if (receiver->InternalFieldCount() == 1) thread= (typeThread*) receiver->GetPointerFromInternalField(0); - assert(thread != NULL); - if (thread && (thread->threadMagicCookie == kThreadMagicCookie)) { - return thread; - } - } +#endif + assert(thread != NULL); + assert(thread->threadMagicCookie == kThreadMagicCookie); + return thread; } return NULL; } @@ -450,21 +398,23 @@ static void wakeUpThread (typeThread* thread, int sigkill) { +#if defined(TAGG_USE_ALLOCATOR) +#if !defined(TAGG_USE_DEFAULT_ALLOCATOR) +//See https://github.com/v8/v8/blob/1440cd3d833c7bca7777232b5fd754352010aa3e/samples/shell.cc#L66 -//printf de andar por casa -static v8::Handle Puts (const v8::Arguments &args) { - int i= 0; - while (i < args.Length()) { - v8::HandleScope scope; - v8::String::Utf8Value c_str(args[i]); - fputs(*c_str, stdout); - i++; +class ArrayBufferAllocator : public v8::ArrayBuffer::Allocator { + public: + virtual void* Allocate(size_t length) { + void* data = AllocateUninitialized(length); + return data == NULL ? data : memset(data, 0, length); } - fflush(stdout); - return v8::Undefined(); -} + virtual void* AllocateUninitialized(size_t length) { return malloc(length); } + virtual void Free(void* data, size_t) { free(data); } +}; +#endif +#endif @@ -477,36 +427,52 @@ static v8::Handle Puts (const v8::Arguments &args) { static void* threadBootProc (void* arg) { int dummy; + typeThread* thread= (typeThread*) arg; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &dummy); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &dummy); - typeThread* thread= (typeThread*) arg; + TAGG_DEBUG && printf("THREAD %ld BOOTPROC ENTER\n", thread->id); - TAGG_DEBUG && printf("THREAD %ld BOOT ENTER\n", thread->id); + assert(v8::Isolate::GetCurrent() == NULL); +#ifdef TAGG_USE_ALLOCATOR + + v8::Isolate::CreateParams create_params; + + #ifdef TAGG_USE_DEFAULT_ALLOCATOR + create_params.array_buffer_allocator = v8::ArrayBuffer::Allocator::NewDefaultAllocator(); + #else + ArrayBufferAllocator wtf; + create_params.array_buffer_allocator= &wtf; + #endif + + thread->isolate= v8::Isolate::New(create_params); + +#else thread->isolate= v8::Isolate::New(); +#endif + +#ifdef TAGG_USE_NEW_API + thread->isolate->SetData(1, thread); +#else thread->isolate->SetData(thread); - +#endif + if (useLocker) { - //TAGG_DEBUG && printf("**** USING LOCKER: YES\n"); - v8::Locker myLocker(thread->isolate); - //v8::Isolate::Scope isolate_scope(thread->isolate); + v8::Locker wtf(thread->isolate); eventLoop(thread); } - else { + else eventLoop(thread); - } - TAGG_DEBUG && printf("THREAD %ld BOOT EXIT #1\n", thread->id); - thread->isolate->Exit(); - TAGG_DEBUG && printf("THREAD %ld BOOT EXIT #2\n", thread->id); + assert(v8::Isolate::GetCurrent() == NULL); + thread->isolate->Dispose(); - TAGG_DEBUG && printf("THREAD %ld BOOT EXIT #3\n", thread->id); thread->ended= 1; - TAGG_DEBUG && printf("THREAD %ld BOOT EXIT #4 WAKEUP_NODE_EVENT_LOOP\n", thread->id); WAKEUP_NODE_EVENT_LOOP - TAGG_DEBUG && printf("THREAD %ld BOOT EXIT #5 ENDED\n", thread->id); - return 0; + + TAGG_DEBUG && printf("THREAD %ld BOOTPROC EXIT\n", thread->id); + return NULL; } @@ -535,13 +501,29 @@ static inline char* o2cstr (v8::Handle o) { // The thread's eventloop runs in the thread(s) not in node's main thread static void eventLoop (typeThread* thread) { TAGG_DEBUG && printf("THREAD %ld EVENTLOOP ENTER\n", thread->id); - +#ifdef TAGG_USE_NEW_API + v8::Isolate* iso= thread->isolate; + v8::Isolate::Scope isolate_scope(iso); + v8::HandleScope scope1(iso); + v8::Local context= v8::Context::New(iso); + v8::Context::Scope context_scope(context); + + v8::Local global= context->Global(); + global->Set(v8::String::NewFromUtf8(iso, "puts"), v8::FunctionTemplate::New(iso, Puts)->GetFunction()); + v8::Local threadObject= v8::Object::New(iso); + threadObject->Set(v8::String::NewFromUtf8(iso, "id"), v8::Number::New(iso, thread->id)); + threadObject->Set(v8::String::NewFromUtf8(iso, "version"),v8::String::NewFromUtf8(iso, k_TAGG_VERSION)); + threadObject->Set(v8::String::NewFromUtf8(iso, "emit"), v8::FunctionTemplate::New(iso, threadEmit)->GetFunction()); + v8::Local script= v8::Local::New(iso, v8::Script::Compile(v8::String::NewFromUtf8(iso, kBoot_js))->Run()->ToObject()); + v8::Local r= script->CallAsFunction(threadObject, 0, NULL)->ToObject(); + v8::Local dnt= r->Get(v8::String::NewFromUtf8(iso, "dnt"))->ToObject(); + v8::Local dev= r->Get(v8::String::NewFromUtf8(iso, "dev"))->ToObject(); +#else thread->isolate->Enter(); v8::Persistent context= v8::Context::New(); context->Enter(); { v8::HandleScope scope1; - v8::Local global= context->Global(); global->Set(v8::String::NewSymbol("puts"), v8::FunctionTemplate::New(Puts)->GetFunction()); v8::Local threadObject= v8::Object::New(); @@ -552,202 +534,222 @@ static void eventLoop (typeThread* thread) { v8::Local r= script->CallAsFunction(threadObject, 0, NULL)->ToObject(); v8::Local dnt= r->Get(v8::String::NewSymbol("dnt"))->ToObject(); v8::Local dev= r->Get(v8::String::NewSymbol("dev"))->ToObject(); - +#endif + //SetFatalErrorHandler(FatalErrorCB); - - while (1) { - double ntql; - eventsQueueItem* qitem= NULL; - eventsQueueItem* event; - eventsQueueItem* qitem3; - v8::TryCatch onError; + double ntql; + eventsQueueItem* event= NULL; + while (1) { + v8::TryCatch onError; + TAGG_DEBUG && printf("THREAD %ld IN WHILE(1)\n", thread->id); + + if (thread->sigkill == kKillRudely) break; + + if (!event) event= qPull(thread->threadEventsQueue); + if (event) { + + TAGG_DEBUG && printf("THREAD %ld QITEM #%ld\n", thread->id, event->serial); + + if (event->eventType == kEventTypeLoad) { +#ifdef TAGG_USE_NEW_API + v8::HandleScope scope(iso); +#else + v8::HandleScope scope; +#endif - TAGG_DEBUG && printf("THREAD %ld BEFORE WHILE\n", thread->id); - - while (1) { - - TAGG_DEBUG && printf("THREAD %ld WHILE\n", thread->id); - - if (thread->sigkill == kKillRudely) break; - else if (qitem || (qitem= qPull(thread->threadEventsQueue))) { - - event= qitem; - qitem= NULL; - TAGG_DEBUG && printf("THREAD %ld QITEM\n", thread->id); - if (event->eventType == eventTypeLoad) { - v8::HandleScope scope; - - v8::Local script; - v8::Local resultado; - - TAGG_DEBUG && printf("THREAD %ld QITEM LOAD\n", thread->id); - - char* buf= NULL; - assert(event->load.path != NULL); - FILE* fp= fopen(event->load.path, "rb"); - free(event->load.path); - - if (fp) { - fseek(fp, 0, SEEK_END); - long len= ftell(fp); - rewind(fp); //fseek(fp, 0, SEEK_SET); - buf= (char*) calloc(len + 1, sizeof(char)); // +1 to get null terminated string - fread(buf, len, 1, fp); - fclose(fp); - } - - if (buf != NULL) { - script= v8::Script::Compile(v8::String::New(buf)); - free(buf); - if (!onError.HasCaught()) resultado= script->Run(); - event->load.error= onError.HasCaught() ? 1 : 0; - } - else { - event->load.error= 2; - } - - if (event->load.hasCallback) { - qitem3= nuQitem(thread->processEventsQueue); - qitem3->eval.error= event->load.error; - if (!qitem3->eval.error) - qitem3->eval.resultado= o2cstr(resultado); - else if (qitem3->eval.error == 1) - qitem3->eval.resultado= o2cstr(onError.Exception()); - else - qitem3->eval.resultado= strdup("fopen(path) error"); - qitem3->callback= event->callback; - qitem3->load.hasCallback= 1; - qitem3->eventType= eventTypeEval; - qPush(qitem3, thread->processEventsQueue); - WAKEUP_NODE_EVENT_LOOP - } - - if (onError.HasCaught()) onError.Reset(); - - event->eventType= eventTypeNone; - } - else if (event->eventType == eventTypeEval) { - v8::HandleScope scope; - - v8::Local script; - v8::Local resultado; - - TAGG_DEBUG && printf("THREAD %ld QITEM EVAL\n", thread->id); - - script= v8::Script::New(v8::String::New(event->eval.scriptText)); - free(event->eval.scriptText); - - if (!onError.HasCaught()) - resultado= script->Run(); - event->eval.error= onError.HasCaught() ? 1 : 0; - - if (event->eval.hasCallback) { - qitem3= nuQitem(thread->processEventsQueue); - qitem3->eval.error= event->eval.error; - if (!qitem3->eval.error) - qitem3->eval.resultado= o2cstr(resultado); - else - qitem3->eval.resultado= o2cstr(onError.Exception()); - qitem3->callback= event->callback; - qitem3->eval.hasCallback= 1; - qitem3->eventType= eventTypeEval; - qPush(qitem3, thread->processEventsQueue); - WAKEUP_NODE_EVENT_LOOP - } - - if (onError.HasCaught()) onError.Reset(); - - event->eventType= eventTypeNone; - } - else if (event->eventType == eventTypeEmit) { - v8::HandleScope scope; - - v8::Local array; - v8::Local args[2]; - - TAGG_DEBUG && printf("THREAD %ld QITEM EVENT #%ld\n", thread->id, event->serial); - - assert(event->emit.eventName != NULL); - args[0]= v8::String::New(event->emit.eventName); - args[1]= array= v8::Array::New(event->emit.argc); - if (event->emit.argc) { - int i= 0; - while (i < event->emit.argc) { - array->Set(i, v8::String::New(event->emit.argv[i])); - free(event->emit.argv[i]); - i++; - } - free(event->emit.argv); - } - - dev->CallAsFunction(global, 2, args); - free(event->emit.eventName); - event->eventType= eventTypeNone; - } - else { - assert(0); - } - } + v8::Local script; + v8::Local resultado; + + TAGG_DEBUG && printf("THREAD %ld QITEM LOAD\n", thread->id); + + char* buf= NULL; + assert(event->load.path != NULL); + FILE* fp= fopen(event->load.path, "rb"); + free(event->load.path); + + if (fp) { + fseek(fp, 0, SEEK_END); + long len= ftell(fp); + rewind(fp); //fseek(fp, 0, SEEK_SET); + buf= (char*) calloc(len + 1, sizeof(char)); // +1 to get null terminated string + fread(buf, len, 1, fp); + fclose(fp); + } + + if (buf != NULL) { +#ifdef TAGG_USE_NEW_API + script= v8::Script::Compile(v8::String::NewFromUtf8(iso, buf)); +#else + script= v8::Script::Compile(v8::String::New(buf)); +#endif + free(buf); + if (!onError.HasCaught()) resultado= script->Run(); + event->load.error= onError.HasCaught() ? 1 : 0; + } + else { + event->load.error= 2; + } + + if (event->load.hasCallback) { + if (!event->load.error) + event->eval.resultado= o2cstr(resultado); + else if (event->load.error == 1) + event->eval.resultado= o2cstr(onError.Exception()); else - TAGG_DEBUG && printf("THREAD %ld NO QITEM\n", thread->id); - - if (thread->sigkill == kKillRudely) break; - else { - v8::HandleScope scope; - TAGG_DEBUG && printf("THREAD %ld NTQL\n", thread->id); - ntql= dnt->CallAsFunction(global, 0, NULL)->ToNumber()->Value(); - if (onError.HasCaught()) onError.Reset(); + event->eval.resultado= strdup("fopen(path) error"); + event->eventType= kEventTypeEval; + qPush(event, thread->processEventsQueue); + WAKEUP_NODE_EVENT_LOOP + } + else { + qPush(event, qitemsStore); + } + + if (onError.HasCaught()) onError.Reset(); + } + else if (event->eventType == kEventTypeEval) { +#ifdef TAGG_USE_NEW_API + v8::HandleScope scope(iso); +#else + v8::HandleScope scope; +#endif + + v8::Local script; + v8::Local resultado; + + TAGG_DEBUG && printf("THREAD %ld QITEM EVAL\n", thread->id); +#ifdef TAGG_USE_NEW_API + script= v8::Script::Compile(v8::String::NewFromUtf8(iso, event->eval.scriptText)); +#else + script= v8::Script::Compile(v8::String::New(event->eval.scriptText)); +#endif + free(event->eval.scriptText); + + if (!onError.HasCaught()) resultado= script->Run(); + if (event->eval.hasCallback) { + event->eval.error= onError.HasCaught() ? 1 : 0; + if (!event->eval.error) + event->eval.resultado= o2cstr(resultado); + else + event->eval.resultado= o2cstr(onError.Exception()); + event->eventType= kEventTypeEval; + qPush(event, thread->processEventsQueue); + WAKEUP_NODE_EVENT_LOOP + } + else { + qPush(event, qitemsStore); + } + + if (onError.HasCaught()) onError.Reset(); + } + else if (event->eventType == kEventTypeEmit) { +#ifdef TAGG_USE_NEW_API + v8::HandleScope scope(iso); +#else + v8::HandleScope scope; +#endif + v8::Local array; + v8::Local args[2]; + + TAGG_DEBUG && printf("THREAD %ld QITEM EVENT #%ld\n", thread->id, event->serial); + + assert(event->emit.eventName != NULL); + +#ifdef TAGG_USE_NEW_API + args[0]= v8::String::NewFromUtf8(iso, event->emit.eventName); + free(event->emit.eventName); + args[1]= array= v8::Array::New(iso, event->emit.argc); + if (event->emit.argc) { + int i= 0; + while (i < event->emit.argc) { + array->Set(i, v8::String::NewFromUtf8(iso, event->emit.argv[i])); + free(event->emit.argv[i]); + i++; } - - if (thread->sigkill == kKillRudely) break; - else if (!ntql && !(qitem || (qitem= qPull(thread->threadEventsQueue)))) { - TAGG_DEBUG && printf("THREAD %ld EXIT WHILE: NO NTQL AND NO QITEM\n", thread->id); - break; + free(event->emit.argv); + } +#else + args[0]= v8::String::New(event->emit.eventName); + free(event->emit.eventName); + args[1]= array= v8::Array::New(event->emit.argc); + if (event->emit.argc) { + int i= 0; + while (i < event->emit.argc) { + array->Set(i, v8::String::New(event->emit.argv[i])); + free(event->emit.argv[i]); + i++; } - + free(event->emit.argv); } - - if (thread->sigkill) break; - - v8::V8::IdleNotification(); - - TAGG_DEBUG && printf("THREAD %ld BEFORE MUTEX\n", thread->id); - //cogemos el lock para - //por un lado poder mirar si hay cosas en la queue sabiendo - //que nadie la está tocando - //y por otro lado para poder tocar thread->IDLE sabiendo - //que nadie la está mirando mientras la tocamos. - pthread_mutex_lock(&thread->idle_mutex); - TAGG_DEBUG && printf("THREAD %ld TIENE threadEventsQueue_MUTEX\n", thread->id); - //aquí tenemos acceso exclusivo a threadEventsQueue y a thread->IDLE - while (!(qitem || (qitem= qPull(thread->threadEventsQueue))) && !thread->sigkill) { - //sólo se entra aquí si no hay nada en la queue y no hay sigkill - //hemos avisado con thread->IDLE de que nos quedamos parados - // para que sepan que nos han de despertar - thread->IDLE= 1; - if (thread->hasIdleEventsListener) notifyIdle(thread); - TAGG_DEBUG && printf("THREAD %ld SLEEP\n", thread->id); - //en pthread_cond_wait se quedará atascada esta thread hasta que - //nos despierten y haya cosas en la queue o haya sigkill - //El lock se abre al entrar en pthread_cond_wait así que los - //demás ahora van a poder mirar thread->IDLE mientras estamos parados/durmiendo - pthread_cond_wait(&thread->idle_cv, &thread->idle_mutex); - //El lock queda cerrado al salir de pthread_cond_wait pero no importa xq - //si seguimos en el bucle se va a volver a abrir y si salimos tb +#endif + dev->CallAsFunction(global, 2, args); + qPush(event, qitemsStore); + } + else { + assert(0); } - //Aquí aún tenemos el lock así que podemos tocar thread->IDLE con seguridad - thread->IDLE= 0; - TAGG_DEBUG && printf("THREAD %ld WAKE UP\n", thread->id); - //lo soltamos - pthread_mutex_unlock(&thread->idle_mutex); - TAGG_DEBUG && printf("THREAD %ld SUELTA threadEventsQueue_mutex\n", thread->id); - } - + else + TAGG_DEBUG && printf("THREAD %ld NO QITEM\n", thread->id); + + if (thread->sigkill == kKillRudely) break; + + TAGG_DEBUG && printf("THREAD %ld NTQL\n", thread->id); + ntql= dnt->CallAsFunction(global, 0, NULL)->ToNumber()->Value(); + if (onError.HasCaught()) onError.Reset(); + + event= NULL; + if (ntql) continue; + event= qPull(thread->threadEventsQueue); + if (event) continue; + if (thread->sigkill) break; + + TAGG_DEBUG && printf("THREAD %ld : NO NTQL AND NO QITEM\n", thread->id); + + //v8::V8::IdleNotification(); + + TAGG_DEBUG && printf("THREAD %ld BEFORE MUTEX\n", thread->id); + //cogemos el lock para + //por un lado poder mirar si hay cosas en la queue sabiendo + //que nadie la está tocando + //y por otro lado para poder tocar thread->IDLE sabiendo + //que nadie la está mirando mientras la tocamos. + pthread_mutex_lock(&thread->idle_mutex); + TAGG_DEBUG && printf("THREAD %ld TIENE threadEventsQueue_MUTEX\n", thread->id); + event= qPull(thread->threadEventsQueue); + //aquí tenemos acceso exclusivo a threadEventsQueue y a thread->IDLE + if (!event && !thread->sigkill) { + //sólo se entra aquí si no hay nada en la queue y no hay sigkill + //hemos avisado con thread->IDLE de que nos quedamos parados + // para que sepan que nos han de despertar + thread->IDLE= 1; + if (thread->hasIdleEventsListener) notifyIdle(thread); + TAGG_DEBUG && printf("THREAD %ld SLEEP\n", thread->id); + //en pthread_cond_wait se quedará atascada esta thread hasta que + //nos despierten y haya cosas en la queue o haya sigkill + //El lock se abre al entrar en pthread_cond_wait así que los + //demás ahora van a poder mirar thread->IDLE mientras estamos parados/durmiendo + pthread_cond_wait(&thread->idle_cv, &thread->idle_mutex); + //El lock queda cerrado al salir de pthread_cond_wait pero no importa xq + //si seguimos en el bucle se va a volver a abrir y si salimos tb + } + //Aquí aún tenemos el lock así que podemos tocar thread->IDLE con seguridad + thread->IDLE= 0; + TAGG_DEBUG && printf("THREAD %ld WAKE UP\n", thread->id); + //lo soltamos + pthread_mutex_unlock(&thread->idle_mutex); + TAGG_DEBUG && printf("THREAD %ld SUELTA threadEventsQueue_mutex\n", thread->id); + } + +#ifdef TAGG_USE_NEW_API +#else context->Exit(); context.Dispose(); + } + thread->isolate->Exit(); +#endif TAGG_DEBUG && printf("THREAD %ld EVENTLOOP EXIT\n", thread->id); } @@ -770,14 +772,39 @@ static void notifyIdle (typeThread* thread) { //Esto es por culpa de libuv que se empeña en tener un callback de terminación. Al parecer... -static void cleanUpAfterThreadCallback (uv_handle_t* arg) { - v8::HandleScope scope; +static void cleanUpAfterThreadUVCallback (uv_handle_t* arg) { + typeThread* thread= (typeThread*) arg; + +#ifdef TAGG_USE_NEW_API + v8::HandleScope scope(v8::Isolate::GetCurrent()); + + TAGG_DEBUG && printf("THREAD %ld cleanUpAfterThreadUVCallback()\n", thread->id); + + if (thread->hasDestroyCallback) { + v8::Local cb; + cb= v8::Local::New(v8::Isolate::GetCurrent(), thread->destroyCallback); + assert(cb->IsFunction()); + cb->ToObject()->CallAsFunction(v8::Isolate::GetCurrent()->GetCurrentContext()->Global(), 0, NULL); + thread->destroyCallback.Reset(); + } + + thread->nodeDispatchEvents.Reset(); + thread->nodeObject.Reset(); +#else + v8::HandleScope scope; + TAGG_DEBUG && printf("THREAD %ld cleanUpAfterThreadCallback()\n", thread->id); + if (thread->hasDestroyCallback) { - thread->destroyCallback->CallAsFunction(v8::Context::GetCurrent()->Global(), 0, NULL); + thread->destroyCallback->ToObject()->CallAsFunction(v8::Context::GetCurrent()->Global(), 0, NULL); + thread->destroyCallback.Dispose(); } - thread->destroyCallback.Dispose(); + + thread->nodeDispatchEvents.Dispose(); + thread->nodeObject.Dispose(); +#endif + free(thread); } @@ -793,29 +820,26 @@ static void cleanUpAfterThread (typeThread* thread) { TAGG_DEBUG && printf("THREAD %ld cleanUpAfterThread() IN MAIN THREAD #1\n", thread->id); TAGG_DEBUG && printf("THREAD %ld cleanUpAfterThread() destroyQueue(thread->threadEventsQueue)\n", thread->id); - destroyQueue(thread->threadEventsQueue); + thread->threadEventsQueue= destroyQueue(thread->threadEventsQueue); TAGG_DEBUG && printf("THREAD %ld cleanUpAfterThread() destroyQueue(thread->processEventsQueue)\n", thread->id); - destroyQueue(thread->processEventsQueue); - + thread->processEventsQueue= destroyQueue(thread->processEventsQueue); pthread_cond_destroy(&(thread->idle_cv)); pthread_mutex_destroy(&(thread->idle_mutex)); - thread->nodeDispatchEvents.Dispose(); - thread->nodeObject.Dispose(); //OJO Y SI QUEDAN OTRAS REFERENCIAS POR AHÍ QUÉ PASA? if (thread->ended) { // Esta thread llegó a funcionar alguna vez // hay que apagar uv antes de poder hacer free(thread) - // De hecho el free(thread) se hará en una Callabck xq uv_close la va a llamar + // free(thread) se hará en cleanUpAfterThreadUVCallback xq uv_close la va a llamar TAGG_DEBUG && printf("THREAD %ld cleanUpAfterThread() FREE IN UV CALLBACK #2\n", thread->id); #ifdef TAGG_USE_LIBUV - uv_close((uv_handle_t*) &thread->async_watcher, cleanUpAfterThreadCallback); + uv_close((uv_handle_t*) &thread->async_watcher, cleanUpAfterThreadUVCallback); //uv_unref(&thread->async_watcher); #else ev_async_stop(EV_DEFAULT_UC_ &thread->async_watcher); ev_unref(EV_DEFAULT_UC); - cleanUpAfterThreadCallback((uv_handle_t*) thread); + cleanUpAfterThreadUVCallback((uv_handle_t*) thread); #endif } @@ -840,31 +864,115 @@ static void cleanUpAfterThread (typeThread* thread) { static void Callback ( #ifdef TAGG_USE_LIBUV uv_async_t* watcher + #if defined(UV_VERSION_MAJOR) && (UV_VERSION_MAJOR == 0) + , int status + #endif #else - EV_P_ ev_async* watcher + EV_P_ ev_async* watcher, int status #endif - , int status) { - - v8::HandleScope scope; +) { eventsQueueItem* event; typeThread* thread= (typeThread*) watcher; +#ifdef TAGG_USE_NEW_API + v8::HandleScope scope(v8::Isolate::GetCurrent()); + + v8::Isolate* iso= v8::Isolate::GetCurrent(); + + v8::Local cb; + v8::Local that; v8::Local array; v8::Local args[2]; - v8::Local null= v8::Local::New(v8::Null()); + v8::Local null= v8::Local::New(iso, v8::Null(iso)); assert(thread != NULL); assert(!thread->destroyed); + TAGG_DEBUG && printf("UV CALLBACK FOR THREAD %ld BEGIN\n", thread->id); + v8::TryCatch onError; while ((event= qPull(thread->processEventsQueue))) { - TAGG_DEBUG && printf("CALLBACK %ld IN MAIN THREAD\n", thread->id); + TAGG_DEBUG && printf("UV CALLBACK FOR THREAD %ld GOT EVENT #%ld\n", thread->id, event->serial); - assert(event != NULL); + if (event->eventType == kEventTypeEval) { + + TAGG_DEBUG && printf("CALLBACK kEventTypeEval IN MAIN THREAD\n"); + + assert(event->eval.hasCallback); + assert(event->eval.resultado != NULL); + + if (event->eval.error) { + args[0]= v8::Exception::Error(v8::String::NewFromUtf8(iso, event->eval.resultado)); + args[1]= null; + } + else { + args[0]= null; + args[1]= v8::String::NewFromUtf8(iso, event->eval.resultado); + } + free(event->eval.resultado); + + cb= v8::Local::New(iso, event->callback); + that= v8::Local::New(iso, thread->nodeObject); + assert(that->IsObject()); + assert(cb->IsFunction()); + cb->ToObject()->CallAsFunction(that->ToObject(), 2, args); + event->callback.Reset(); + event->eventType = kEventTypeEmpty; + qPush(event, qitemsStore); + + if (onError.HasCaught()) { + node::FatalException(onError); + return; + } + } + else if (event->eventType == kEventTypeEmit) { + + TAGG_DEBUG && printf("CALLBACK kEventTypeEmit IN MAIN THREAD\n"); + + args[0]= v8::String::NewFromUtf8(iso, event->emit.eventName); + free(event->emit.eventName); + array= v8::Array::New(iso, event->emit.argc); + args[1]= array; + if (event->emit.argc) { + int i= 0; + while (i < event->emit.argc) { + array->Set(i, v8::String::NewFromUtf8(iso, event->emit.argv[i])); + free(event->emit.argv[i]); + i++; + } + free(event->emit.argv); + } + cb= v8::Local::New(iso, thread->nodeDispatchEvents); + cb->ToObject()->CallAsFunction(iso->GetCurrentContext()->Global(), 2, args); + + event->eventType = kEventTypeEmpty; + qPush(event, qitemsStore); + } + else { + assert(0); + } + + } +#else + v8::HandleScope scope; + + v8::Local array; + v8::Local args[2]; + v8::Local null= v8::Local::New(v8::Null()); + + assert(thread != NULL); + assert(!thread->destroyed); + + TAGG_DEBUG && printf("UV CALLBACK FOR THREAD %ld BEGIN\n", thread->id); + + v8::TryCatch onError; + while ((event= qPull(thread->processEventsQueue))) { + + TAGG_DEBUG && printf("UV CALLBACK FOR THREAD %ld GOT EVENT #%ld\n", thread->id, event->serial); - if (event->eventType == eventTypeEval) { + if (event->eventType == kEventTypeEval) { TAGG_DEBUG && printf("CALLBACK eventTypeEval IN MAIN THREAD\n"); @@ -879,25 +987,27 @@ static void Callback ( args[0]= null; args[1]= v8::String::New(event->eval.resultado); } - event->callback->CallAsFunction(thread->nodeObject, 2, args); + free(event->eval.resultado); + assert(event->callback->IsFunction()); + event->callback->ToObject()->CallAsFunction(thread->nodeObject->ToObject(), 2, args); event->callback.Dispose(); - free(event->eval.resultado); - event->eventType = eventTypeNone; + event->eventType = kEventTypeEmpty; + qPush(event, qitemsStore); if (onError.HasCaught()) { node::FatalException(onError); return; } } - else if (event->eventType == eventTypeEmit) { + else if (event->eventType == kEventTypeEmit) { - TAGG_DEBUG && printf("CALLBACK eventTypeEmit IN MAIN THREAD\n"); + TAGG_DEBUG && printf("CALLBACK kEventTypeEmit IN MAIN THREAD\n"); args[0]= v8::String::New(event->emit.eventName); + free(event->emit.eventName); array= v8::Array::New(event->emit.argc); args[1]= array; - if (event->emit.argc) { int i= 0; while (i < event->emit.argc) { @@ -907,26 +1017,54 @@ static void Callback ( } free(event->emit.argv); } - - thread->nodeDispatchEvents->CallAsFunction(v8::Context::GetCurrent()->Global(), 2, args); + thread->nodeDispatchEvents->ToObject()->CallAsFunction(v8::Context::GetCurrent()->Global(), 2, args); - free(event->emit.eventName); - event->eventType = eventTypeNone; + event->eventType = kEventTypeEmpty; + qPush(event, qitemsStore); } else { assert(0); } - event->eventType = eventTypeNone; - event= NULL; } - +#endif + if (thread->sigkill && thread->ended) { - TAGG_DEBUG && printf("THREAD %ld CALLBACK CALLED cleanUpAfterThread()\n", thread->id); + TAGG_DEBUG && printf("UV CALLBACK FOR THREAD %ld CALLED cleanUpAfterThread()\n", thread->id); //pthread_cancel(thread->thread); thread->destroyed= 1; cleanUpAfterThread(thread); } + + TAGG_DEBUG && printf("UV CALLBACK FOR THREAD %ld END\n", thread->id); +} + + + + + + + +//printf de andar por casa +#ifdef TAGG_USE_NEW_API +static void Puts (const v8::FunctionCallbackInfo& args) { + v8::HandleScope scope(args.GetIsolate()); +#else +static v8::Handle Puts (const v8::Arguments &args) { + v8::HandleScope scope; +#endif + int i= 0; + while (i < args.Length()) { + v8::String::Utf8Value c_str(args[i]); + fputs(*c_str, stdout); + i++; + } + fflush(stdout); +#ifdef TAGG_USE_NEW_API + //args.GetReturnValue().Set(v8::Undefined(args.GetIsolate())); +#else + return v8::Undefined(); +#endif } @@ -936,8 +1074,35 @@ static void Callback ( +// Calling a method of a destroyed thread throws an error. +#ifdef TAGG_USE_NEW_API +static void NOP (const v8::FunctionCallbackInfo& args) { + v8::Isolate* iso= args.GetIsolate(); + v8::HandleScope scope(iso); + iso->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(iso, "This thread has been destroyed."))); +} +#else +static v8::Handle NOP (const v8::Arguments &args) { + v8::HandleScope scope; + return v8::ThrowException(v8::Exception::TypeError(v8::String::New("This thread has been destroyed"))); +} +#endif + + + + + + + // Tell a thread to quit, either nicely or rudely. +#ifdef TAGG_USE_NEW_API +static void Destroy (const v8::FunctionCallbackInfo& args) { + v8::Isolate* iso= args.GetIsolate(); + v8::HandleScope scope(iso); +#else static v8::Handle Destroy (const v8::Arguments &args) { + v8::HandleScope scope; +#endif //thread.destroy() or thread.destroy(0) means nicely (the default) //thread destroy(1) means rudely. @@ -946,14 +1111,18 @@ static v8::Handle Destroy (const v8::Arguments &args) { //When done rudely it will try to exit the event loop regardless. //ToDo: If the thread is stuck in a ` while (1) ; ` or something this won't work... - v8::HandleScope scope; //TODO: Hay que comprobar que this en un objeto y que tiene hiddenRefTotypeThread_symbol y que no es nil //TODO: Aquí habría que usar static void TerminateExecution(int thread_id); //TODO: static void v8::V8::TerminateExecution ( Isolate * isolate= NULL ) typeThread* thread= isAThread(args.This()); if (!thread) { +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(iso->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(iso, "thread.destroy(): the receiver must be a thread object")))); + return; +#else return v8::ThrowException(v8::Exception::TypeError(v8::String::New("thread.destroy(): the receiver must be a thread object"))); +#endif } int nuSigkill= kKillNicely; @@ -963,7 +1132,11 @@ static v8::Handle Destroy (const v8::Arguments &args) { thread->hasDestroyCallback= (args.Length() > 1) && (args[1]->IsFunction()); if (thread->hasDestroyCallback) { +#ifdef TAGG_USE_NEW_API + thread->destroyCallback.Reset(iso, args[1]); +#else thread->destroyCallback= v8::Persistent::New(args[1]->ToObject()); +#endif } if (TAGG_DEBUG) { @@ -972,7 +1145,21 @@ static v8::Handle Destroy (const v8::Arguments &args) { } wakeUpThread(thread, nuSigkill); + +#ifdef TAGG_USE_NEW_API + v8::Local nodeObject; + nodeObject= v8::Local::New(iso, thread->nodeObject)->ToObject(); + nodeObject->Set(v8::String::NewFromUtf8(iso, "load"), v8::FunctionTemplate::New(iso, NOP)->GetFunction()); + nodeObject->Set(v8::String::NewFromUtf8(iso, "eval"), v8::FunctionTemplate::New(iso, NOP)->GetFunction()); + nodeObject->Set(v8::String::NewFromUtf8(iso, "emit"), v8::FunctionTemplate::New(iso, NOP)->GetFunction()); + nodeObject->Set(v8::String::NewFromUtf8(iso, "destroy"), v8::FunctionTemplate::New(iso, NOP)->GetFunction()); +#else + thread->nodeObject->ToObject()->Set(v8::String::NewSymbol("load"), v8::FunctionTemplate::New(NOP)->GetFunction()); + thread->nodeObject->ToObject()->Set(v8::String::NewSymbol("eval"), v8::FunctionTemplate::New(NOP)->GetFunction()); + thread->nodeObject->ToObject()->Set(v8::String::NewSymbol("emit"), v8::FunctionTemplate::New(NOP)->GetFunction()); + thread->nodeObject->ToObject()->Set(v8::String::NewSymbol("destroy"), v8::FunctionTemplate::New(NOP)->GetFunction()); return v8::Undefined(); +#endif } @@ -983,28 +1170,51 @@ static v8::Handle Destroy (const v8::Arguments &args) { // Eval: Pushes an eval job to the threadEventsQueue. +#ifdef TAGG_USE_NEW_API +static void Eval (const v8::FunctionCallbackInfo& args) { + v8::HandleScope scope(args.GetIsolate()); +#else static v8::Handle Eval (const v8::Arguments &args) { v8::HandleScope scope; - +#endif + if (!args.Length()) { +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.GetIsolate()->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(args.GetIsolate(), "thread.eval(program [,callback]): missing arguments")))); + return; +#else return v8::ThrowException(v8::Exception::TypeError(v8::String::New("thread.eval(program [,callback]): missing arguments"))); +#endif } typeThread* thread= isAThread(args.This()); if (!thread) { +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.GetIsolate()->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(args.GetIsolate(), "thread.eval(): the receiver must be a thread object")))); + return; +#else return v8::ThrowException(v8::Exception::TypeError(v8::String::New("thread.eval(): the receiver must be a thread object"))); +#endif } - eventsQueueItem* event= nuQitem(thread->threadEventsQueue); + eventsQueueItem* event= nuQitem(); event->eval.hasCallback= (args.Length() > 1) && (args[1]->IsFunction()); if (event->eval.hasCallback) { +#ifdef TAGG_USE_NEW_API + event->callback.Reset(args.GetIsolate(), args[1]); +#else event->callback= v8::Persistent::New(args[1]->ToObject()); +#endif } event->eval.scriptText= o2cstr(args[0]); - event->eventType= eventTypeEval; + event->eventType= kEventTypeEval; qPush(event, thread->threadEventsQueue); wakeUpThread(thread, thread->sigkill); +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.This()); +#else return args.This(); +#endif } @@ -1014,29 +1224,60 @@ static v8::Handle Eval (const v8::Arguments &args) { -// Load: emits a eventTypeLoad event to the thread +// Load: emits a kEventTypeLoad event to the thread +#ifdef TAGG_USE_NEW_API +static void Load (const v8::FunctionCallbackInfo& args) { + v8::HandleScope scope(args.GetIsolate()); +#else static v8::Handle Load (const v8::Arguments &args) { v8::HandleScope scope; +#endif if (!args.Length()) { + +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.GetIsolate()->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(args.GetIsolate(), "thread.load(filename [,callback]): missing arguments")))); + return; +#else return v8::ThrowException(v8::Exception::TypeError(v8::String::New("thread.load(filename [,callback]): missing arguments"))); +#endif + } typeThread* thread= isAThread(args.This()); if (!thread) { + +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.GetIsolate()->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(args.GetIsolate(), "thread.load(): the receiver must be a thread object")))); + return; +#else return v8::ThrowException(v8::Exception::TypeError(v8::String::New("thread.load(): the receiver must be a thread object"))); +#endif + } - eventsQueueItem* event= nuQitem(thread->threadEventsQueue); - event->eventType= eventTypeLoad; + eventsQueueItem* event= nuQitem(); event->load.path= o2cstr(args[0]); event->load.hasCallback= ((args.Length() > 1) && (args[1]->IsFunction())); if (event->load.hasCallback) { + +#ifdef TAGG_USE_NEW_API + event->callback.Reset(args.GetIsolate(), args[1]); +#else event->callback= v8::Persistent::New(args[1]->ToObject()); +#endif + } + + event->eventType= kEventTypeLoad; qPush(event, thread->threadEventsQueue); wakeUpThread(thread, thread->sigkill); + +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.This()); +#else return args.This(); +#endif } @@ -1045,11 +1286,15 @@ static v8::Handle Load (const v8::Arguments &args) { -//No se usa xq parece que el inline no va, pero sirve para acortar processEmit y threadEmit, -//por que casi todo el código es idéntico en ambas + +//por que casi todo el código es idéntico en processEmit y threadEmit +#ifdef TAGG_USE_NEW_API +static inline void pushEmitEvent (eventsQueue* queue, const v8::FunctionCallbackInfo& args) { +#else static inline void pushEmitEvent (eventsQueue* queue, const v8::Arguments &args) { +#endif - eventsQueueItem* event= nuQitem(queue); + eventsQueueItem* event= nuQitem(); event->emit.eventName= o2cstr(args[0]); event->emit.argc= (args.Length() > 1) ? (args.Length() - 1) : 0; if (event->emit.argc) { @@ -1063,7 +1308,7 @@ static inline void pushEmitEvent (eventsQueue* queue, const v8::Arguments &args) TAGG_DEBUG && printf("PROCESS EMIT TO THREAD #%ld\n", event->serial); - event->eventType= eventTypeEmit; + event->eventType= kEventTypeEmit; qPush(event, queue); } @@ -1075,34 +1320,31 @@ static inline void pushEmitEvent (eventsQueue* queue, const v8::Arguments &args) //La que emite los events de node hacia las threads +#ifdef TAGG_USE_NEW_API +static void processEmit (const v8::FunctionCallbackInfo& args) { + if (!args.Length()) return args.GetReturnValue().Set(args.This()); + typeThread* thread= isAThread(args.This()); + if (!thread) { + args.GetReturnValue().Set(args.GetIsolate()->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(args.GetIsolate(), "thread.emit(): 'this' must be a thread object")))); + return; + } +#else static v8::Handle processEmit (const v8::Arguments &args) { if (!args.Length()) return args.This(); typeThread* thread= isAThread(args.This()); if (!thread) { return v8::ThrowException(v8::Exception::TypeError(v8::String::New("thread.emit(): 'this' must be a thread object"))); } -/* - eventsQueueItem* event= nuQitem(thread->threadEventsQueue); - event->serial= serial++; - event->emit.eventName= o2cstr(args[0]); - event->emit.argc= (args.Length() > 1) ? (args.Length() - 1) : 0; - if (event->emit.argc) { - event->emit.argv= (char**) malloc(event->emit.argc * sizeof(char*)); - int i= 0; - while (i < event->emit.argc) { - event->emit.argv[i]= o2cstr(args[i+1]); - i++; - } - } - - TAGG_DEBUG && printf("PROCESS EMIT TO THREAD %ld #%ld\n", thread->id, event->serial); - - event->eventType= eventTypeEmit; - qPush(event, thread->threadEventsQueue); -*/ +#endif + pushEmitEvent(thread->threadEventsQueue, args); wakeUpThread(thread, thread->sigkill); + +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.This()); +#else return args.This(); +#endif } @@ -1111,34 +1353,26 @@ static v8::Handle processEmit (const v8::Arguments &args) { -//La que emite los events de las threads hacia node +//La que emite los events de las threads hacia node, se ejecuta en las threads. +#ifdef TAGG_USE_NEW_API +static void threadEmit (const v8::FunctionCallbackInfo& args) { + if (!args.Length()) return args.GetReturnValue().Set(args.This()); + typeThread* thread= (typeThread*) args.GetIsolate()->GetData(1); +#else static v8::Handle threadEmit (const v8::Arguments &args) { if (!args.Length()) return args.This(); typeThread* thread= (typeThread*) v8::Isolate::GetCurrent()->GetData(); +#endif + assert(thread != NULL); assert(thread->threadMagicCookie == kThreadMagicCookie); -/* - eventsQueueItem* event= nuQitem(thread->processEventsQueue); - event->serial= serial++; - event->emit.eventName= o2cstr(args[0]); - event->emit.argc= (args.Length() > 1) ? (args.Length() - 1) : 0; - if (event->emit.argc) { - event->emit.argv= (char**) malloc(event->emit.argc * sizeof(char*)); - int i= 0; - while (i < event->emit.argc) { - event->emit.argv[i]= o2cstr(args[i+1]); - i++; - } - } - - TAGG_DEBUG && printf("THREAD %ld EMIT #%ld\n", thread->id, event->serial); - - event->eventType= eventTypeEmit; - qPush(event, thread->processEventsQueue); -*/ pushEmitEvent(thread->processEventsQueue, args); WAKEUP_NODE_EVENT_LOOP +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(args.This()); +#else return args.This(); +#endif } @@ -1149,43 +1383,61 @@ static v8::Handle threadEmit (const v8::Arguments &args) { //Se ejecuta al hacer tagg.create(): Creates and launches a new isolate in a new background thread. +#ifdef TAGG_USE_NEW_API +static void Create (const v8::FunctionCallbackInfo& args) { + v8::HandleScope scope(args.GetIsolate()); + v8::Isolate* iso= args.GetIsolate(); +#else static v8::Handle Create (const v8::Arguments &args) { v8::HandleScope scope; +#endif typeThread* thread= (typeThread*) calloc(1, sizeof (typeThread)); thread->id= threadsCtr++; thread->threadMagicCookie= kThreadMagicCookie; thread->threadEventsQueue= nuQueue(); thread->processEventsQueue= nuQueue(); - thread->nodeObject= v8::Persistent::New(threadTemplate->NewInstance()); - thread->nodeObject->SetPointerInInternalField(0, thread); - thread->nodeObject->Set(id_symbol, v8::Integer::New(thread->id)); - thread->nodeObject->Set(version_symbol, v8::String::New(k_TAGG_VERSION)); - thread->nodeDispatchEvents= v8::Persistent::New(boot_js->CallAsFunction(thread->nodeObject, 0, NULL)->ToObject()); - + pthread_cond_init(&(thread->idle_cv), NULL); pthread_mutex_init(&(thread->idle_mutex), NULL); char* errstr; - int err, retry= 5; - do { - err= pthread_create(&(thread->thread), NULL, threadBootProc, thread); - //pthread_detach(pthread_t thread); ??? - if (!err) break; - errstr= strerror(err); - printf("THREAD %ld PTHREAD_CREATE() ERROR %d : %s RETRYING %d\n", thread->id, err, errstr, retry); - usleep(100000); - } while (--retry); - + //pthread_detach(pthread_t thread); ??? + int err= pthread_create(&(thread->thread), NULL, threadBootProc, thread); if (err) { + errstr= strerror(err); + printf("THREAD %ld PTHREAD_CREATE() ERROR %d : %s\n", thread->id, err, errstr); //Algo ha ido mal, toca deshacer todo - printf("THREAD %ld PTHREAD_CREATE() ERROR %d : %s NOT RETRYING ANY MORE\n", thread->id, err, errstr); TAGG_DEBUG && printf("CALLED cleanUpAfterThread %ld FROM CREATE()\n", thread->id); cleanUpAfterThread(thread); +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(iso->ThrowException(v8::Exception::TypeError(v8::String::NewFromUtf8(iso, "create(): error in pthread_create()")))); + return; +#else return v8::ThrowException(v8::Exception::TypeError(v8::String::New("create(): error in pthread_create()"))); +#endif } - else { - + +#ifdef TAGG_USE_NEW_API + v8::Local nodeObject= v8::Object::New(iso); + nodeObject->SetHiddenValue(v8::String::NewFromUtf8(iso, "ptr"), v8::Number::New(iso, (double) ((uintptr_t) thread))); + nodeObject->Set(v8::String::NewFromUtf8(iso, "load"), v8::FunctionTemplate::New(iso, Load)->GetFunction()); + nodeObject->Set(v8::String::NewFromUtf8(iso, "eval"), v8::FunctionTemplate::New(iso, Eval)->GetFunction()); + nodeObject->Set(v8::String::NewFromUtf8(iso, "emit"), v8::FunctionTemplate::New(iso, processEmit)->GetFunction()); + nodeObject->Set(v8::String::NewFromUtf8(iso, "destroy"), v8::FunctionTemplate::New(iso, Destroy)->GetFunction()); + nodeObject->Set(v8::String::NewFromUtf8(iso, "id"), v8::Integer::New(iso, thread->id)); + nodeObject->Set(v8::String::NewFromUtf8(iso, "version"), v8::String::NewFromUtf8(iso, k_TAGG_VERSION)); + v8::Local boot= v8::Local::New(iso, boot_js); + thread->nodeDispatchEvents.Reset(iso, boot->ToObject()->CallAsFunction(nodeObject, 0, NULL)->ToObject()); + thread->nodeObject.Reset(iso, nodeObject); +#else + thread->nodeObject= v8::Persistent::New(threadTemplate->NewInstance()); + thread->nodeObject->ToObject()->SetPointerInInternalField(0, thread); + thread->nodeObject->ToObject()->Set(v8::String::NewSymbol("id"), v8::Integer::New(thread->id)); + thread->nodeObject->ToObject()->Set(v8::String::NewSymbol("version"), v8::String::New(k_TAGG_VERSION)); + thread->nodeDispatchEvents= v8::Persistent::New(boot_js->ToObject()->CallAsFunction(thread->nodeObject->ToObject(), 0, NULL)->ToObject()); +#endif + #ifdef TAGG_USE_LIBUV uv_async_init(uv_default_loop(), &thread->async_watcher, Callback); #else @@ -1193,10 +1445,12 @@ static v8::Handle Create (const v8::Arguments &args) { ev_async_start(EV_DEFAULT_UC_ &thread->async_watcher); ev_ref(EV_DEFAULT_UC); #endif - - } +#ifdef TAGG_USE_NEW_API + args.GetReturnValue().Set(nodeObject); +#else return thread->nodeObject; +#endif } @@ -1207,10 +1461,19 @@ static v8::Handle Create (const v8::Arguments &args) { //Esto es lo primero que llama node al hacer require('threads_a_gogo') void Init (v8::Handle target) { - qitemStore= qitemStoreInit(); + qitemsStore= nuQueue(); useLocker= v8::Locker::IsActive(); - id_symbol= v8::Persistent::New(v8::String::NewSymbol("id")); - version_symbol= v8::Persistent::New(v8::String::NewSymbol("version")); + +#ifdef TAGG_USE_NEW_API + + v8::Isolate* iso= v8::Isolate::GetCurrent(); + boot_js.Reset(iso, v8::Script::Compile(v8::String::NewFromUtf8(iso, kBoot_js))->Run()->ToObject()); + target->Set(v8::String::NewFromUtf8(iso, "create"), v8::FunctionTemplate::New(iso, Create)->GetFunction()); + target->Set(v8::String::NewFromUtf8(iso, "createPool"), v8::Script::Compile(v8::String::NewFromUtf8(iso, kPool_js))->Run()->ToObject()); + target->Set(v8::String::NewFromUtf8(iso, "version"), v8::String::NewFromUtf8(iso, k_TAGG_VERSION)); + +#else + boot_js= v8::Persistent::New(v8::Script::Compile(v8::String::New(kBoot_js))->Run()->ToObject()); threadTemplate= v8::Persistent::New(v8::ObjectTemplate::New()); @@ -1222,7 +1485,10 @@ void Init (v8::Handle target) { target->Set(v8::String::NewSymbol("create"), v8::FunctionTemplate::New(Create)->GetFunction()); target->Set(v8::String::NewSymbol("createPool"), v8::Script::Compile(v8::String::New(kPool_js))->Run()->ToObject()); - target->Set(version_symbol, v8::String::New(k_TAGG_VERSION)); + target->Set(v8::String::NewSymbol("version"), v8::String::New(k_TAGG_VERSION)); + +#endif + } NODE_MODULE(threads_a_gogo, Init) diff --git a/test/02_events_storm.js b/test/02_events_storm.js index 2ba7a95..4ff3e33 100644 --- a/test/02_events_storm.js +++ b/test/02_events_storm.js @@ -168,11 +168,11 @@ function createRandomEvent (thread,eventType,what,i) { function emitEvents (i,howMany,what,n) { - //Hasta 2000 los mandamos randomly de una tacada + //Hasta 20000 los mandamos randomly de una tacada i= 0; howMany= Math.floor(howManyEvents/2); - howMany= (howMany > 10e3) ? 10e3 : howMany; + howMany= (howMany > 20e3) ? 20e3 : howMany; while (i < howMany) { what= createRandomEvent(); sentEvents.push(what); @@ -185,14 +185,11 @@ function emitEvents (i,howMany,what,n) { howMany= howManyEvents- howMany; (function loop (what,i) { if (howMany > 0) { - i= (howMany > 2048) ? 1024+ rnd(1024) : howMany; - howMany-= i; - while (i--) { - what= createRandomEvent(); - sentEvents.push(what); - what.thread.emit.apply(what.thread, what.argv); - setImmediate(loop); - } + howMany-= 1; + what= createRandomEvent(); + sentEvents.push(what); + what.thread.emit.apply(what.thread, what.argv); + setImmediate(loop); } })(); diff --git a/test/all.js b/test/all.js index 4a62b27..45772ce 100644 --- a/test/all.js +++ b/test/all.js @@ -34,7 +34,7 @@ function boot () { var name= rndStr(12)+ '.tagg.test.js'; var path= (process.env.TMPDIR || '/tmp/')+ name; -require('fs').writeFileSync(path, boot); +require('fs').writeFileSync(path, boot.toString()); var t= tagg.create().load(path, cb).eval("boot()", cb1); step('OK.'); diff --git a/test/test22_create_destroy_loop.js b/test/test22_create_destroy_loop.js index 6fcee0c..0bfdbd2 100644 --- a/test/test22_create_destroy_loop.js +++ b/test/test22_create_destroy_loop.js @@ -1,25 +1,19 @@ -var T= require('threads_a_gogo'); +var tagg= require('threads_a_gogo'); var i= 0; -var k= 5; (function again () { - var j= k; - while (j--) { - T.create().destroy(); - } - i+= k; - setImmediate(again); + i++; + tagg.create().destroy(1, again); })(); var t= Date.now(); -function display () { +(function display () { var e= Date.now()- t; var tps= (i*1e3/e).toFixed(1); process.stdout.write('\nt (ms) -> '+ e+ ', i -> '+ i+ ', created/destroyed-per-second -> '+ tps); -} - -setInterval(display, 1e3); \ No newline at end of file + setTimeout(display, 666); +})(); diff --git a/test/test95_eval_to_emit_to_callback_benchmark.js b/test/test95_eval_to_emit_to_callback_benchmark.js index 200b044..1c747b2 100644 --- a/test/test95_eval_to_emit_to_callback_benchmark.js +++ b/test/test95_eval_to_emit_to_callback_benchmark.js @@ -17,7 +17,7 @@ function cb1 () { function cb2 () { var e= (process.hrtime(this.t1)[1]/1000).toFixed(2)+'(µs) \r'; - var str= " EVAL TO EMIT: "+ this.t0+ " EMIT TO CB: "+ e; + var str= " EVAL TO EMIT CB: "+ this.t0+ " EMIT CB TO EVAL CB: "+ e; process.stdout.write('THREAD '+ this.id+ str+ '\r'); this.t0= process.hrtime(); this.eval('thread.emit("ping")', cb2); diff --git a/test/test96_destroy_loop_using_destroy_callbaks.js b/test/test96_destroy_loop_using_destroy_callbaks.js index 8d85747..b6a99ef 100644 --- a/test/test96_destroy_loop_using_destroy_callbaks.js +++ b/test/test96_destroy_loop_using_destroy_callbaks.js @@ -1,12 +1,12 @@ tagg= require('threads_a_gogo'); howmany= +process.argv[2] || 2; loops= +process.argv[3] || 2000; -process.stdout.write('Using '+ howmany+ ' threads, '+ loops+ 'loops \n'); +process.stdout.write('Using '+ howmany+ ' threads, '+ loops+ ' loops \n'); while (howmany--) create(); function nop () {} -function create (t) { t=tagg.create().eval('thread.nextTick(function (i) { i= 1e5; while (i--) ; })').destroy(0, cb); } +function create () { tagg.create().eval('thread.nextTick(function (i) { i= 1e5; while (i--) ; })').destroy(0, cb); } ctr= 0; function cb () { @@ -14,5 +14,3 @@ function cb () { console.log("DESTROY() CALLBACK #"+ ctr+ "\r"); if (ctr < loops) create(); } - -