-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
TaskProcessor.js
377 lines (327 loc) · 11.1 KB
/
TaskProcessor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
import Uri from "../ThirdParty/Uri.js";
import buildModuleUrl from "./buildModuleUrl.js";
import defaultValue from "./defaultValue.js";
import defer from "./defer.js";
import defined from "./defined.js";
import destroyObject from "./destroyObject.js";
import DeveloperError from "./DeveloperError.js";
import Event from "./Event.js";
import FeatureDetection from "./FeatureDetection.js";
import isCrossOriginUrl from "./isCrossOriginUrl.js";
import Resource from "./Resource.js";
import RuntimeError from "./RuntimeError.js";
/**
 * Determines whether typed-array buffers can be transferred to a web worker
 * instead of being copied. The answer is cached on
 * <code>TaskProcessor._canTransferArrayBuffer</code>, so the probe worker is
 * only created once.
 *
 * The probe posts a one-element Int8Array (with its buffer in the transfer
 * list) to the <code>Workers/transferTypedArrayTest.js</code> worker and
 * checks that the value comes back intact.
 *
 * @returns {Boolean|Promise.<Boolean>} <code>false</code> immediately if posting
 * with a transfer list throws; otherwise a promise for the probe result while
 * the probe is in flight, and the plain boolean result once it has finished.
 *
 * @private
 */
function canTransferArrayBuffer() {
  if (defined(TaskProcessor._canTransferArrayBuffer)) {
    return TaskProcessor._canTransferArrayBuffer;
  }

  const worker = new Worker(getWorkerUrl("Workers/transferTypedArrayTest.js"));
  // Prefer the prefixed postMessage when the worker object provides one.
  worker.postMessage = defaultValue(
    worker.webkitPostMessage,
    worker.postMessage
  );

  const value = 99;
  const array = new Int8Array([value]);
  try {
    // postMessage might fail with a DataCloneError
    // if transferring array buffers is not supported.
    worker.postMessage(
      {
        array: array,
      },
      [array.buffer]
    );
  } catch (e) {
    // The probe worker has no further use; terminate it rather than leak it.
    worker.terminate();
    TaskProcessor._canTransferArrayBuffer = false;
    return TaskProcessor._canTransferArrayBuffer;
  }

  const deferred = defer();

  // Publishes the probe result, releases the worker and replaces the cached
  // promise with the plain boolean.
  const finish = function (result) {
    deferred.resolve(result);
    worker.terminate();
    TaskProcessor._canTransferArrayBuffer = result;
  };

  worker.onmessage = function (event) {
    const array = event.data.array;
    // some versions of Firefox silently fail to transfer typed arrays.
    // https://bugzilla.mozilla.org/show_bug.cgi?id=841904
    // Check to make sure the value round-trips successfully.
    finish(defined(array) && array[0] === value);
  };

  // If the probe worker fails (for example its script cannot be loaded) no
  // message will ever arrive. Fall back to copying instead of leaving every
  // caller waiting on a promise that never settles.
  worker.onerror = function () {
    finish(false);
  };

  TaskProcessor._canTransferArrayBuffer = deferred.promise;
  return TaskProcessor._canTransferArrayBuffer;
}
// Single event shared by all TaskProcessor instances. completeTask raises it
// for every finished task, and it is exposed as TaskProcessor.taskCompletedEvent.
const taskCompletedEvent = new Event();
/**
 * Handles a message received from a processor's worker by settling the
 * pending task it answers.
 *
 * Messages that carry no <code>id</code>, or whose <code>id</code> has no
 * pending task, are ignored and do not change the active task count.
 *
 * @param {TaskProcessor} processor The processor whose worker sent the message.
 * @param {Object} data The message payload. <code>data.id</code> identifies the task;
 * <code>data.error</code> (if defined) rejects it, otherwise <code>data.result</code> resolves it.
 *
 * @private
 */
