Better thread API with Timing measuring, setup with modules returns and shared objects (Untested)

This commit is contained in:
Peaceultime 2021-11-22 10:36:47 +01:00
parent 8be7991b1d
commit 5ae0479e02
2 changed files with 42 additions and 95 deletions

View File

@ -1,78 +0,0 @@
// WORKER SIDE //
const Constants = {
STATE: {
WAITING: 1,
OK: 2,
ERROR: 3,
},
REQUEST: {
SETUP: 1,
CALL: 2,
TERMINATE: 3
},
};
function send(ret)
{
globalThis.VERBOSE && console.debug(`Sending ${ret} to the main thread`);
postMessage([Constants.STATE.OK, ret]);
}
class Process
{
static _isSetup = false;
static setup()
{
globalThis.DEBUG && console.log(`Setting up process`);
Process._isSetup = true;
Process._customFn = {};
onmessage = Process.onmessage;
}
static onmessage(e)
{
globalThis.VERBOSE && console.debug(`Received ${e.data}`);
if(e && e.data)
{
switch(e.data[0])
{
case Constants.REQUEST.SETUP:
send(Object.keys(Process._customFn));
break;
case Constants.REQUEST.CALL:
send(Process._customFn[e.data[1]](...e.data[2]));
break;
case Constants.REQUEST.TERMINATE:
Process.cleanUp();
send();
break;
default:
throw new Error("Invalid message received in the Process");
break;
}
}
else
{
throw new Error("Can't find any data");
}
}
static register(name, fn)
{
if(!Process._isSetup)
Process.setup();
if(name in Process._customFn)
throw new Error("This function name is already registered in the process. Please use a different one.");
else
Process._customFn[name] = fn;
}
static cleanUp()
{
}
}
export { Process, Constants };

View File

@ -23,16 +23,17 @@ function cleanUp(thread)
thread._waiting = false; thread._waiting = false;
thread._resolver = undefined; thread._resolver = undefined;
thread._rejecter = undefined; thread._rejecter = undefined;
thread._calledMod = "";
thread._calledFn = ""; thread._calledFn = "";
} }
function request(thread, req) function request(thread, req, shared)
{ {
thread._promise = new Promise(function(res, rej) { thread._promise = new Promise(function(res, rej) {
thread._waiting = true; thread._waiting = true;
thread._resolver = res; thread._resolver = res;
thread._rejecter = rej; thread._rejecter = rej;
thread._worker && thread._worker.postMessage(req); thread._worker && thread._worker.postMessage(req, shared);
}); });
return thread._promise; return thread._promise;
@ -50,12 +51,14 @@ function parseReturns(data)
} }
return data[1]; return data[1];
} }
function wrapper(thread, fn) function wrapper(thread, mod, fn)
{ {
return function() return function(args, shared)
{ {
globalThis.TIMINGS && console.time("Measuring " + fn); globalThis.TIMINGS && console.time("Measuring " + mod + "." + fn);
return request(thread, [Constants.REQUEST.CALL, fn, [...arguments]]); globalThis.TIMINGS && thread._calledMod = mod;
globalThis.TIMINGS && thread._calledFn = fn;
return request(thread, [Constants.REQUEST.CALL, mod, fn, args], shared);
} }
} }
@ -68,6 +71,7 @@ function onmessage(thread)
if(thread._waiting && ret) if(thread._waiting && ret)
{ {
globalThis.DEBUG && console.log(`Resolving ${thread._name}`); globalThis.DEBUG && console.log(`Resolving ${thread._name}`);
globalThis.TIMINGS && thread._calledFn !== "" && console.timeEnd("Measuring " + thread._calledMod + "." + thread._calledFn);
thread._ret = ret; thread._ret = ret;
thread._resolver(ret); thread._resolver(ret);
cleanUp(thread); cleanUp(thread);
@ -77,7 +81,7 @@ function onmessage(thread)
function onerror(thread) function onerror(thread)
{ {
return function(e) { return function(e) {
console.error(`Received error from ${thread._name} containing ${e}`); globalThis.DEBUG && console.error(`Received error from ${thread._name} containing ${e}`);
if(thread._waiting) if(thread._waiting)
{ {
@ -109,12 +113,15 @@ const Thread = Object.freeze(class
{ {
const result = await request(this, [Constants.REQUEST.SETUP]); const result = await request(this, [Constants.REQUEST.SETUP]);
if(result && result.length) if(result)
{ {
for(let i = 0; i < result.length; i++) for(let mod of Object.getOwnPropertyNames(result))
{ {
const custom = result[i]; this[mod] = {};
this[custom] = wrapper(this, custom); for(let i = 0; i < result[mod].length; ++i)
{
this[mod][fn] = wrapper(this, mod, fn);
}
} }
} }
} }
@ -129,6 +136,11 @@ const Thread = Object.freeze(class
return Promise.resolve(this._ret); return Promise.resolve(this._ret);
} }
} }
terminate()
{
await request(this, [Constants.REQUEST.TERMINATE]);
this._worker.terminate();
}
}); });
// WORKER SIDE // // WORKER SIDE //
@ -157,11 +169,20 @@ class Process
switch(e.data[0]) switch(e.data[0])
{ {
case Constants.REQUEST.SETUP: case Constants.REQUEST.SETUP:
send(Object.keys(Process._customFn)); const obj = {};
for(let mod of Object.getOwnPropertyNames(Process._customFn))
{
obj[mod] = [];
for(let fn of Object.getOwnPropertyNames(Process._customFn[mod]))
{
obj[mod].push(fn);
}
}
send(obj);
break; break;
case Constants.REQUEST.CALL: case Constants.REQUEST.CALL:
send(Process._customFn[e.data[1]](...e.data[2])); send(Process._customFn[e.data[1]][e.data[2]](...e.data[3]));
break; break;
case Constants.REQUEST.TERMINATE: case Constants.REQUEST.TERMINATE:
@ -191,16 +212,20 @@ class Process
if(typeof arr[i] != 'function') if(typeof arr[i] != 'function')
throw new Error("You can only register functions"); throw new Error("You can only register functions");
const name = mod + "_" arr[i].name; if(mod in Process._customFn)
if(name in Process._customFn) throw new Error("This module has already been registered in the process.");
const modObj = {};
const name = arr[i].name;
if(name in modObj)
throw new Error("This function name is already registered in the process. Please use a different one."); throw new Error("This function name is already registered in the process. Please use a different one.");
else else
Process._customFn[name] = arr[i]; modObj[name] = arr[i];
} }
} }
static cleanUp() static cleanUp()
{ {
Process._customFn = {};
} }
} }