function completeTask(processor, data) {
  const id = data.id;
  if (!defined(id)) {
    // This is not one of ours.
    return;
  }

  const deferreds = processor._deferreds;
  const deferred = deferreds[id];
  if (!defined(deferred)) {
    // Nothing is waiting on this id (for example a duplicate response).
    // Ignoring it avoids a TypeError and keeps the task count accurate.
    return;
  }

  // Only a response to a tracked task frees a slot.
  --processor._activeTasks;

  if (defined(data.error)) {
    let error = data.error;
    // Errors arrive from the worker as plain data; rebuild the known
    // error types so callers can rely on instanceof checks.
    if (error.name === "RuntimeError") {
      error = new RuntimeError(data.error.message);
      error.stack = data.error.stack;
    } else if (error.name === "DeveloperError") {
      error = new DeveloperError(data.error.message);
      error.stack = data.error.stack;
    }
    taskCompletedEvent.raiseEvent(error);
    deferred.reject(error);
  } else {
    taskCompletedEvent.raiseEvent();
    deferred.resolve(data.result);
  }

  delete deferreds[id];
}
/**
 * Resolves a worker module ID to a URL that can be passed to the Worker constructor.
 *
 * Same-origin URLs are returned as produced by buildModuleUrl. For a
 * cross-origin URL a small shim script that calls importScripts on the real
 * URL is wrapped in a Blob, and the Blob's object URL is returned instead.
 *
 * @param {String} moduleID The module ID handed to buildModuleUrl.
 * @returns {String} The URL to construct the worker from.
 *
 * @private
 */
function getWorkerUrl(moduleID) {
  const url = buildModuleUrl(moduleID);
  if (!isCrossOriginUrl(url)) {
    return url;
  }

  //to load cross-origin, create a shim worker from a blob URL
  // JSON.stringify produces a correctly escaped, double-quoted string literal,
  // so quotes or backslashes in the URL cannot break (or inject into) the shim.
  const script = `importScripts(${JSON.stringify(url)});`;

  let blob;
  try {
    blob = new Blob([script], {
      type: "application/javascript",
    });
  } catch (e) {
    // Blob constructor unavailable: fall back to the legacy BlobBuilder APIs.
    const BlobBuilder =
      window.BlobBuilder ||
      window.WebKitBlobBuilder ||
      window.MozBlobBuilder ||
      window.MSBlobBuilder;
    const blobBuilder = new BlobBuilder();
    blobBuilder.append(script);
    blob = blobBuilder.getBlob("application/javascript");
  }

  const URL = window.URL || window.webkitURL;
  return URL.createObjectURL(blob);
}
// Cached result of getBootstrapperUrl; stays undefined until the first call.
let bootstrapperUrlResult;

/**
 * Returns the worker URL for the bootstrapper script, resolving it through
 * getWorkerUrl on the first call and reusing the cached value afterwards.
 *
 * @returns {String} The URL for "Workers/cesiumWorkerBootstrapper.js".
 *
 * @private
 */
function getBootstrapperUrl() {
  if (defined(bootstrapperUrlResult)) {
    return bootstrapperUrlResult;
  }
  bootstrapperUrlResult = getWorkerUrl("Workers/cesiumWorkerBootstrapper.js");
  return bootstrapperUrlResult;
}
/**
 * Creates the web worker for a processor from the bootstrapper URL and sends
 * it the bootstrap message naming the worker module to load.
 *
 * @param {TaskProcessor} processor The processor that owns the worker; its
 * <code>_workerPath</code> is sent as the module to load, and all messages from the
 * worker are routed to completeTask for this processor.
 * @returns {Worker} The newly created worker.
 *
 * @private
 */
function createWorker(processor) {
  const bootstrapperUrl = getBootstrapperUrl();
  const worker = new Worker(bootstrapperUrl);

  // Prefer the prefixed postMessage when the worker object provides one.
  worker.postMessage = defaultValue(
    worker.webkitPostMessage,
    worker.postMessage
  );

  const loaderConfig = {
    paths: {
      Workers: buildModuleUrl("Workers"),
    },
    baseUrl: buildModuleUrl.getCesiumBaseUrl().url,
  };

  worker.postMessage({
    loaderConfig: loaderConfig,
    workerModule: processor._workerPath,
  });

  worker.onmessage = function (message) {
    completeTask(processor, message.data);
  };

  return worker;
}
/**
 * Builds the configuration object that tells a worker how to load a web
 * assembly module, or its JavaScript fallback.
 *
 * @param {TaskProcessor} processor The processor requesting the module; only its
 * <code>_workerPath</code> is read, for the error message.
 * @param {Object} wasmOptions Options with <code>modulePath</code>, <code>wasmBinaryFile</code>
 * and optionally <code>fallbackModulePath</code>.
 * @returns {Promise.<Object>} A promise for an object with <code>modulePath</code>,
 * <code>wasmBinaryFile</code> and <code>wasmBinary</code>. With web assembly support the
 * binary is fetched and stored in <code>wasmBinary</code>; without it only
 * <code>modulePath</code> (the fallback module URL) is set.
 *
 * @exception {RuntimeError} Thrown synchronously when web assembly is not supported
 * and no fallback module path was provided.
 *
 * @private
 */
function getWebAssemblyLoaderConfig(processor, wasmOptions) {
  if (FeatureDetection.supportsWebAssembly()) {
    const loaderConfig = {
      modulePath: buildModuleUrl(wasmOptions.modulePath),
      wasmBinaryFile: buildModuleUrl(wasmOptions.wasmBinaryFile),
      wasmBinary: undefined,
    };

    const request = Resource.fetchArrayBuffer({
      url: loaderConfig.wasmBinaryFile,
    });
    return request.then(function (binary) {
      loaderConfig.wasmBinary = binary;
      return loaderConfig;
    });
  }

  // Web assembly not supported, use fallback js module if provided
  const fallbackModulePath = wasmOptions.fallbackModulePath;
  if (!defined(fallbackModulePath)) {
    throw new RuntimeError(
      `This browser does not support Web Assembly, and no backup module was provided for ${processor._workerPath}`
    );
  }

  return Promise.resolve({
    modulePath: buildModuleUrl(fallbackModulePath),
    wasmBinaryFile: undefined,
    wasmBinary: undefined,
  });
}
/**
 * Wraps a web worker so that tasks can be scheduled on it and their results
 * received asynchronously through promises.
 *
 * No Worker is created by the constructor; that happens when a task is first scheduled.
 *
 * @alias TaskProcessor
 * @constructor
 *
 * @param {String} workerPath The Url to the worker. A path with a URI scheme and no fragment is used as given;
 *                            any other path is treated as relative to the Cesium Workers folder.
 * @param {Number} [maximumActiveTasks=Number.POSITIVE_INFINITY] The maximum number of active tasks. Once this
 *                                        limit is reached, scheduleTask stops accepting tasks so that
 *                                        the work can be rescheduled in future frames.
 */
function TaskProcessor(workerPath, maximumActiveTasks) {
  const parsedPath = new Uri(workerPath);
  const usableAsGiven =
    parsedPath.scheme().length !== 0 && parsedPath.fragment().length === 0;

  if (usableAsGiven) {
    this._workerPath = workerPath;
  } else {
    this._workerPath = TaskProcessor._workerModulePrefix + workerPath;
  }

  this._maximumActiveTasks = defaultValue(
    maximumActiveTasks,
    Number.POSITIVE_INFINITY
  );

  // Bookkeeping for in-flight tasks: how many are active, the deferred for
  // each task keyed by id, and the id to hand to the next task.
  this._activeTasks = 0;
  this._deferreds = {};
  this._nextID = 0;
}
// Shared transfer list used whenever nothing is (or can be) transferred.
const emptyTransferableObjectArray = [];

/**
 * Schedule a task to be processed by the web worker asynchronously.  If there are currently more
 * tasks active than the maximum set by the constructor, will immediately return undefined.
 * Otherwise, returns a promise that will resolve to the result posted back by the worker when
 * finished.
 *
 * The worker is created on the first call. The array passed as <code>transferableObjects</code>
 * is never modified; when transferring is not supported the objects are simply copied.
 *
 * @param {Object} parameters Any input data that will be posted to the worker.
 * @param {Object[]} [transferableObjects] An array of objects contained in parameters that should be
 *                                      transferred to the worker instead of copied.
 * @returns {Promise.<Object>|undefined} Either a promise that will resolve to the result when available, or undefined
 *                    if there are too many active tasks. The promise rejects if the message cannot be
 *                    posted to the worker, in which case the task no longer counts as active.
 *
 * @example
 * const taskProcessor = new Cesium.TaskProcessor('myWorkerPath');
 * const promise = taskProcessor.scheduleTask({
 *     someParameter : true,
 *     another : 'hello'
 * });
 * if (!Cesium.defined(promise)) {
 *     // too many active tasks - try again later
 * } else {
 *     promise.then(function(result) {
 *         // use the result of the task
 *     });
 * }
 */
TaskProcessor.prototype.scheduleTask = function (
  parameters,
  transferableObjects
) {
  if (!defined(this._worker)) {
    this._worker = createWorker(this);
  }

  if (this._activeTasks >= this._maximumActiveTasks) {
    return undefined;
  }

  // Reserve the slot right away so calls made before the transfer check
  // settles still see this task as active.
  ++this._activeTasks;

  const processor = this;
  return Promise.resolve(canTransferArrayBuffer()).then(function (
    canTransfer
  ) {
    // Use a local transfer list rather than emptying the caller's array.
    let transferList = transferableObjects;
    if (!defined(transferList) || !canTransfer) {
      transferList = emptyTransferableObjectArray;
    }

    const id = processor._nextID++;
    const deferred = defer();
    processor._deferreds[id] = deferred;

    try {
      processor._worker.postMessage(
        {
          id: id,
          parameters: parameters,
          canTransferArrayBuffer: canTransfer,
        },
        transferList
      );
    } catch (error) {
      // The worker never received the task, so no response will arrive to
      // release it. Undo the bookkeeping and let the returned promise reject.
      delete processor._deferreds[id];
      --processor._activeTasks;
      throw error;
    }

    return deferred.promise;
  });
};
/**
 * Posts a message to a web worker with configuration to initialize loading
 * and compiling a web assembly module asynchronously, as well as an optional
 * fallback JavaScript module to use if Web Assembly is not supported.
 *
 * The worker is created if it does not exist yet. The first message the worker
 * sends back is taken as the initialization response; after that, messages are
 * handled as task results again.
 *
 * @param {Object} [webAssemblyOptions] An object with the following properties:
 * @param {String} [webAssemblyOptions.modulePath] The path of the web assembly JavaScript wrapper module.
 * @param {String} [webAssemblyOptions.wasmBinaryFile] The path of the web assembly binary file.
 * @param {String} [webAssemblyOptions.fallbackModulePath] The path of the fallback JavaScript module to use if web assembly is not supported.
 * @returns {Promise.<Object>} A promise that resolves to the result when the web worker has loaded and compiled the web assembly module and is ready to process tasks.
 * The promise rejects if the loader configuration cannot be obtained (for example the binary fails to download)
 * or the configuration cannot be posted to the worker.
 */
TaskProcessor.prototype.initWebAssemblyModule = function (webAssemblyOptions) {
  if (!defined(this._worker)) {
    this._worker = createWorker(this);
  }

  const deferred = defer();
  const processor = this;
  const worker = this._worker;

  getWebAssemblyLoaderConfig(this, webAssemblyOptions)
    .then(function (wasmConfig) {
      return Promise.resolve(canTransferArrayBuffer()).then(function (
        canTransfer
      ) {
        let transferableObjects;
        const binary = wasmConfig.wasmBinary;
        if (defined(binary) && canTransfer) {
          transferableObjects = [binary];
        }

        worker.onmessage = function (event) {
          // The first message is the initialization response. Restore the
          // regular task handler before resolving.
          worker.onmessage = function (event) {
            completeTask(processor, event.data);
          };

          deferred.resolve(event.data);
        };

        worker.postMessage(
          { webAssemblyConfig: wasmConfig },
          transferableObjects
        );
      });
    })
    .catch(function (error) {
      // Without this, a failed download or post would leave the returned
      // promise pending forever and surface only as an unhandled rejection.
      deferred.reject(error);
    });

  return deferred.promise;
};
/**
 * Returns true if this object was destroyed; otherwise, false.
 * <br /><br />
 * If this object was destroyed, it should not be used; calling any function other than
 * <code>isDestroyed</code> will result in a {@link DeveloperError} exception.
 * <br /><br />
 * This prototype implementation always returns false. The "destroyed" state is
 * presumably installed on the instance by the destroyObject call in
 * {@link TaskProcessor#destroy} (destroyObject is defined outside this file).
 *
 * @returns {Boolean} True if this object was destroyed; otherwise, false.
 *
 * @see TaskProcessor#destroy
 */
TaskProcessor.prototype.isDestroyed = function () {
return false;
};
/**
 * Destroys this object. If a Worker has been created it is terminated immediately.
 * <br /><br />
 * Once an object is destroyed, it should not be used; calling any function other than
 * <code>isDestroyed</code> will result in a {@link DeveloperError} exception.
 *
 * @returns The value returned by destroyObject for this instance.
 */
TaskProcessor.prototype.destroy = function () {
  const worker = this._worker;
  if (defined(worker)) {
    worker.terminate();
  }

  return destroyObject(this);
};
/**
 * An event that's raised whenever a task completes, whether it succeeded or failed.
 * Event handlers are passed no arguments when the task succeeded, and the error
 * object if the task failed.
 *
 * @type {Event}
 *
 * @private
 */
TaskProcessor.taskCompletedEvent = taskCompletedEvent;
// exposed for testing purposes
// Default prefix prepended by the constructor to worker paths that are not used as given.
TaskProcessor._defaultWorkerModulePrefix = "Workers/";
// The prefix actually used by the constructor; starts out as the default.
TaskProcessor._workerModulePrefix = TaskProcessor._defaultWorkerModulePrefix;
// Cache for canTransferArrayBuffer(): undefined until first probed, then a
// promise while the probe is in flight, and finally the boolean result.
TaskProcessor._canTransferArrayBuffer = undefined;
export default TaskProcessor;