author | Shu-yu Guo <shu@rfrn.org> | Fri, 11 Oct 2013 12:32:28 -0700 |
changeset 162920 | 464e261cbcbe791df57a30053f83444519cc30a5 |
parent 162919 | 95f43cea056e8c66a1fa2642a96489b92f937f01 |
child 162921 | 976a1fe9f0807fa2a93688e9aea9afa4d4506361 |
push id | 25975 |
push user | ryanvm@gmail.com |
push date | Fri, 10 Jan 2014 19:46:47 +0000 |
treeherder | autoland@e89afc241513 |
reviewers | shu, nmatsakis |
bugs | 919638 |
milestone | 29.0a1 |
--- a/js/src/builtin/Array.js +++ b/js/src/builtin/Array.js @@ -603,16 +603,32 @@ function ArrayKeys() { */ function ComputeNumChunks(length) { var chunks = length >>> CHUNK_SHIFT; if (chunks << CHUNK_SHIFT === length) return chunks; return chunks + 1; } +#define SLICES_PER_WORKER 8 + +/** + * Compute the number of slices given an array length and the number of + * chunks. Used in tandem with the workstealing scheduler. + */ +function ComputeNumSlices(workers, length, chunks) { + if (length !== 0) { + var slices = workers * SLICES_PER_WORKER; + if (chunks < slices) + return workers; + return slices; + } + return workers; +} + /** * Computes the bounds for slice |sliceIndex| of |numItems| items, * assuming |numSlices| total slices. If numItems is not evenly * divisible by numSlices, then the final thread may have a bit of * extra work. */ function ComputeSliceBounds(numItems, sliceIndex, numSlices) { var sliceWidth = (numItems / numSlices) | 0; @@ -671,32 +687,33 @@ function ArrayMapPar(func, mode) { // - Breaking out of named blocks does not currently work (bug 684384); // - Unreachable Code Elim. can't properly handle if (a && b) (bug 669796) if (ShouldForceSequential()) break parallel; if (!TRY_PARALLEL(mode)) break parallel; var chunks = ComputeNumChunks(length); - var numSlices = ForkJoinSlices(); + var numWorkers = ForkJoinNumWorkers(); + var numSlices = ComputeNumSlices(numWorkers, length, chunks); var info = ComputeAllSliceBounds(chunks, numSlices); - ForkJoin(mapSlice, ForkJoinMode(mode)); + ForkJoin(mapSlice, ForkJoinMode(mode), numSlices); return buffer; } // Sequential fallback: ASSERT_SEQUENTIAL_IS_OK(mode); for (var i = 0; i < length; i++) { // Note: Unlike JS arrays, parallel arrays cannot have holes. var v = func(self[i], i, self); UnsafePutElements(buffer, i, v); } return buffer; - function mapSlice(sliceId, numSlices, warmup) { + function mapSlice(sliceId, warmup) { var chunkPos = info[SLICE_POS(sliceId)]; var chunkEnd = info[SLICE_END(sliceId)]; if (warmup && chunkEnd > chunkPos + 1) chunkEnd = chunkPos + 1; while (chunkPos < chunkEnd) { var indexStart = chunkPos << CHUNK_SHIFT; @@ -730,37 +747,38 @@ function ArrayReducePar(func, mode) { parallel: for (;;) { // see ArrayMapPar() to explain why for(;;) etc if (ShouldForceSequential()) break parallel; if (!TRY_PARALLEL(mode)) break parallel; var chunks = ComputeNumChunks(length); - var numSlices = ForkJoinSlices(); - if (chunks < numSlices) + var numWorkers = ForkJoinNumWorkers(); + if (chunks < numWorkers) break parallel; + var numSlices = ComputeNumSlices(numWorkers, length, chunks); var info = ComputeAllSliceBounds(chunks, numSlices); var subreductions = NewDenseArray(numSlices); - ForkJoin(reduceSlice, ForkJoinMode(mode)); + ForkJoin(reduceSlice, ForkJoinMode(mode), numSlices); var accumulator = subreductions[0]; for (var i = 1; i < numSlices; i++) accumulator = func(accumulator, subreductions[i]); return accumulator; } // Sequential fallback: ASSERT_SEQUENTIAL_IS_OK(mode); var accumulator = self[0]; for (var i = 1; i < length; i++) accumulator = func(accumulator, self[i]); return accumulator; - function reduceSlice(sliceId, numSlices, warmup) { + function reduceSlice(sliceId, warmup) { var chunkStart = info[SLICE_START(sliceId)]; var chunkPos = info[SLICE_POS(sliceId)]; var chunkEnd = info[SLICE_END(sliceId)]; // (*) This function is carefully designed so that the warmup // (which executes with chunkStart === chunkPos) will execute all // potential loads and stores. 
In particular, the warmup run // processes two chunks rather than one. Moreover, it stores @@ -819,23 +837,25 @@ function ArrayScanPar(func, mode) { parallel: for (;;) { // see ArrayMapPar() to explain why for(;;) etc if (ShouldForceSequential()) break parallel; if (!TRY_PARALLEL(mode)) break parallel; var chunks = ComputeNumChunks(length); - var numSlices = ForkJoinSlices(); - if (chunks < numSlices) + var numWorkers = ForkJoinNumWorkers(); + if (chunks < numWorkers) break parallel; + + var numSlices = ComputeNumSlices(numWorkers, length, chunks); var info = ComputeAllSliceBounds(chunks, numSlices); // Scan slices individually (see comment on phase1()). - ForkJoin(phase1, ForkJoinMode(mode)); + ForkJoin(phase1, ForkJoinMode(mode), numSlices); // Compute intermediates array (see comment on phase2()). var intermediates = []; var accumulator = buffer[finalElement(0)]; ARRAY_PUSH(intermediates, accumulator); for (var i = 1; i < numSlices - 1; i++) { accumulator = func(accumulator, buffer[finalElement(i)]); ARRAY_PUSH(intermediates, accumulator); @@ -845,17 +865,17 @@ function ArrayScanPar(func, mode) { // convert from chunks to indices (see comment on phase2()). for (var i = 0; i < numSlices; i++) { info[SLICE_POS(i)] = info[SLICE_START(i)] << CHUNK_SHIFT; info[SLICE_END(i)] = info[SLICE_END(i)] << CHUNK_SHIFT; } info[SLICE_END(numSlices - 1)] = std_Math_min(info[SLICE_END(numSlices - 1)], length); // Complete each slice using intermediates array (see comment on phase2()). - ForkJoin(phase2, ForkJoinMode(mode)); + ForkJoin(phase2, ForkJoinMode(mode), numSlices); return buffer; } // Sequential fallback: ASSERT_SEQUENTIAL_IS_OK(mode); scan(self[0], 0, length); return buffer; @@ -879,17 +899,17 @@ function ArrayScanPar(func, mode) { * result array like: * * [A, A+B, A+B+C, D, D+E, D+E+F, G, G+H, G+H+I] * ^~~~~~~~~~~~^ ^~~~~~~~~~~~^ ^~~~~~~~~~~~~^ * Slice 0 Slice 1 Slice 2 * * Read on in phase2 to see what we do next! */ - function phase1(sliceId, numSlices, warmup) { + function phase1(sliceId, warmup) { var chunkStart = info[SLICE_START(sliceId)]; var chunkPos = info[SLICE_POS(sliceId)]; var chunkEnd = info[SLICE_END(sliceId)]; if (warmup && chunkEnd > chunkPos + 2) chunkEnd = chunkPos + 2; if (chunkPos === chunkStart) { @@ -962,17 +982,17 @@ function ArrayScanPar(func, mode) { * SUBTLE: Because we are mutating |buffer| in place, we have to * be very careful about bailouts! We cannot checkpoint a chunk * at a time as we do elsewhere because that assumes it is safe to * replay the portion of a chunk which was already processed. * Therefore, in this phase, we track the current position at an * index granularity, although this requires two memory writes per * index. */ - function phase2(sliceId, numSlices, warmup) { + function phase2(sliceId, warmup) { if (sliceId === 0) return true; // No work to do for the 0th slice. 
var indexPos = info[SLICE_POS(sliceId)]; var indexEnd = info[SLICE_END(sliceId)]; if (warmup) indexEnd = std_Math_min(indexEnd, indexPos + CHUNK_SIZE); @@ -1097,33 +1117,33 @@ function ArrayScatterPar(targets, defaul ThrowError(JSMSG_PAR_ARRAY_SCATTER_CONFLICT); return conflictFunc(elem1, elem2); } function parDivideOutputRange() { var chunks = ComputeNumChunks(targetsLength); - var numSlices = ForkJoinSlices(); + var numSlices = ComputeNumSlices(ForkJoinNumWorkers(), length, chunks); var checkpoints = NewDenseArray(numSlices); for (var i = 0; i < numSlices; i++) UnsafePutElements(checkpoints, i, 0); var buffer = NewDenseArray(length); var conflicts = NewDenseArray(length); for (var i = 0; i < length; i++) { UnsafePutElements(buffer, i, defaultValue); UnsafePutElements(conflicts, i, false); } - ForkJoin(fill, ForkJoinMode(mode)); + ForkJoin(fill, ForkJoinMode(mode), numSlices); return buffer; - function fill(sliceId, numSlices, warmup) { + function fill(sliceId, warmup) { var indexPos = checkpoints[sliceId]; var indexEnd = targetsLength; if (warmup) indexEnd = std_Math_min(indexEnd, indexPos + CHUNK_SIZE); // Range in the output for which we are responsible: var [outputStart, outputEnd] = ComputeSliceBounds(length, sliceId, numSlices); @@ -1144,17 +1164,17 @@ function ArrayScatterPar(targets, defaul } function parDivideScatterVector() { // Subtle: because we will be mutating the localBuffers and // conflict arrays in place, we can never replay an entry in the // target array for fear of inducing a conflict where none existed // before. Therefore, we must proceed not by chunks but rather by // individual indices. - var numSlices = ForkJoinSlices(); + var numSlices = ComputeNumSlices(ForkJoinNumWorkers(), length, ComputeNumChunks(length)); var info = ComputeAllSliceBounds(targetsLength, numSlices); // FIXME(bug 844890): Use typed arrays here. var localBuffers = NewDenseArray(numSlices); for (var i = 0; i < numSlices; i++) UnsafePutElements(localBuffers, i, NewDenseArray(length)); var localConflicts = NewDenseArray(numSlices); for (var i = 0; i < numSlices; i++) { @@ -1167,21 +1187,21 @@ function ArrayScatterPar(targets, defaul // Initialize the 0th buffer, which will become the output. For // the other buffers, we track which parts have been written to // using the conflict buffer so they do not need to be // initialized. var outputBuffer = localBuffers[0]; for (var i = 0; i < length; i++) UnsafePutElements(outputBuffer, i, defaultValue); - ForkJoin(fill, ForkJoinMode(mode)); + ForkJoin(fill, ForkJoinMode(mode), numSlices); mergeBuffers(); return outputBuffer; - function fill(sliceId, numSlices, warmup) { + function fill(sliceId, warmup) { var indexPos = info[SLICE_POS(sliceId)]; var indexEnd = info[SLICE_END(sliceId)]; if (warmup) indexEnd = std_Math_min(indexEnd, indexPos + CHUNK_SIZE); var localbuffer = localBuffers[sliceId]; var conflicts = localConflicts[sliceId]; while (indexPos < indexEnd) { @@ -1270,43 +1290,44 @@ function ArrayFilterPar(func, mode) { parallel: for (;;) { // see ArrayMapPar() to explain why for(;;) etc if (ShouldForceSequential()) break parallel; if (!TRY_PARALLEL(mode)) break parallel; var chunks = ComputeNumChunks(length); - var numSlices = ForkJoinSlices(); - if (chunks < numSlices * 2) + var numWorkers = ForkJoinNumWorkers(); + if (chunks < numWorkers * 2) break parallel; + var numSlices = ComputeNumSlices(numWorkers, length, chunks); var info = ComputeAllSliceBounds(chunks, numSlices); // Step 1. 
Compute which items from each slice of the result // buffer should be preserved. When we're done, we have an array // |survivors| containing a bitset for each chunk, indicating // which members of the chunk survived. We also keep an array // |counts| containing the total number of items that are being // preserved from within one slice. // // FIXME(bug 844890): Use typed arrays here. var counts = NewDenseArray(numSlices); for (var i = 0; i < numSlices; i++) UnsafePutElements(counts, i, 0); var survivors = NewDenseArray(chunks); - ForkJoin(findSurvivorsInSlice, ForkJoinMode(mode)); + ForkJoin(findSurvivorsInSlice, ForkJoinMode(mode), numSlices); // Step 2. Compress the slices into one contiguous set. var count = 0; for (var i = 0; i < numSlices; i++) count += counts[i]; var buffer = NewDenseArray(count); if (count > 0) - ForkJoin(copySurvivorsInSlice, ForkJoinMode(mode)); + ForkJoin(copySurvivorsInSlice, ForkJoinMode(mode), numSlices); return buffer; } // Sequential fallback: ASSERT_SEQUENTIAL_IS_OK(mode); var buffer = []; for (var i = 0; i < length; i++) { @@ -1317,17 +1338,17 @@ function ArrayFilterPar(func, mode) { return buffer; /** * As described above, our goal is to determine which items we * will preserve from a given slice. We do this one chunk at a * time. When we finish a chunk, we record our current count and * the next chunk sliceId, lest we should bail. */ - function findSurvivorsInSlice(sliceId, numSlices, warmup) { + function findSurvivorsInSlice(sliceId, warmup) { var chunkPos = info[SLICE_POS(sliceId)]; var chunkEnd = info[SLICE_END(sliceId)]; if (warmup && chunkEnd > chunkPos) chunkEnd = chunkPos + 1; var count = counts[sliceId]; while (chunkPos < chunkEnd) { @@ -1344,17 +1365,17 @@ function ArrayFilterPar(func, mode) { UnsafePutElements(survivors, chunkPos, chunkBits, counts, sliceId, count, info, SLICE_POS(sliceId), ++chunkPos); } return chunkEnd === info[SLICE_END(sliceId)]; } - function copySurvivorsInSlice(sliceId, numSlices, warmup) { + function copySurvivorsInSlice(sliceId, warmup) { // Copies the survivors from this slice into the correct position. // Note that this is an idempotent operation that does not invoke // user code. Therefore, we don't expect bailouts and make an // effort to proceed chunk by chunk or avoid duplicating work. // Total up the items preserved by previous slices. var count = 0; if (sliceId > 0) { // FIXME(#819219)---work around a bug in Ion's range checks @@ -1427,28 +1448,29 @@ function ArrayStaticBuildPar(length, fun parallel: for (;;) { if (ShouldForceSequential()) break parallel; if (!TRY_PARALLEL(mode)) break parallel; var chunks = ComputeNumChunks(length); - var numSlices = ForkJoinSlices(); + var numWorkers = ForkJoinNumWorkers(); + var numSlices = ComputeNumSlices(numWorkers, length, chunks); var info = ComputeAllSliceBounds(chunks, numSlices); - ForkJoin(constructSlice, ForkJoinMode(mode)); + ForkJoin(constructSlice, ForkJoinMode(mode), numSlices); return buffer; } // Sequential fallback: ASSERT_SEQUENTIAL_IS_OK(mode); fill(0, length); return buffer; - function constructSlice(sliceId, numSlices, warmup) { + function constructSlice(sliceId, warmup) { var chunkPos = info[SLICE_POS(sliceId)]; var chunkEnd = info[SLICE_END(sliceId)]; if (warmup && chunkEnd > chunkPos) chunkEnd = chunkPos + 1; while (chunkPos < chunkEnd) { var indexStart = chunkPos << CHUNK_SHIFT;
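Taken by itself, the slicing heuristic added above is simple: divide the job into fixed-size chunks, then create several slices per worker so the work-stealing scheduler has spare slices to steal, unless there are so few chunks that one slice per worker is enough. Below is a stand-alone C++ transliteration of that logic, for illustration only; the authoritative version is the self-hosted Array.js code in the hunks above, and the value of CHUNK_SHIFT assumed here (5, i.e. 32-element chunks) is not shown by this patch.

#include <cstdint>

// Assumed chunk size; the real CHUNK_SHIFT/CHUNK_SIZE constants live in the
// self-hosted code, not in this patch.
static const uint32_t CHUNK_SHIFT = 5;
static const uint32_t SLICES_PER_WORKER = 8;   // matches the #define above

// Number of chunk-sized pieces needed to cover |length| items.
static uint32_t ComputeNumChunks(uint32_t length) {
    uint32_t chunks = length >> CHUNK_SHIFT;
    return (chunks << CHUNK_SHIFT) == length ? chunks : chunks + 1;
}

// With work stealing we want more slices than workers so that idle threads
// have something to steal, but never more slices than chunks of real work.
static uint32_t ComputeNumSlices(uint32_t workers, uint32_t length, uint32_t chunks) {
    if (length != 0) {
        uint32_t slices = workers * SLICES_PER_WORKER;
        if (chunks < slices)
            return workers;
        return slices;
    }
    return workers;
}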
--- a/js/src/jit-test/tests/parallel/Array-reducePar-bail.js
+++ b/js/src/jit-test/tests/parallel/Array-reducePar-bail.js
@@ -10,17 +10,18 @@ function testReduce() {
   var aCounter = 0;
   function sum(a, b) {
     var r = a + b;
     if (r == 234) // occurs once per slice
       aCounter++;
     return r;
   }
 
-  var array = build(4096, function() { return 1; });
+  // We use a big array, to make sure that the test runs with 64 slices.
+  var array = build(8 * 4096, function() { return 1; });
   var seqResult = array.reduce(sum);
   var seqCounter = aCounter;
   aCounter = 0;
 
   var parResult = array.reducePar(sum);
   var parCounter = aCounter;
 
   assertEq(true, parCounter >= seqCounter);
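A quick sanity check of the new numbers in this test, assuming ForkJoinNumWorkers() reports 8 workers (counting the main thread): the job is then split into 8 x SLICES_PER_WORKER = 64 slices, as the comment says. An array of 8 x 4096 = 32768 ones divided over 64 slices gives 512 elements per slice, so the per-slice running sum still passes through 234 exactly once in every slice, preserving the "occurs once per slice" assumption the counter relies on.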
--- a/js/src/jit-test/tests/parallel/bailout-executed.js
+++ b/js/src/jit-test/tests/parallel/bailout-executed.js
@@ -1,14 +1,14 @@
 load(libdir + "parallelarray-helpers.js");
 
 function makeObject(e, i, c) {
   var v = {element: e, index: i, collection: c};
 
-  if (e == 512) // note: happens once
+  if (e == 0) // note: happens once
     delete v.index;
 
   return v;
 }
 
 function test() {
   var array = range(0, 768);
   var array1 = array.map(makeObject);
--- a/js/src/vm/ForkJoin.cpp +++ b/js/src/vm/ForkJoin.cpp @@ -1,14 +1,20 @@ /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- * vim: set ts=8 sts=4 et sw=4 tw=99: * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +#if defined(XP_OS2) || defined(XP_WIN) +# include <io.h> // for isatty() +#else +# include <unistd.h> // for isatty() +#endif + #include "vm/ForkJoin.h" #include "mozilla/ThreadLocal.h" #include "jscntxt.h" #include "jslock.h" #include "jsprf.h" @@ -35,33 +41,27 @@ using mozilla::ThreadLocal; /////////////////////////////////////////////////////////////////////////// // Degenerate configurations // // When JS_THREADSAFE or JS_ION is not defined, we simply run the // |func| callback sequentially. We also forego the feedback // altogether. static bool -ExecuteSequentially(JSContext *cx_, HandleValue funVal, bool *complete); +ExecuteSequentially(JSContext *cx_, HandleValue funVal, bool *complete, uint16_t numSlices); #if !defined(JS_THREADSAFE) || !defined(JS_ION) bool js::ForkJoin(JSContext *cx, CallArgs &args) { RootedValue argZero(cx, args[0]); bool complete = false; // since warmup is false, will always complete return ExecuteSequentially(cx, argZero, &complete); } -uint32_t -js::ForkJoinSlices(JSContext *cx) -{ - return 1; // just the main thread -} - JSContext * ForkJoinSlice::acquireContext() { return nullptr; } void ForkJoinSlice::releaseContext() @@ -145,30 +145,28 @@ js::ParallelTestsShouldPass(JSContext *c #endif // !JS_THREADSAFE || !JS_ION /////////////////////////////////////////////////////////////////////////// // All configurations // // Some code that is shared between degenerate and parallel configurations. static bool -ExecuteSequentially(JSContext *cx, HandleValue funVal, bool *complete) +ExecuteSequentially(JSContext *cx, HandleValue funVal, bool *complete, uint16_t numSlices) { - uint32_t numSlices = ForkJoinSlices(cx); bool allComplete = true; - for (uint32_t i = 0; i < numSlices; i++) { + for (uint16_t i = 0; i < numSlices; i++) { FastInvokeGuard fig(cx, funVal); InvokeArgs &args = fig.args(); - if (!args.init(3)) + if (!args.init(2)) return false; args.setCallee(funVal); args.setThis(UndefinedValue()); args[0].setInt32(i); - args[1].setInt32(numSlices); - args[2].setBoolean(!!cx->runtime()->parallelWarmup); + args[1].setBoolean(!!cx->runtime()->parallelWarmup); if (!fig.invoke(cx)) return false; allComplete = allComplete & args.rval().toBoolean(); } *complete = allComplete; return true; } @@ -225,29 +223,29 @@ enum ForkJoinMode { // Expects all parallel executions to yield a bailout. If this is not // the case, reports an error. ForkJoinModeBailout, NumForkJoinModes }; -class ParallelDo +class ForkJoinOperation { public: // For tests, make sure to keep this in sync with minItemsTestingThreshold. static const uint32_t MAX_BAILOUTS = 3; uint32_t bailouts; // Information about the bailout: ParallelBailoutCause bailoutCause; RootedScript bailoutScript; jsbytecode *bailoutBytecode; - ParallelDo(JSContext *cx, HandleObject fun, ForkJoinMode mode); + ForkJoinOperation(JSContext *cx, HandleObject fun, ForkJoinMode mode, uint16_t numSlices); ExecutionStatus apply(); private: // Most of the functions involved in managing the parallel // compilation follow a similar control-flow. 
They return RedLight // if they have either encountered a fatal error or completed the // execution, such that no further work is needed. In that event, // they take an `ExecutionStatus*` which they use to report @@ -280,16 +278,17 @@ class ParallelDo }; JSContext *cx_; HandleObject fun_; Vector<ParallelBailoutRecord, 16> bailoutRecords_; AutoScriptVector worklist_; Vector<WorklistData, 16> worklistData_; ForkJoinMode mode_; + uint16_t numSlices_; TrafficLight enqueueInitialScript(ExecutionStatus *status); TrafficLight compileForParallelExecution(ExecutionStatus *status); TrafficLight warmupExecution(bool stopIfComplete, ExecutionStatus *status); TrafficLight parallelExecution(ExecutionStatus *status); TrafficLight sequentialExecution(bool disqualified, ExecutionStatus *status); TrafficLight recoverFromBailout(ExecutionStatus *status); @@ -299,111 +298,83 @@ class ParallelDo ExecutionStatus sequentialExecution(bool disqualified); TrafficLight appendCallTargetsToWorklist(uint32_t index, ExecutionStatus *status); TrafficLight appendCallTargetToWorklist(HandleScript script, ExecutionStatus *status); bool addToWorklist(HandleScript script); inline bool hasScript(Vector<types::RecompileInfo> &scripts, JSScript *script); -}; // class ParallelDo +}; // class ForkJoinOperation -class ForkJoinShared : public TaskExecutor, public Monitor +class ForkJoinShared : public ParallelJob, public Monitor { ///////////////////////////////////////////////////////////////////////// // Constant fields JSContext *const cx_; // Current context ThreadPool *const threadPool_; // The thread pool. HandleObject fun_; // The JavaScript function to execute. - const uint32_t numSlices_; // Total number of threads. - PRCondVar *rendezvousEnd_; // Cond. var used to signal end of rendezvous. + uint16_t numSlices_; // Total number of slices. Dynamically changed PRLock *cxLock_; // Locks cx_ for parallel VM calls. ParallelBailoutRecord *const records_; // Bailout records for each slice ///////////////////////////////////////////////////////////////////////// // Per-thread arenas // // Each worker thread gets an arena to use when allocating. Vector<Allocator *, 16> allocators_; ///////////////////////////////////////////////////////////////////////// // Locked Fields // // Only to be accessed while holding the lock. - uint32_t uncompleted_; // Number of uncompleted worker threads - uint32_t blocked_; // Number of threads that have joined rendezvous - uint32_t rendezvousIndex_; // Number of rendezvous attempts bool gcRequested_; // True if a worker requested a GC JS::gcreason::Reason gcReason_; // Reason given to request GC Zone *gcZone_; // Zone for GC, or nullptr for full ///////////////////////////////////////////////////////////////////////// // Asynchronous Flags // // These can be read without the lock (hence the |volatile| declaration). // All fields should be *written with the lock*, however. // Set to true when parallel execution should abort. volatile bool abort_; // Set to true when a worker bails for a fatal reason. volatile bool fatal_; - // The main thread has requested a rendezvous. - volatile bool rendezvous_; - - // Invoked only from the main thread: - void executeFromMainThread(); - - // Executes slice #threadId of the work, either from a worker or - // the main thread. - void executePortion(PerThreadData *perThread, uint32_t threadId); - - // Rendezvous protocol: - // - // Use AutoRendezvous rather than invoking initiateRendezvous() and - // endRendezvous() directly. 
- - friend class AutoRendezvous; - - // Requests that the other threads stop. Must be invoked from the main - // thread. - void initiateRendezvous(ForkJoinSlice &threadCx); - - // If a rendezvous has been requested, blocks until the main thread says - // we may continue. - void joinRendezvous(ForkJoinSlice &threadCx); - - // Permits other threads to resume execution. Must be invoked from the - // main thread after a call to initiateRendezvous(). - void endRendezvous(ForkJoinSlice &threadCx); - public: ForkJoinShared(JSContext *cx, ThreadPool *threadPool, HandleObject fun, - uint32_t numSlices, - uint32_t uncompleted, + uint16_t numSlices, ParallelBailoutRecord *records); ~ForkJoinShared(); bool init(); ParallelResult execute(); // Invoked from parallel worker threads: - virtual void executeFromWorker(uint32_t threadId, uintptr_t stackLimit); + virtual bool executeFromWorker(uint16_t sliceId, uint32_t workerId, + uintptr_t stackLimit) MOZ_OVERRIDE; + + // Invoked only from the main thread: + virtual bool executeFromMainThread(uint16_t sliceId) MOZ_OVERRIDE; - // Moves all the per-thread arenas into the main compartment and - // processes any pending requests for a GC. This can only safely - // be invoked on the main thread, either during a rendezvous or - // after the workers have completed. + // Executes slice |sliceId| either from a worker or the main thread. + void executePortion(PerThreadData *perThread, uint16_t sliceId, uint32_t workerId); + + // Moves all the per-thread arenas into the main compartment and processes + // any pending requests for a GC. This can only safely be invoked on the + // main thread after the workers have completed. void transferArenasToCompartmentAndProcessGCRequests(); // Invoked during processing by worker threads to "check in". bool check(ForkJoinSlice &threadCx); // Requests a GC, either full or specific to a zone. void requestGC(JS::gcreason::Reason reason); void requestZoneGC(JS::Zone *zone, JS::gcreason::Reason reason); @@ -426,33 +397,16 @@ class AutoEnterWarmup { JSRuntime *runtime_; public: AutoEnterWarmup(JSRuntime *runtime) : runtime_(runtime) { runtime_->parallelWarmup++; } ~AutoEnterWarmup() { runtime_->parallelWarmup--; } }; -class AutoRendezvous -{ - private: - ForkJoinSlice &threadCx; - - public: - AutoRendezvous(ForkJoinSlice &threadCx) - : threadCx(threadCx) - { - threadCx.shared->initiateRendezvous(threadCx); - } - - ~AutoRendezvous() { - threadCx.shared->endRendezvous(threadCx); - } -}; - class AutoSetForkJoinSlice { public: AutoSetForkJoinSlice(ForkJoinSlice *threadCx) { ForkJoinSlice::tlsForkJoinSlice.set(threadCx); } ~AutoSetForkJoinSlice() { @@ -495,39 +449,40 @@ ForkJoinActivation::ForkJoinActivation(J } ForkJoinActivation::~ForkJoinActivation() { cx_->mainThread().ionTop = prevIonTop_; } /////////////////////////////////////////////////////////////////////////// -// js::ForkJoin() and ParallelDo class +// js::ForkJoin() and ForkJoinOperation class // // These are the top-level objects that manage the parallel execution. // They handle parallel compilation (if necessary), triggering // parallel execution, and recovering from bailouts. 
static const char *ForkJoinModeString(ForkJoinMode mode); bool js::ForkJoin(JSContext *cx, CallArgs &args) { - JS_ASSERT(args[0].isObject()); // else the self-hosted code is wrong + JS_ASSERT(args.length() == 3); // else the self-hosted code is wrong + JS_ASSERT(args[0].isObject()); JS_ASSERT(args[0].toObject().is<JSFunction>()); - - ForkJoinMode mode = ForkJoinModeNormal; - if (args.length() > 1) { - JS_ASSERT(args[1].isInt32()); // else the self-hosted code is wrong - JS_ASSERT(args[1].toInt32() < NumForkJoinModes); - mode = (ForkJoinMode) args[1].toInt32(); - } + JS_ASSERT(args[1].isInt32()); + JS_ASSERT(args[1].toInt32() < NumForkJoinModes); + JS_ASSERT(args[2].isInt32()); RootedObject fun(cx, &args[0].toObject()); - ParallelDo op(cx, fun, mode); + ForkJoinMode mode = (ForkJoinMode) args[1].toInt32(); + uint32_t numSlices = args[2].toInt32(); + MOZ_ASSERT(uint32_t(uint16_t(numSlices)) == numSlices); + + ForkJoinOperation op(cx, fun, mode, numSlices); ExecutionStatus status = op.apply(); if (status == ExecutionFatal) return false; switch (mode) { case ForkJoinModeNormal: case ForkJoinModeCompile: return true; @@ -576,33 +531,33 @@ ForkJoinModeString(ForkJoinMode mode) { case ForkJoinModeParallel: return "parallel"; case ForkJoinModeRecover: return "recover"; case ForkJoinModeBailout: return "bailout"; case NumForkJoinModes: return "max"; } return "???"; } -js::ParallelDo::ParallelDo(JSContext *cx, - HandleObject fun, - ForkJoinMode mode) +js::ForkJoinOperation::ForkJoinOperation(JSContext *cx, HandleObject fun, ForkJoinMode mode, + uint16_t numSlices) : bailouts(0), bailoutCause(ParallelBailoutNone), bailoutScript(cx), bailoutBytecode(nullptr), cx_(cx), fun_(fun), bailoutRecords_(cx), worklist_(cx), worklistData_(cx), - mode_(mode) + mode_(mode), + numSlices_(numSlices) { } ExecutionStatus -js::ParallelDo::apply() +js::ForkJoinOperation::apply() { ExecutionStatus status; // High level outline of the procedure: // // - As we enter, we check for parallel script without "uncompiled" flag. // - If present, skip initial enqueue. // - While not too many bailouts: @@ -622,24 +577,25 @@ js::ParallelDo::apply() // - Invalidate any scripts that may need to be invalidated // - Re-enqueue main script and any uncompiled scripts that were called // - Too many bailouts: Fallback to sequential JS_ASSERT_IF(!jit::IsBaselineEnabled(cx_), !jit::IsIonEnabled(cx_)); if (!jit::IsBaselineEnabled(cx_) || !jit::IsIonEnabled(cx_)) return sequentialExecution(true); - SpewBeginOp(cx_, "ParallelDo"); + SpewBeginOp(cx_, "ForkJoinOperation"); - uint32_t slices = ForkJoinSlices(cx_); + // How many workers do we have, counting the main thread. 
+ unsigned numWorkersWithMain = cx_->runtime()->threadPool.numWorkers() + 1; - if (!bailoutRecords_.resize(slices)) + if (!bailoutRecords_.resize(numWorkersWithMain)) return SpewEndOp(ExecutionFatal); - for (uint32_t i = 0; i < slices; i++) + for (uint32_t i = 0; i < numWorkersWithMain; i++) bailoutRecords_[i].init(cx_); if (enqueueInitialScript(&status) == RedLight) return SpewEndOp(status); Spew(SpewOps, "Execution mode: %s", ForkJoinModeString(mode_)); switch (mode_) { case ForkJoinModeNormal: @@ -659,17 +615,17 @@ js::ParallelDo::apply() } break; case NumForkJoinModes: MOZ_ASSUME_UNREACHABLE("Invalid mode"); } while (bailouts < MAX_BAILOUTS) { - for (uint32_t i = 0; i < slices; i++) + for (uint32_t i = 0; i < numWorkersWithMain; i++) bailoutRecords_[i].reset(cx_); if (compileForParallelExecution(&status) == RedLight) return SpewEndOp(status); JS_ASSERT(worklist_.length() == 0); if (parallelExecution(&status) == RedLight) return SpewEndOp(status); @@ -677,18 +633,18 @@ js::ParallelDo::apply() if (recoverFromBailout(&status) == RedLight) return SpewEndOp(status); } // After enough tries, just execute sequentially. return SpewEndOp(sequentialExecution(true)); } -js::ParallelDo::TrafficLight -js::ParallelDo::enqueueInitialScript(ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::enqueueInitialScript(ExecutionStatus *status) { // GreenLight: script successfully enqueued if necessary // RedLight: fatal error or fell back to sequential // The kernel should be a self-hosted function. if (!fun_->is<JSFunction>()) return sequentialExecution(true, status); @@ -715,18 +671,18 @@ js::ParallelDo::enqueueInitialScript(Exe } // Otherwise, add to the worklist of scripts to process. if (addToWorklist(script) == RedLight) return fatalError(status); return GreenLight; } -js::ParallelDo::TrafficLight -js::ParallelDo::compileForParallelExecution(ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::compileForParallelExecution(ExecutionStatus *status) { // GreenLight: all scripts compiled // RedLight: fatal error or completed work via warmups or fallback // This routine attempts to do whatever compilation is necessary // to execute a single parallel attempt. When it returns, either // (1) we have fallen back to sequential; (2) we have run enough // warmup runs to complete all the work; or (3) we have compiled @@ -905,19 +861,18 @@ js::ParallelDo::compileForParallelExecut JS_ASSERT(worklistData_[i].stallCount >= stallThreshold); } } worklist_.clear(); worklistData_.clear(); return GreenLight; } -js::ParallelDo::TrafficLight -js::ParallelDo::appendCallTargetsToWorklist(uint32_t index, - ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::appendCallTargetsToWorklist(uint32_t index, ExecutionStatus *status) { // GreenLight: call targets appended // RedLight: fatal error or completed work via warmups or fallback JS_ASSERT(worklist_[index]->hasParallelIonScript()); // Check whether we have already enqueued the targets for // this entry and avoid doing it again if so. 
@@ -935,19 +890,18 @@ js::ParallelDo::appendCallTargetsToWorkl target->filename(), target->lineno()); if (appendCallTargetToWorklist(target, status) == RedLight) return RedLight; } return GreenLight; } -js::ParallelDo::TrafficLight -js::ParallelDo::appendCallTargetToWorklist(HandleScript script, - ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::appendCallTargetToWorklist(HandleScript script, ExecutionStatus *status) { // GreenLight: call target appended if necessary // RedLight: fatal error or completed work via warmups or fallback JS_ASSERT(script); // Fallback to sequential if disabled. if (!script->canParallelIonCompile()) { @@ -967,17 +921,17 @@ js::ParallelDo::appendCallTargetToWorkli if (!addToWorklist(script)) return fatalError(status); return GreenLight; } bool -js::ParallelDo::addToWorklist(HandleScript script) +js::ForkJoinOperation::addToWorklist(HandleScript script) { for (uint32_t i = 0; i < worklist_.length(); i++) { if (worklist_[i] == script) { Spew(SpewCompile, "Skipping %p:%s:%u, already in worklist", script.get(), script->filename(), script->lineno()); return true; } } @@ -995,46 +949,46 @@ js::ParallelDo::addToWorklist(HandleScri // we have not yet enqueued the callees of this script if (!worklistData_.append(WorklistData())) return false; worklistData_[worklistData_.length() - 1].reset(); return true; } -js::ParallelDo::TrafficLight -js::ParallelDo::sequentialExecution(bool disqualified, ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::sequentialExecution(bool disqualified, ExecutionStatus *status) { // RedLight: fatal error or completed work *status = sequentialExecution(disqualified); return RedLight; } ExecutionStatus -js::ParallelDo::sequentialExecution(bool disqualified) +js::ForkJoinOperation::sequentialExecution(bool disqualified) { // XXX use disqualified to set parallelIon to ION_DISABLED_SCRIPT? Spew(SpewOps, "Executing sequential execution (disqualified=%d).", disqualified); bool complete = false; RootedValue funVal(cx_, ObjectValue(*fun_)); - if (!ExecuteSequentially(cx_, funVal, &complete)) + if (!ExecuteSequentially(cx_, funVal, &complete, numSlices_)) return ExecutionFatal; // When invoked without the warmup flag set to true, the kernel // function OUGHT to complete successfully, barring an exception. 
JS_ASSERT(complete); return ExecutionSequential; } -js::ParallelDo::TrafficLight -js::ParallelDo::fatalError(ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::fatalError(ExecutionStatus *status) { // RedLight: fatal error *status = ExecutionFatal; return RedLight; } static const char * @@ -1076,17 +1030,17 @@ BailoutExplanation(ParallelBailoutCause case ParallelBailoutRequestedZoneGC: return "requested zone GC"; default: return "no known reason"; } } void -js::ParallelDo::determineBailoutCause() +js::ForkJoinOperation::determineBailoutCause() { bailoutCause = ParallelBailoutNone; for (uint32_t i = 0; i < bailoutRecords_.length(); i++) { if (bailoutRecords_[i].cause == ParallelBailoutNone) continue; if (bailoutRecords_[i].cause == ParallelBailoutInterrupt) continue; @@ -1114,17 +1068,17 @@ js::ParallelDo::determineBailoutCause() Spew(SpewBailouts, "Bailout from thread %d: cause %d, unknown loc", i, bailoutCause); } } } bool -js::ParallelDo::invalidateBailedOutScripts() +js::ForkJoinOperation::invalidateBailedOutScripts() { Vector<types::RecompileInfo> invalid(cx_); for (uint32_t i = 0; i < bailoutRecords_.length(); i++) { RootedScript script(cx_, bailoutRecords_[i].topScript); // No script to invalidate. if (!script || !script->hasParallelIonScript()) continue; @@ -1166,61 +1120,58 @@ js::ParallelDo::invalidateBailedOutScrip return false; } Invalidate(cx_, invalid); return true; } -js::ParallelDo::TrafficLight -js::ParallelDo::warmupExecution(bool stopIfComplete, - ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::warmupExecution(bool stopIfComplete, ExecutionStatus *status) { // GreenLight: warmup succeeded, still more work to do // RedLight: fatal error or warmup completed all work (check status) Spew(SpewOps, "Executing warmup."); AutoEnterWarmup warmup(cx_->runtime()); RootedValue funVal(cx_, ObjectValue(*fun_)); bool complete; - if (!ExecuteSequentially(cx_, funVal, &complete)) { + if (!ExecuteSequentially(cx_, funVal, &complete, numSlices_)) { *status = ExecutionFatal; return RedLight; } if (complete && stopIfComplete) { Spew(SpewOps, "Warmup execution finished all the work."); *status = ExecutionWarmup; return RedLight; } return GreenLight; } -js::ParallelDo::TrafficLight -js::ParallelDo::parallelExecution(ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::parallelExecution(ExecutionStatus *status) { // GreenLight: bailout occurred, keep trying // RedLight: fatal error or all work completed // Recursive use of the ThreadPool is not supported. Right now we // cannot get here because parallel code cannot invoke native // functions such as ForkJoin(). 
JS_ASSERT(ForkJoinSlice::current() == nullptr); ForkJoinActivation activation(cx_); ThreadPool *threadPool = &cx_->runtime()->threadPool; - uint32_t numSlices = ForkJoinSlices(cx_); RootedObject rootedFun(cx_, fun_); - ForkJoinShared shared(cx_, threadPool, rootedFun, numSlices, numSlices - 1, - &bailoutRecords_[0]); + ForkJoinShared shared(cx_, threadPool, rootedFun, numSlices_, &bailoutRecords_[0]); if (!shared.init()) { *status = ExecutionFatal; return RedLight; } switch (shared.execute()) { case TP_SUCCESS: *status = ExecutionParallel; @@ -1233,18 +1184,18 @@ js::ParallelDo::parallelExecution(Execut case TP_RETRY_SEQUENTIALLY: case TP_RETRY_AFTER_GC: break; // bailout } return GreenLight; } -js::ParallelDo::TrafficLight -js::ParallelDo::recoverFromBailout(ExecutionStatus *status) +js::ForkJoinOperation::TrafficLight +js::ForkJoinOperation::recoverFromBailout(ExecutionStatus *status) { // GreenLight: bailout recovered, try to compile-and-run again // RedLight: fatal error bailouts += 1; determineBailoutCause(); SpewBailout(bailouts, bailoutScript, bailoutBytecode, bailoutCause); @@ -1262,17 +1213,17 @@ js::ParallelDo::recoverFromBailout(Execu if (warmupExecution(/*stopIfComplete:*/true, status) == RedLight) return RedLight; return GreenLight; } bool -js::ParallelDo::hasScript(Vector<types::RecompileInfo> &scripts, JSScript *script) +js::ForkJoinOperation::hasScript(Vector<types::RecompileInfo> &scripts, JSScript *script) { for (uint32_t i = 0; i < scripts.length(); i++) { if (scripts[i] == script->parallelIonScript()->recompileInfo()) return true; } return false; } @@ -1319,36 +1270,30 @@ class ParallelIonInvoke ///////////////////////////////////////////////////////////////////////////// // ForkJoinShared // ForkJoinShared::ForkJoinShared(JSContext *cx, ThreadPool *threadPool, HandleObject fun, - uint32_t numSlices, - uint32_t uncompleted, + uint16_t numSlices, ParallelBailoutRecord *records) : cx_(cx), threadPool_(threadPool), fun_(fun), numSlices_(numSlices), - rendezvousEnd_(nullptr), cxLock_(nullptr), records_(records), allocators_(cx), - uncompleted_(uncompleted), - blocked_(0), - rendezvousIndex_(0), gcRequested_(false), gcReason_(JS::gcreason::NUM_REASONS), gcZone_(nullptr), abort_(false), - fatal_(false), - rendezvous_(false) + fatal_(false) { } bool ForkJoinShared::init() { // Create temporary arenas to hold the data allocated during the // parallel code. @@ -1358,43 +1303,36 @@ ForkJoinShared::init() // because when executing parallel code we sometimes check what // arena list an object is in to decide if it is writable. If we // used the compartment |Allocator| for the main thread, then the // main thread would be permitted to write to any object it wants. if (!Monitor::init()) return false; - rendezvousEnd_ = PR_NewCondVar(lock_); - if (!rendezvousEnd_) - return false; - cxLock_ = PR_NewLock(); if (!cxLock_) return false; - for (unsigned i = 0; i < numSlices_; i++) { + for (unsigned i = 0; i < (threadPool_->numWorkers() + 1); i++) { Allocator *allocator = cx_->new_<Allocator>(cx_->zone()); if (!allocator) return false; if (!allocators_.append(allocator)) { js_delete(allocator); return false; } } return true; } ForkJoinShared::~ForkJoinShared() { - if (rendezvousEnd_) - PR_DestroyCondVar(rendezvousEnd_); - PR_DestroyLock(cxLock_); while (allocators_.length() > 0) js_delete(allocators_.popCopy()); } ParallelResult ForkJoinShared::execute() @@ -1402,111 +1340,114 @@ ForkJoinShared::execute() // Sometimes a GC request occurs *just before* we enter into the // parallel section. 
Rather than enter into the parallel section // and then abort, we just check here and abort early. if (cx_->runtime()->interrupt) return TP_RETRY_SEQUENTIALLY; AutoLockMonitor lock(*this); - // Notify workers to start and execute one portion on this thread. + ParallelResult jobResult = TP_SUCCESS; { AutoUnlockMonitor unlock(*this); - if (!threadPool_->submitAll(cx_, this)) + + // Push parallel tasks and wait until they're all done. + jobResult = threadPool_->executeJob(cx_, this, numSlices_); + if (jobResult == TP_FATAL) return TP_FATAL; - executeFromMainThread(); } - // Wait for workers to complete. - while (uncompleted_ > 0) - lock.wait(); - transferArenasToCompartmentAndProcessGCRequests(); // Check if any of the workers failed. if (abort_) { if (fatal_) return TP_FATAL; - else - return TP_RETRY_SEQUENTIALLY; + return TP_RETRY_SEQUENTIALLY; } +#ifdef DEBUG + Spew(SpewOps, "Completed parallel job [slices %d, threads: %d (+1), stolen: %d (work stealing:%s)]", + numSlices_, + threadPool_->numWorkers(), + threadPool_->stolenSlices(), + threadPool_->workStealing() ? "ON" : "OFF"); +#endif + // Everything went swimmingly. Give yourself a pat on the back. - return TP_SUCCESS; + return jobResult; } void ForkJoinShared::transferArenasToCompartmentAndProcessGCRequests() { JSCompartment *comp = cx_->compartment(); - for (unsigned i = 0; i < numSlices_; i++) + for (unsigned i = 0; i < (threadPool_->numWorkers() + 1); i++) comp->adoptWorkerAllocator(allocators_[i]); if (gcRequested_) { if (!gcZone_) TriggerGC(cx_->runtime(), gcReason_); else TriggerZoneGC(gcZone_, gcReason_); gcRequested_ = false; gcZone_ = nullptr; } } -void -ForkJoinShared::executeFromWorker(uint32_t workerId, uintptr_t stackLimit) +bool +ForkJoinShared::executeFromWorker(uint16_t sliceId, uint32_t workerId, uintptr_t stackLimit) { - JS_ASSERT(workerId < numSlices_ - 1); + JS_ASSERT(sliceId <= numSlices_); PerThreadData thisThread(cx_->runtime()); if (!thisThread.init()) { setAbortFlag(true); - return; + return false; } TlsPerThreadData.set(&thisThread); // Don't use setIonStackLimit() because that acquires the ionStackLimitLock, and the // lock has not been initialized in these cases. thisThread.ionStackLimit = stackLimit; - executePortion(&thisThread, workerId); + executePortion(&thisThread, sliceId, workerId); TlsPerThreadData.set(nullptr); - AutoLockMonitor lock(*this); - uncompleted_ -= 1; - if (blocked_ == uncompleted_) { - // Signal the main thread that we have terminated. It will be either - // working, arranging a rendezvous, or waiting for workers to - // complete. - lock.notify(); - } + return !abort_; +} + +bool +ForkJoinShared::executeFromMainThread(uint16_t sliceId) +{ + executePortion(&cx_->mainThread(), sliceId, threadPool_->numWorkers()); + return !abort_; } void -ForkJoinShared::executeFromMainThread() -{ - executePortion(&cx_->mainThread(), numSlices_ - 1); -} - -void -ForkJoinShared::executePortion(PerThreadData *perThread, - uint32_t threadId) +ForkJoinShared::executePortion(PerThreadData *perThread, uint16_t sliceId, uint32_t workerId) { // WARNING: This code runs ON THE PARALLEL WORKER THREAD. - // Therefore, it should NOT access `cx_` in any way! + // Be careful when accessing cx_. // ForkJoinSlice already contains an AutoAssertNoGC; however, the analysis // does not propagate this type information. We duplicate the assertion // here for maximum clarity. 
JS::AutoAssertNoGC nogc(runtime()); - Allocator *allocator = allocators_[threadId]; - ForkJoinSlice slice(perThread, threadId, numSlices_, allocator, - this, &records_[threadId]); + Allocator *allocator = allocators_[workerId]; + ForkJoinSlice slice(perThread, sliceId, workerId, allocator, this, &records_[workerId]); AutoSetForkJoinSlice autoContext(&slice); - Spew(SpewOps, "Up"); +#ifdef DEBUG + // Set the maximum worker and slice number for prettier spewing. + slice.maxSliceId = numSlices_ - 1; + slice.maxWorkerId = threadPool_->numWorkers(); +#endif + + Spew(SpewOps, "Slice up"); // Make a new IonContext for the slice, which is needed if we need to // re-enter the VM. IonContext icx(CompileRuntime::get(cx_->runtime()), CompileCompartment::get(cx_->compartment()), nullptr); JS_ASSERT(slice.bailoutRecord->topScript == nullptr); @@ -1518,29 +1459,28 @@ ForkJoinShared::executePortion(PerThread // Sometimes, particularly with GCZeal, the parallel ion // script can be collected between starting the parallel // op and reaching this point. In that case, we just fail // and fallback. Spew(SpewOps, "Down (Script no longer present)"); slice.bailoutRecord->setCause(ParallelBailoutMainScriptNotPresent); setAbortFlag(false); } else { - ParallelIonInvoke<3> fii(cx_->runtime(), callee, 3); + ParallelIonInvoke<2> fii(cx_->runtime(), callee, 2); fii.args[0] = Int32Value(slice.sliceId); - fii.args[1] = Int32Value(slice.numSlices); - fii.args[2] = BooleanValue(false); + fii.args[1] = BooleanValue(false); bool ok = fii.invoke(perThread); JS_ASSERT(ok == !slice.bailoutRecord->topScript); if (!ok) setAbortFlag(false); } - Spew(SpewOps, "Down"); + Spew(SpewOps, "Slice down"); } bool ForkJoinShared::check(ForkJoinSlice &slice) { JS_ASSERT(cx_->runtime()->interrupt); if (abort_) @@ -1550,115 +1490,26 @@ ForkJoinShared::check(ForkJoinSlice &sli JS_ASSERT(!cx_->runtime()->gcIsNeeded); if (cx_->runtime()->interrupt) { // The GC Needed flag should not be set during parallel // execution. Instead, one of the requestGC() or // requestZoneGC() methods should be invoked. JS_ASSERT(!cx_->runtime()->gcIsNeeded); - // If interrupt is requested, bring worker threads to a halt, - // service the interrupt, then let them start back up again. - // AutoRendezvous autoRendezvous(slice); - // if (!js_HandleExecutionInterrupt(cx_)) - // return setAbortFlag(true); slice.bailoutRecord->setCause(ParallelBailoutInterrupt); setAbortFlag(false); return false; } - } else if (rendezvous_) { - joinRendezvous(slice); } return true; } void -ForkJoinShared::initiateRendezvous(ForkJoinSlice &slice) -{ - // The rendezvous protocol is always initiated by the main thread. The - // main thread sets the rendezvous flag to true. Seeing this flag, other - // threads will invoke |joinRendezvous()|, which causes them to (1) read - // |rendezvousIndex| and (2) increment the |blocked| counter. Once the - // |blocked| counter is equal to |uncompleted|, all parallel threads have - // joined the rendezvous, and so the main thread is signaled. That will - // cause this function to return. - // - // Some subtle points: - // - // - Worker threads may potentially terminate their work before they see - // the rendezvous flag. In this case, they would decrement - // |uncompleted| rather than incrementing |blocked|. 
Either way, if the - // two variables become equal, the main thread will be notified - // - // - The |rendezvousIndex| counter is used to detect the case where the - // main thread signals the end of the rendezvous and then starts another - // rendezvous before the workers have a chance to exit. We circumvent - // this by having the workers read the |rendezvousIndex| counter as they - // enter the rendezvous, and then they only block until that counter is - // incremented. Another alternative would be for the main thread to - // block in |endRendezvous()| until all workers have exited, but that - // would be slower and involve unnecessary synchronization. - // - // Note that the main thread cannot ever get more than one rendezvous - // ahead of the workers, because it must wait for all of them to enter - // the rendezvous before it can end it, so the solution of using a - // counter is perfectly general and we need not fear rollover. - - JS_ASSERT(slice.isMainThread()); - JS_ASSERT(!rendezvous_ && blocked_ == 0); - JS_ASSERT(cx_->runtime()->interrupt); - - AutoLockMonitor lock(*this); - - // Signal other threads we want to start a rendezvous. - rendezvous_ = true; - - // Wait until all the other threads blocked themselves. - while (blocked_ != uncompleted_) - lock.wait(); -} - -void -ForkJoinShared::joinRendezvous(ForkJoinSlice &slice) -{ - JS_ASSERT(!slice.isMainThread()); - JS_ASSERT(rendezvous_); - - AutoLockMonitor lock(*this); - const uint32_t index = rendezvousIndex_; - blocked_ += 1; - - // If we're the last to arrive, let the main thread know about it. - if (blocked_ == uncompleted_) - lock.notify(); - - // Wait until the main thread terminates the rendezvous. We use a - // separate condition variable here to distinguish between workers - // notifying the main thread that they have completed and the main - // thread notifying the workers to resume. - while (rendezvousIndex_ == index) - PR_WaitCondVar(rendezvousEnd_, PR_INTERVAL_NO_TIMEOUT); -} - -void -ForkJoinShared::endRendezvous(ForkJoinSlice &slice) -{ - JS_ASSERT(slice.isMainThread()); - - AutoLockMonitor lock(*this); - rendezvous_ = false; - blocked_ = 0; - rendezvousIndex_++; - - // Signal other threads that rendezvous is over. - PR_NotifyAllCondVar(rendezvousEnd_); -} - -void ForkJoinShared::setAbortFlag(bool fatal) { AutoLockMonitor lock(*this); abort_ = true; fatal_ = fatal_ || fatal; // Note: DontStopIon here avoids the expensive memory protection needed to @@ -1695,22 +1546,22 @@ ForkJoinShared::requestZoneGC(JS::Zone * } } ///////////////////////////////////////////////////////////////////////////// // ForkJoinSlice // ForkJoinSlice::ForkJoinSlice(PerThreadData *perThreadData, - uint32_t sliceId, uint32_t numSlices, + uint16_t sliceId, uint32_t workerId, Allocator *allocator, ForkJoinShared *shared, ParallelBailoutRecord *bailoutRecord) : ThreadSafeContext(shared->runtime(), perThreadData, Context_ForkJoin), sliceId(sliceId), - numSlices(numSlices), + workerId(workerId), bailoutRecord(bailoutRecord), shared(shared), acquiredContext_(false), nogc_(shared->runtime()) { /* * Unsafely set the zone. This is used to track malloc counters and to * trigger GCs and is otherwise not thread-safe to access. 
@@ -1789,25 +1640,16 @@ ForkJoinSlice::requestZoneGC(JS::Zone *z bool ForkJoinSlice::setPendingAbortFatal(ParallelBailoutCause cause) { shared->setPendingAbortFatal(); bailoutRecord->setCause(cause); return false; } -///////////////////////////////////////////////////////////////////////////// - -uint32_t -js::ForkJoinSlices(JSContext *cx) -{ - // Parallel workers plus this main thread. - return cx->runtime()->threadPool.numWorkers() + 1; -} - ////////////////////////////////////////////////////////////////////////////// // ParallelBailoutRecord void js::ParallelBailoutRecord::init(JSContext *cx) { reset(cx); } @@ -1903,16 +1745,29 @@ MethodStatusToString(MethodStatus status case Method_Skipped: return "skipped"; case Method_Compiled: return "compiled"; } return "(unknown status)"; } +static unsigned +NumberOfDigits(unsigned n) +{ + if (n == 0) + return 1; + unsigned d = 0; + while (n != 0) { + d++; + n /= 10; + } + return d; +} + static const size_t BufferSize = 4096; class ParallelSpewer { uint32_t depth; bool colorable; bool active[NumSpewChannels]; @@ -1923,17 +1778,17 @@ class ParallelSpewer } const char *reset() { return color("\x1b[0m"); } const char *bold() { return color("\x1b[1m"); } const char *red() { return color("\x1b[31m"); } const char *green() { return color("\x1b[32m"); } const char *yellow() { return color("\x1b[33m"); } const char *cyan() { return color("\x1b[36m"); } - const char *sliceColor(uint32_t id) { + const char *workerColor(uint32_t id) { static const char *colors[] = { "\x1b[7m\x1b[31m", "\x1b[7m\x1b[32m", "\x1b[7m\x1b[33m", "\x1b[7m\x1b[34m", "\x1b[7m\x1b[35m", "\x1b[7m\x1b[36m", "\x1b[7m\x1b[37m", "\x1b[31m", "\x1b[32m", "\x1b[33m", "\x1b[34m", "\x1b[35m", "\x1b[36m", "\x1b[37m" }; @@ -1957,17 +1812,17 @@ class ParallelSpewer active[SpewBailouts] = true; if (strstr(env, "full")) { for (uint32_t i = 0; i < NumSpewChannels; i++) active[i] = true; } } env = getenv("TERM"); - if (env) { + if (env && isatty(fileno(stderr))) { if (strcmp(env, "xterm-color") == 0 || strcmp(env, "xterm-256color") == 0) colorable = true; } } bool isActive(js::parallel::SpewChannel channel) { return active[channel]; } @@ -1976,18 +1831,23 @@ class ParallelSpewer if (!active[channel]) return; // Print into a buffer first so we use one fprintf, which usually // doesn't get interrupted when running with multiple threads. char buf[BufferSize]; if (ForkJoinSlice *slice = ForkJoinSlice::current()) { - JS_snprintf(buf, BufferSize, "[%sParallel:%u%s] ", - sliceColor(slice->sliceId), slice->sliceId, reset()); + // Print the format first into a buffer to right-justify the + // worker and slice ids. + char bufbuf[BufferSize]; + JS_snprintf(bufbuf, BufferSize, "[%%sParallel:%%0%du(%%0%du)%%s] ", + NumberOfDigits(slice->maxWorkerId), NumberOfDigits(slice->maxSliceId)); + JS_snprintf(buf, BufferSize, bufbuf, workerColor(slice->workerId), + slice->workerId, slice->sliceId, reset()); } else { JS_snprintf(buf, BufferSize, "[Parallel:M] "); } for (uint32_t i = 0; i < depth; i++) JS_snprintf(buf + strlen(buf), BufferSize, " "); JS_vsnprintf(buf + strlen(buf), BufferSize, fmt, ap);
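The reworked spewer at the end of ForkJoin.cpp right-justifies the worker and slice ids by first building the format string itself, sized from the largest possible ids. A small stand-alone sketch of that two-step snprintf trick follows; the ids and buffer sizes here are made up for illustration, and the real code uses JS_snprintf together with the slice's maxWorkerId/maxSliceId debug fields and the color helpers.

#include <cstdio>

// Count of decimal digits in |n|; mirrors NumberOfDigits() in the patch.
static unsigned NumberOfDigits(unsigned n) {
    if (n == 0)
        return 1;
    unsigned d = 0;
    while (n != 0) {
        d++;
        n /= 10;
    }
    return d;
}

int main() {
    unsigned workerId = 3, sliceId = 17;        // hypothetical ids
    unsigned maxWorkerId = 8, maxSliceId = 63;  // hypothetical maxima

    // Step 1: build a format string that zero-pads each id to the width of
    // the largest possible id, e.g. "[Parallel:%01u(%02u)] ".
    char fmt[64];
    snprintf(fmt, sizeof(fmt), "[Parallel:%%0%uu(%%0%uu)] ",
             NumberOfDigits(maxWorkerId), NumberOfDigits(maxSliceId));

    // Step 2: format the actual spew prefix with it.
    char buf[64];
    snprintf(buf, sizeof(buf), fmt, workerId, sliceId);
    printf("%s\n", buf);   // prints "[Parallel:3(17)] "
}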
--- a/js/src/vm/ForkJoin.h +++ b/js/src/vm/ForkJoin.h @@ -25,41 +25,39 @@ // shared memory (as distinct from Web Workers). The idea is that you have // some (typically data-parallel) operation which you wish to execute in // parallel across as many threads as you have available. // // The ForkJoin abstraction is intended to be used by self-hosted code // to enable parallel execution. At the top-level, it consists of a native // function (exposed as the ForkJoin intrinsic) that is used like so: // -// ForkJoin(func, feedback) +// ForkJoin(func, feedback, N) // -// The intention of this statement is to start N copies of |func()| -// running in parallel. Each copy will then do 1/Nth of the total -// work. Here N is number of workers in the threadpool (see -// ThreadPool.h---by default, N is the number of cores on the -// computer). +// The intention of this statement is to start |N| copies of |func()| +// running in parallel. Each copy will then do more or less 1/Nth of +// the total work, depending on workstealing-based load balancing. // -// Typically, each of the N slices will execute from a different -// worker thread, but that is not something you should rely upon---if -// we implement work-stealing, for example, then it could be that a -// single worker thread winds up handling multiple slices. +// Typically, each of the N slices runs in a different worker thread, +// but that is not something you should rely upon---if work-stealing +// is enabled it could be that a single worker thread winds up +// handling multiple slices. // // The second argument, |feedback|, is an optional callback that will // receiver information about how execution proceeded. This is // intended for use in unit testing but also for providing feedback to // users. Note that gathering the data to provide to |feedback| is // not free and so execution will run somewhat slower if |feedback| is // provided. // // func() should expect the following arguments: // // func(id, n, warmup) // -// Here, |id| is the slice id. |n| is the total number of slices. The +// Here, |id| is the slice id. |n| is the total number of slices. The // parameter |warmup| is true for a *warmup or recovery phase*. // Warmup phases are discussed below in more detail, but the general // idea is that if |warmup| is true, |func| should only do a fixed // amount of work. If |warmup| is false, |func| should try to do all // remaining work is assigned. // // Note that we implicitly assume that |func| is tracking how much // work it has accomplished thus far; some techniques for doing this @@ -114,24 +112,18 @@ // - If more than a fixed number of bailouts occur, we give up on // parallelization and just invoke |func()| N times in a row (once // for each worker) but with |warmup| set to false. // // Operation callback: // // During parallel execution, |slice.check()| must be periodically // invoked to check for the operation callback. This is automatically -// done by the ion-generated code. If the operation callback is -// necessary, |slice.check()| will arrange a rendezvous---that is, as -// each active worker invokes |check()|, it will come to a halt until -// everyone is blocked (Stop The World). At this point, we perform -// the callback on the main thread, and then resume execution. If a -// worker thread terminates before calling |check()|, that's fine too. -// We assume that you do not do unbounded work without invoking -// |check()|. +// done by the Ion-generated code. 
If the operation callback is +// necessary, |slice.check()| abort the parallel execution. // // Transitive compilation: // // One of the challenges for parallel compilation is that we // (currently) have to abort when we encounter an uncompiled script. // Therefore, we try to compile everything that might be needed // beforehand. The exact strategy is described in `ParallelDo::apply()` // in ForkJoin.cpp, but at the highest level the idea is: @@ -183,27 +175,34 @@ // incremental GC terminology). However, to be safe, we also block // upon entering a parallel section to ensure that any concurrent // marking or incremental GC has completed. // // In the future, it should be possible to lift the restriction that // we must block until inc. GC has completed and also to permit GC // during parallel exeution. But we're not there yet. // +// Load balancing (work stealing): + +// The ForkJoin job is dynamically divided into a fixed number of slices, +// and is submitted for parallel execution in the pool. When the number +// of slices is big enough (typically greater than the number of workers +// in the pool) -and the workload is unbalanced- each worker thread +// will perform load balancing through work stealing. The number +// of slices is computed by the self-hosted function |ComputeNumSlices| +// and can be used to know how many slices will be executed by the +// runtime for an array of the given size. +// // Current Limitations: // // - The API does not support recursive or nested use. That is, the // JavaScript function given to |ForkJoin| should not itself invoke // |ForkJoin()|. Instead, use the intrinsic |InParallelSection()| to // check for recursive use and execute a sequential fallback. // -// - No load balancing is performed between worker threads. That means that -// the fork-join system is best suited for problems that can be slice into -// uniform bits. -// /////////////////////////////////////////////////////////////////////////// namespace js { class ForkJoinActivation : public Activation { uint8_t *prevIonTop_; @@ -217,20 +216,16 @@ class ForkJoinActivation : public Activa ForkJoinActivation(JSContext *cx); ~ForkJoinActivation(); }; class ForkJoinSlice; bool ForkJoin(JSContext *cx, CallArgs &args); -// Returns the number of slices that a fork-join op will have when -// executed. -uint32_t ForkJoinSlices(JSContext *cx); - struct IonLIRTraceData { uint32_t blockIndex; uint32_t lirIndex; uint32_t execModeInt; const char *lirOpName; const char *mirOpName; JSScript *script; jsbytecode *pc; @@ -301,31 +296,35 @@ struct ParallelBailoutRecord { jsbytecode *pc); }; struct ForkJoinShared; class ForkJoinSlice : public ThreadSafeContext { public: - // Which slice should you process? Ranges from 0 to |numSlices|. - const uint32_t sliceId; + // The slice that is being processed. + const uint16_t sliceId; - // How many slices are there in total? - const uint32_t numSlices; + // The worker that is doing the work. + const uint32_t workerId; // Bailout record used to record the reason this thread stopped executing ParallelBailoutRecord *const bailoutRecord; #ifdef DEBUG // Records the last instr. to execute on this thread. IonLIRTraceData traceData; + + // The maximum worker and slice id. 
+ uint16_t maxSliceId; + uint32_t maxWorkerId; #endif - ForkJoinSlice(PerThreadData *perThreadData, uint32_t sliceId, uint32_t numSlices, + ForkJoinSlice(PerThreadData *perThreadData, uint16_t sliceId, uint32_t workerId, Allocator *allocator, ForkJoinShared *shared, ParallelBailoutRecord *bailoutRecord); // True if this is the main thread, false if it is one of the parallel workers. bool isMainThread() const; // When the code would normally trigger a GC, we don't trigger it // immediately but instead record that request here. This will @@ -375,17 +374,16 @@ class ForkJoinSlice : public ThreadSafeC // Check the current state of parallel execution. static inline ForkJoinSlice *current(); // Initializes the thread-local state. static bool initialize(); private: - friend class AutoRendezvous; friend class AutoSetForkJoinSlice; // Initialized by initialize() static mozilla::ThreadLocal<ForkJoinSlice*> tlsForkJoinSlice; ForkJoinShared *const shared; bool acquiredContext_;
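The ForkJoinSlice changes above decouple two ids that the old API conflated: |sliceId| names a unit of work, while |workerId| names the thread that happened to run it, and with work stealing one worker may run many slices. The following standalone sketch illustrates that contract outside of SpiderMonkey; RunSlices, kernel, and the atomic cursor are illustrative stand-ins, not code from this patch.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

// N slices, W workers: workers repeatedly claim the next unclaimed slice id,
// so a single worker may execute several slices and slice i need not run on
// worker i. This mirrors the sliceId/workerId split in ForkJoinSlice.
static void RunSlices(uint16_t numSlices, uint32_t numWorkers,
                      const std::function<void(uint16_t, uint32_t)> &kernel)
{
    std::atomic<uint32_t> nextSlice(0); // shared cursor over slice ids
    std::vector<std::thread> workers;
    for (uint32_t workerId = 0; workerId < numWorkers; workerId++) {
        workers.emplace_back([&, workerId] {
            for (;;) {
                uint32_t sliceId = nextSlice.fetch_add(1);
                if (sliceId >= numSlices)
                    return;                          // no slices left
                kernel(uint16_t(sliceId), workerId); // one worker, many slices
            }
        });
    }
    for (std::thread &worker : workers)
        worker.join();
}

int main()
{
    RunSlices(16, 4, [](uint16_t sliceId, uint32_t workerId) {
        std::printf("slice %u ran on worker %u\n", unsigned(sliceId), unsigned(workerId));
    });
    return 0;
}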
--- a/js/src/vm/Monitor.h +++ b/js/src/vm/Monitor.h @@ -28,16 +28,21 @@ class Monitor { protected: friend class AutoLockMonitor; friend class AutoUnlockMonitor; PRLock *lock_; PRCondVar *condVar_; + + void assertIsHoldingLock() const { + PR_ASSERT_CURRENT_THREAD_OWNS_LOCK(lock_); + } + public: Monitor() : lock_(nullptr), condVar_(nullptr) { } ~Monitor() { #ifdef JS_THREADSAFE
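The new assertIsHoldingLock() hook gives methods a cheap, debug-oriented way to state "the monitor must already be held here"; ThreadPool::join() and waitForWorkers() below rely on exactly that precondition. Here is a minimal standard-library rendition of the same pattern, assuming nothing from NSPR: the Monitor and AutoLockMonitor names mirror the patch, but the owner-tracking implementation and the Pool example are purely illustrative.

#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Stand-in for js::Monitor: remember which thread holds the lock so that
// methods with a "lock must be held" precondition can assert it, much like
// PR_ASSERT_CURRENT_THREAD_OWNS_LOCK does for a PRLock in debug builds.
class Monitor
{
  protected:
    std::mutex lock_;
    std::atomic<std::thread::id> owner_{};

    void assertIsHoldingLock() const {
        assert(owner_.load() == std::this_thread::get_id());
    }

    friend class AutoLockMonitor;
};

class AutoLockMonitor
{
    Monitor &monitor_;

  public:
    explicit AutoLockMonitor(Monitor &monitor) : monitor_(monitor) {
        monitor_.lock_.lock();
        monitor_.owner_ = std::this_thread::get_id();
    }
    ~AutoLockMonitor() {
        monitor_.owner_ = std::thread::id();   // no owner while unlocked
        monitor_.lock_.unlock();
    }
};

// Example: a method that may only be called with the monitor held.
class Pool : public Monitor
{
  public:
    void join() {
        assertIsHoldingLock();   // caller must have entered via AutoLockMonitor
        // ... decrement active workers, notify the join barrier, etc.
    }
    void drive() {
        AutoLockMonitor lock(*this);
        join();
    }
};

int main()
{
    Pool pool;
    pool.drive();
    return 0;
}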
--- a/js/src/vm/Runtime.cpp +++ b/js/src/vm/Runtime.cpp @@ -378,16 +378,19 @@ JSRuntime::init(uint32_t maxbytes) #endif if (!mainThread.init()) return false; js::TlsPerThreadData.set(&mainThread); mainThread.addToThreadList(); + if (!threadPool.init()) + return false; + if (!js_InitGC(this, maxbytes)) return false; if (!gcMarker.init(gcMode())) return false; const char *size = getenv("JSGC_MARK_STACK_LIMIT"); if (size)
--- a/js/src/vm/SelfHosting.cpp +++ b/js/src/vm/SelfHosting.cpp @@ -297,24 +297,24 @@ const JSJitInfo intrinsic_ParallelSpew_j static bool intrinsic_ForkJoin(JSContext *cx, unsigned argc, Value *vp) { CallArgs args = CallArgsFromVp(argc, vp); return ForkJoin(cx, args); } /* - * ForkJoinSlices(): Returns the number of parallel slices that will - * be created by ForkJoin(). + * ForkJoinNumWorkers(): Returns the number of workers in the fork-join + * thread pool, including the main thread. */ static bool -intrinsic_ForkJoinSlices(JSContext *cx, unsigned argc, Value *vp) +intrinsic_ForkJoinNumWorkers(JSContext *cx, unsigned argc, Value *vp) { CallArgs args = CallArgsFromVp(argc, vp); - args.rval().setInt32(ForkJoinSlices(cx)); + args.rval().setInt32(cx->runtime()->threadPool.numWorkers() + 1); return true; } /* * NewDenseArray(length): Allocates and returns a new dense array with * the given length where all values are initialized to holes. */ bool @@ -615,17 +615,17 @@ static const JSFunctionSpec intrinsic_fu JS_FN("NewArrayIterator", intrinsic_NewArrayIterator, 0,0), JS_FN("IsArrayIterator", intrinsic_IsArrayIterator, 1,0), JS_FN("NewStringIterator", intrinsic_NewStringIterator, 0,0), JS_FN("IsStringIterator", intrinsic_IsStringIterator, 1,0), JS_FN("ForkJoin", intrinsic_ForkJoin, 2,0), - JS_FN("ForkJoinSlices", intrinsic_ForkJoinSlices, 0,0), + JS_FN("ForkJoinNumWorkers", intrinsic_ForkJoinNumWorkers, 0,0), JS_FN("NewDenseArray", intrinsic_NewDenseArray, 1,0), JS_FN("ShouldForceSequential", intrinsic_ShouldForceSequential, 0,0), JS_FN("ParallelTestsShouldPass", intrinsic_ParallelTestsShouldPass, 0,0), // See builtin/TypedObject.h for descriptors of the typedobj functions. JS_FN("NewTypedHandle", js::NewTypedHandle, 1, 0),
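The +1 here encodes a convention used throughout the patch: ThreadPool::numWorkers() counts only the helper threads (cpuCount() - 1 in ThreadPool.cpp below), while self-hosted code wants the number of threads that can run slices, which includes the main thread. A toy illustration of that bookkeeping, with a hard-coded CPU count standing in for cpuCount():

#include <cstdint>
#include <cstdio>

int main()
{
    uint32_t cpuCount = 4;                        // stand-in for runtime_->cpuCount()
    uint32_t numWorkers = cpuCount - 1;           // helper threads only, as in ThreadPool::numWorkers()
    uint32_t forkJoinNumWorkers = numWorkers + 1; // what the intrinsic reports: every slice-running thread

    std::printf("helper threads: %u, threads available to ForkJoin: %u\n",
                unsigned(numWorkers), unsigned(forkJoinNumWorkers));
    return 0;
}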
--- a/js/src/vm/ThreadPool.cpp +++ b/js/src/vm/ThreadPool.cpp @@ -1,94 +1,213 @@ /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- * vim: set ts=8 sts=4 et sw=4 tw=99: * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "vm/ThreadPool.h" +#include "mozilla/Atomics.h" + #include "jslock.h" +#include "vm/ForkJoin.h" #include "vm/Monitor.h" #include "vm/Runtime.h" using namespace js; +const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024; + +///////////////////////////////////////////////////////////////////////////// +// ThreadPoolBaseWorker +// +// Base class for worker threads in the pool. + +class js::ThreadPoolBaseWorker +{ + protected: + const uint32_t workerId_; + ThreadPool *pool_; + + private: + // Slices this thread is responsible for. + // + // This a uint32 composed of two uint16s (the lower and upper bounds) so + // that we may do a single CAS. See {Compose,Decompose}SliceBounds + // functions below. + mozilla::Atomic<uint32_t, mozilla::ReleaseAcquire> sliceBounds_; + + protected: + static uint32_t ComposeSliceBounds(uint16_t from, uint16_t to) { + MOZ_ASSERT(from <= to); + return (uint32_t(from) << 16) | to; + } + + static void DecomposeSliceBounds(uint32_t bounds, uint16_t *from, uint16_t *to) { + *from = bounds >> 16; + *to = bounds & uint16_t(~0); + MOZ_ASSERT(*from <= *to); + } + + bool hasWork() const { + uint16_t from, to; + DecomposeSliceBounds(sliceBounds_, &from, &to); + return from != to; + } + + bool popSliceFront(uint16_t *sliceId); + bool popSliceBack(uint16_t *sliceId); + bool stealFrom(ThreadPoolBaseWorker *victim, uint16_t *sliceId); + + public: + ThreadPoolBaseWorker(uint32_t workerId, ThreadPool *pool) + : workerId_(workerId), + pool_(pool), + sliceBounds_(0) + { } + + void submitSlices(uint16_t sliceFrom, uint16_t sliceTo) { + MOZ_ASSERT(!hasWork()); + MOZ_ASSERT(sliceFrom < sliceTo); + sliceBounds_ = ComposeSliceBounds(sliceFrom, sliceTo); + } + + void abort(); +}; + ///////////////////////////////////////////////////////////////////////////// // ThreadPoolWorker // -// Each |ThreadPoolWorker| just hangs around waiting for items to be added -// to its |worklist_|. Whenever something is added, it gets executed. -// Once the worker's state is set to |TERMINATING|, the worker will -// exit as soon as its queue is empty. +// Each |ThreadPoolWorker| just hangs around waiting for slices to be added to +// its worklist. Whenever something is added, it gets executed. Once the +// worker's state is set to |TERMINATED|, the worker will exit as soon as its +// queue is empty. -static const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024; - -class js::ThreadPoolWorker : public Monitor +class js::ThreadPoolWorker : public ThreadPoolBaseWorker { - const size_t workerId_; + friend class ThreadPoolMainWorker; // Current point in the worker's lifecycle. // // Modified only while holding the ThreadPoolWorker's lock. - enum WorkerState { - CREATED, ACTIVE, TERMINATING, TERMINATED + volatile enum WorkerState { + CREATED, ACTIVE, TERMINATED } state_; - // Worklist for this thread. - // - // Modified only while holding the ThreadPoolWorker's lock. - js::Vector<TaskExecutor*, 4, SystemAllocPolicy> worklist_; - // The thread's main function static void ThreadMain(void *arg); void run(); + // Get a slice of work, from ourself or steal work from other workers + // (or from the main thread). 
+ bool getSlice(uint16_t *sliceId); + public: - ThreadPoolWorker(size_t workerId); - ~ThreadPoolWorker(); - - bool init(); + ThreadPoolWorker(uint32_t workerId, ThreadPool *pool) + : ThreadPoolBaseWorker(workerId, pool), + state_(CREATED) + { } // Invoked from main thread; signals worker to start. bool start(); - // Submit work to be executed. If this returns true, you are guaranteed - // that the task will execute before the thread-pool terminates (barring - // an infinite loop in some prior task). - bool submit(TaskExecutor *task); - - // Invoked from main thread; signals worker to terminate and blocks until - // termination completes. + // Invoked from main thread; signals the worker loop to return. void terminate(); }; -ThreadPoolWorker::ThreadPoolWorker(size_t workerId) - : workerId_(workerId), - state_(CREATED), - worklist_() -{ } +// ThreadPoolMainWorker +// +// This class abstracts the main thread as a worker thread with a private +// queue to allow for work stealing. + +class js::ThreadPoolMainWorker : public ThreadPoolBaseWorker +{ + friend class ThreadPoolWorker; + + // Get a slice of work, from ourself or steal work from other workers. + bool getSlice(uint16_t *sliceId); + + public: + ThreadPoolMainWorker(ThreadPool *pool) + : ThreadPoolBaseWorker(0, pool) + { } -ThreadPoolWorker::~ThreadPoolWorker() -{ } + // Execute a job on the main thread. + void executeJob(); +}; + +bool +ThreadPoolBaseWorker::popSliceFront(uint16_t *sliceId) +{ + uint32_t bounds; + uint16_t from, to; + do { + bounds = sliceBounds_; + DecomposeSliceBounds(bounds, &from, &to); + if (from == to) + return false; + } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from + 1, to))); + + *sliceId = from; + pool_->pendingSlices_--; + return true; +} bool -ThreadPoolWorker::init() +ThreadPoolBaseWorker::popSliceBack(uint16_t *sliceId) +{ + uint32_t bounds; + uint16_t from, to; + do { + bounds = sliceBounds_; + DecomposeSliceBounds(bounds, &from, &to); + if (from == to) + return false; + } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from, to - 1))); + + *sliceId = to - 1; + pool_->pendingSlices_--; + return true; +} + +void +ThreadPoolBaseWorker::abort() { - return Monitor::init(); + uint32_t bounds; + uint16_t from, to; + do { + bounds = sliceBounds_; + DecomposeSliceBounds(bounds, &from, &to); + } while (!sliceBounds_.compareExchange(bounds, 0)); + + pool_->pendingSlices_ -= to - from; +} + +bool +ThreadPoolBaseWorker::stealFrom(ThreadPoolBaseWorker *victim, uint16_t *sliceId) +{ + // Instead of popping the slice from the front by incrementing sliceFrom_, + // decrement sliceTo_. Usually this gives us better locality. + if (!victim->popSliceBack(sliceId)) + return false; +#ifdef DEBUG + pool_->stolenSlices_++; +#endif + return true; } bool ThreadPoolWorker::start() { #ifndef JS_THREADSAFE return false; #else - JS_ASSERT(state_ == CREATED); + MOZ_ASSERT(state_ == CREATED); // Set state to active now, *before* the thread starts: state_ = ACTIVE; if (!PR_CreateThread(PR_USER_THREAD, ThreadMain, this, PR_PRIORITY_NORMAL, PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, @@ -101,184 +220,357 @@ ThreadPoolWorker::start() return true; #endif } void ThreadPoolWorker::ThreadMain(void *arg) { - ThreadPoolWorker *thread = (ThreadPoolWorker*) arg; - thread->run(); + ThreadPoolWorker *worker = (ThreadPoolWorker*) arg; + worker->run(); +} + +bool +ThreadPoolWorker::getSlice(uint16_t *sliceId) +{ + // First see whether we have any work ourself. 
+ if (popSliceFront(sliceId)) + return true; + + // Try to steal work. + if (!pool_->workStealing()) + return false; + + ThreadPoolBaseWorker *victim; + do { + if (!pool_->hasWork()) + return false; + + // Add one to add the main thread into the mix. + uint32_t victimId = rand() % (pool_->numWorkers() + 1); + + // By convention consider worker id 0 the main thread. + if (victimId == 0) + victim = pool_->mainWorker_; + else + victim = pool_->workers_[victimId - 1]; + } while (!stealFrom(victim, sliceId)); + + return true; } void ThreadPoolWorker::run() { // This is hokey in the extreme. To compute the stack limit, // subtract the size of the stack from the address of a local // variable and give a 10k buffer. Is there a better way? // (Note: 2k proved to be fine on Mac, but too little on Linux) uintptr_t stackLimitOffset = WORKER_THREAD_STACK_SIZE - 10*1024; uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) + stackLimitOffset * JS_STACK_GROWTH_DIRECTION); - AutoLockMonitor lock(*this); + for (;;) { + // Wait for work to arrive or for us to terminate. + { + AutoLockMonitor lock(*pool_); + while (state_ == ACTIVE && !pool_->hasWork()) + lock.wait(); - for (;;) { - while (!worklist_.empty()) { - TaskExecutor *task = worklist_.popCopy(); - { - // Unlock so that new things can be added to the - // worklist while we are processing the current item: - AutoUnlockMonitor unlock(*this); - task->executeFromWorker(workerId_, stackLimit); + if (state_ == TERMINATED) { + pool_->join(); + return; + } + + pool_->activeWorkers_++; + } + + ParallelJob *job = pool_->job(); + uint16_t sliceId; + while (getSlice(&sliceId)) { + if (!job->executeFromWorker(sliceId, workerId_, stackLimit)) { + pool_->abortJob(); + break; } } - if (state_ == TERMINATING) - break; - - JS_ASSERT(state_ == ACTIVE); - - lock.wait(); + // Join the pool. + { + AutoLockMonitor lock(*pool_); + pool_->join(); + } } - - JS_ASSERT(worklist_.empty() && state_ == TERMINATING); - state_ = TERMINATED; - lock.notify(); -} - -bool -ThreadPoolWorker::submit(TaskExecutor *task) -{ - AutoLockMonitor lock(*this); - JS_ASSERT(state_ == ACTIVE); - if (!worklist_.append(task)) - return false; - lock.notify(); - return true; } void ThreadPoolWorker::terminate() { - AutoLockMonitor lock(*this); + MOZ_ASSERT(state_ != TERMINATED); + pool_->assertIsHoldingLock(); + state_ = TERMINATED; +} + +void +ThreadPoolMainWorker::executeJob() +{ + ParallelJob *job = pool_->job(); + uint16_t sliceId; + while (getSlice(&sliceId)) { + if (!job->executeFromMainThread(sliceId)) { + pool_->abortJob(); + return; + } + } +} - if (state_ == CREATED) { - state_ = TERMINATED; - return; - } else if (state_ == ACTIVE) { - state_ = TERMINATING; - lock.notify(); - while (state_ != TERMINATED) - lock.wait(); - } else { - JS_ASSERT(state_ == TERMINATED); - } +bool +ThreadPoolMainWorker::getSlice(uint16_t *sliceId) +{ + // First see whether we have any work ourself. + if (popSliceFront(sliceId)) + return true; + + // Try to steal work. + if (!pool_->workStealing()) + return false; + + // Pick a random target with work left over. + ThreadPoolWorker *victim; + do { + if (!pool_->hasWork()) + return false; + + victim = pool_->workers_[rand() % pool_->numWorkers()]; + } while (!stealFrom(victim, sliceId)); + + return true; } ///////////////////////////////////////////////////////////////////////////// // ThreadPool // // The |ThreadPool| starts up workers, submits work to them, and shuts // them down when requested. 
ThreadPool::ThreadPool(JSRuntime *rt) - : runtime_(rt) -{ -} + : runtime_(rt), + mainWorker_(nullptr), + activeWorkers_(0), + joinBarrier_(nullptr), + job_(nullptr), +#ifdef DEBUG + stolenSlices_(0), +#endif + pendingSlices_(0) +{ } ThreadPool::~ThreadPool() { terminateWorkers(); + if (joinBarrier_) + PR_DestroyCondVar(joinBarrier_); +} + +bool +ThreadPool::init() +{ +#ifdef JS_THREADSAFE + if (!Monitor::init()) + return false; + joinBarrier_ = PR_NewCondVar(lock_); + return !!joinBarrier_; +#else + return true; +#endif } -size_t +uint32_t ThreadPool::numWorkers() const { - return runtime_->workerThreadCount(); + // Subtract one for the main thread, which always exists. + return runtime_->cpuCount() - 1; +} + +bool +ThreadPool::workStealing() const +{ +#ifdef DEBUG + if (char *stealEnv = getenv("JS_THREADPOOL_STEAL")) + return !!strtol(stealEnv, nullptr, 10); +#endif + + return true; } bool ThreadPool::lazyStartWorkers(JSContext *cx) { // Starts the workers if they have not already been started. If // something goes wrong, reports an error and ensures that all // partially started threads are terminated. Therefore, upon exit // from this function, the workers array is either full (upon // success) or empty (upon failure). -#ifndef JS_THREADSAFE - return true; -#else +#ifdef JS_THREADSAFE if (!workers_.empty()) { - JS_ASSERT(workers_.length() == numWorkers()); + MOZ_ASSERT(workers_.length() == numWorkers()); return true; } // Allocate workers array and then start the worker threads. // Note that numWorkers() is the number of *desired* workers, // but workers_.length() is the number of *successfully // initialized* workers. - for (size_t workerId = 0; workerId < numWorkers(); workerId++) { - ThreadPoolWorker *worker = js_new<ThreadPoolWorker>(workerId); - if (!worker) { + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) { + ThreadPoolWorker *worker = cx->new_<ThreadPoolWorker>(workerId, this); + if (!worker || !workers_.append(worker)) { terminateWorkersAndReportOOM(cx); return false; } - if (!worker->init() || !workers_.append(worker)) { - js_delete(worker); - terminateWorkersAndReportOOM(cx); - return false; - } - if (!worker->start()) { + } + + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) { + if (!workers_[workerId]->start()) { // Note: do not delete worker here because it has been // added to the array and hence will be deleted by // |terminateWorkersAndReportOOM()|. terminateWorkersAndReportOOM(cx); return false; } } +#endif return true; -#endif } void ThreadPool::terminateWorkersAndReportOOM(JSContext *cx) { terminateWorkers(); - JS_ASSERT(workers_.empty()); - JS_ReportOutOfMemory(cx); + MOZ_ASSERT(workers_.empty()); + js_ReportOutOfMemory(cx); } void ThreadPool::terminateWorkers() { - while (workers_.length() > 0) { - ThreadPoolWorker *worker = workers_.popCopy(); - worker->terminate(); - js_delete(worker); + if (workers_.length() > 0) { + AutoLockMonitor lock(*this); + + // Signal to the workers they should quit. + for (uint32_t i = 0; i < workers_.length(); i++) + workers_[i]->terminate(); + + // Wake up all the workers. Set the number of active workers to the + // current number of workers so we can make sure they all join. + activeWorkers_ = workers_.length(); + lock.notifyAll(); + + // Wait for all workers to join. 
+ waitForWorkers(); + + while (workers_.length() > 0) + js_delete(workers_.popCopy()); } + + js_delete(mainWorker_); } -bool -ThreadPool::submitAll(JSContext *cx, TaskExecutor *executor) -{ - JS_ASSERT(CurrentThreadCanAccessRuntime(runtime_)); - - if (!lazyStartWorkers(cx)) - return false; - - for (size_t id = 0; id < numWorkers(); id++) { - if (!workers_[id]->submit(executor)) - return false; - } - return true; -} - -bool +void ThreadPool::terminate() { terminateWorkers(); - return true; +} + +void +ThreadPool::join() +{ +#ifdef JS_THREADSAFE + assertIsHoldingLock(); + if (--activeWorkers_ == 0) + PR_NotifyCondVar(joinBarrier_); +#endif +} + +void +ThreadPool::waitForWorkers() +{ +#ifdef JS_THREADSAFE + assertIsHoldingLock(); + while (activeWorkers_ > 0) { + mozilla::DebugOnly<PRStatus> status = + PR_WaitCondVar(joinBarrier_, PR_INTERVAL_NO_TIMEOUT); + MOZ_ASSERT(status == PR_SUCCESS); + } + job_ = nullptr; +#endif } + +ParallelResult +ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t numSlices) +{ + MOZ_ASSERT(CurrentThreadCanAccessRuntime(runtime_)); + MOZ_ASSERT(activeWorkers_ == 0); + MOZ_ASSERT(!hasWork()); + + // Create the main thread worker and off-main-thread workers if necessary. + if (!mainWorker_) { + mainWorker_ = cx->new_<ThreadPoolMainWorker>(this); + if (!mainWorker_) { + terminateWorkersAndReportOOM(cx); + return TP_FATAL; + } + } + + if (!lazyStartWorkers(cx)) + return TP_FATAL; + + // Evenly distribute slices to the workers. + uint16_t slicesPerWorker = numSlices / (numWorkers() + 1); + uint16_t leftover = numSlices % slicesPerWorker; + uint16_t sliceFrom = 0; + uint16_t sliceTo = 0; + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) { + if (leftover > 0) { + sliceTo += slicesPerWorker + 1; + leftover--; + } else { + sliceTo += slicesPerWorker; + } + workers_[workerId]->submitSlices(sliceFrom, sliceTo); + sliceFrom = sliceTo; + } + MOZ_ASSERT(leftover == 0); + mainWorker_->submitSlices(sliceFrom, sliceFrom + slicesPerWorker); + + // Notify the worker threads that there's work now. + { + job_ = job; + pendingSlices_ = numSlices; +#ifdef DEBUG + stolenSlices_ = 0; +#endif + AutoLockMonitor lock(*this); + lock.notifyAll(); + } + + // Do work on the main thread. + mainWorker_->executeJob(); + + // Wait for all threads to join. While there are no pending slices at this + // point, the slices themselves may not be finished processing. + { + AutoLockMonitor lock(*this); + waitForWorkers(); + } + + // Everything went swimmingly. Give yourself a pat on the back. + return TP_SUCCESS; +} + +void +ThreadPool::abortJob() +{ + mainWorker_->abort(); + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) + workers_[workerId]->abort(); +}
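The scheduler above hinges on one representation trick: a worker's pending slices are a contiguous range packed into a single 32-bit word, so the owner can pop from the front and a thief can pop from the back with one compare-and-swap each, and the two can never hand out the same slice. Below is a self-contained sketch of that range, mirroring ComposeSliceBounds/DecomposeSliceBounds and popSliceFront/popSliceBack but built on std::atomic; the SliceRange name and the single-threaded demo in main() are illustrative only.

#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdio>

// A worker's range of pending slice ids, packed as (from << 16) | to so a
// single 32-bit CAS can shrink it from either end.
class SliceRange
{
    std::atomic<uint32_t> bounds_;

    static uint32_t Compose(uint16_t from, uint16_t to) {
        assert(from <= to);
        return (uint32_t(from) << 16) | to;
    }
    static void Decompose(uint32_t bounds, uint16_t *from, uint16_t *to) {
        *from = bounds >> 16;
        *to = bounds & 0xffff;
    }

  public:
    SliceRange() : bounds_(0) {}

    void submit(uint16_t from, uint16_t to) { bounds_ = Compose(from, to); }

    // Owner takes the next slice from the front.
    bool popFront(uint16_t *sliceId) {
        uint32_t bounds = bounds_.load();
        uint16_t from, to;
        do {
            Decompose(bounds, &from, &to);
            if (from == to)
                return false;                       // empty: nothing to pop
        } while (!bounds_.compare_exchange_weak(bounds, Compose(from + 1, to)));
        *sliceId = from;
        return true;
    }

    // A thief takes a slice from the back, leaving the owner's end untouched.
    bool popBack(uint16_t *sliceId) {
        uint32_t bounds = bounds_.load();
        uint16_t from, to;
        do {
            Decompose(bounds, &from, &to);
            if (from == to)
                return false;
        } while (!bounds_.compare_exchange_weak(bounds, Compose(from, to - 1)));
        *sliceId = to - 1;
        return true;
    }
};

int main()
{
    SliceRange range;
    range.submit(0, 8);
    uint16_t id;
    while (range.popFront(&id))   // single-threaded demo: drain from the front
        std::printf("popped slice %u\n", unsigned(id));
    return 0;
}

If the owner and a thief race on the same word, whichever CAS loses simply reloads the shrunken range and retries, which is exactly what popSliceFront and popSliceBack do above.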
--- a/js/src/vm/ThreadPool.h +++ b/js/src/vm/ThreadPool.h @@ -2,85 +2,154 @@ * vim: set ts=8 sts=4 et sw=4 tw=99: * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #ifndef vm_ThreadPool_h #define vm_ThreadPool_h -#include <stddef.h> -#include <stdint.h> +#include "mozilla/Atomics.h" #include "jsalloc.h" #include "jslock.h" #include "jspubtd.h" #include "js/Vector.h" +#include "vm/Monitor.h" struct JSRuntime; struct JSCompartment; namespace js { +class ThreadPoolBaseWorker; class ThreadPoolWorker; +class ThreadPoolMainWorker; -typedef void (*TaskFun)(void *userdata, uint32_t workerId, uintptr_t stackLimit); - -class TaskExecutor +// A ParallelJob is the main runnable abstraction in the ThreadPool. +// ParallelJobs are composed of one or more slices. Each slice is executed by +// the pool by calling one of the execute method with the unique |sliceId| +// as argument. The pool executes multiple slices in parallel. +class ParallelJob { public: - virtual void executeFromWorker(uint32_t workerId, uintptr_t stackLimit) = 0; + virtual bool executeFromWorker(uint16_t sliceId, uint32_t workerId, uintptr_t stackLimit) = 0; + virtual bool executeFromMainThread(uint16_t sliceId) = 0; }; -// ThreadPool used for parallel JavaScript execution as well as -// parallel compilation. Unless you are building a new kind of -// parallel service, it is very likely that you do not wish to -// interact with the threadpool directly. In particular, if you wish -// to execute JavaScript in parallel, you probably want to look at -// |js::ForkJoin| in |forkjoin.cpp|. +// ThreadPool used for parallel JavaScript execution. Unless you are building +// a new kind of parallel service, it is very likely that you do not wish to +// interact with the threadpool directly. In particular, if you wish to +// execute JavaScript in parallel, you probably want to look at |js::ForkJoin| +// in |forkjoin.cpp|. +// +// The ThreadPool always maintains a fixed pool of worker threads. You can +// query the number of worker threads via the method |numWorkers()|. Note +// that this number may be zero (generally if threads are disabled, or when +// manually specified for benchmarking purposes). +// +// The way to submit a job is using |executeJob()|---in this case, the job +// will be executed by all worker threads, including the main thread. This +// does not fail if there are no worker threads, it simply runs all the work +// using the main thread only. // -// The ThreadPool always maintains a fixed pool of worker threads. -// You can query the number of worker threads via the method -// |numWorkers()|. Note that this number may be zero (generally if -// threads are disabled, or when manually specified for benchmarking -// purposes). +// Of course, each thread may have any number of previously submitted things +// that they are already working on, and so they will finish those before they +// get to this job. Therefore it is possible to have some worker threads pick +// up (and even finish) their piece of the job before others have even +// started. The main thread is also used by the pool as a worker thread. // -// The way to submit a job is using |submitAll()|---in this -// case, the job will be executed by all worker threads. This does -// not fail if there are no worker threads, it simply does nothing. 
-// Of course, each thread may have any number of previously submitted -// things that they are already working on, and so they will finish -// those before they get to this job. Therefore it is possible to -// have some worker threads pick up (and even finish) their piece of -// the job before others have even started. -class ThreadPool +// The ThreadPool supports work stealing. Every time a worker completes all +// the slices in its local queue, it tries to acquire some work from other +// workers (including the main thread). Execution terminates when there is no +// work left to be done, i.e., when all the workers have an empty queue. The +// stealing algorithm operates in 2 phases: (1) workers process all the slices +// in their local queue, and then (2) workers try to steal from other peers. +// Since workers start to steal only *after* they have completed all the +// slices in their queue, the design is particularly convenient in the context +// of Fork/Join-like parallelism, where workers receive a bunch of slices to +// be done at the very beginning of the job, and have to wait until all the +// threads have joined back. During phase (1) there is no synchronization +// overhead between workers introduced by the stealing algorithm, and +// therefore the execution overhead introduced is almost zero with balanced +// workloads. The way a |ParallelJob| is divided into multiple slices has to +// be specified by the instance implementing the job (e.g., |ForkJoinShared| +// in |ForkJoin.cpp|). + +class ThreadPool : public Monitor { private: + friend class ThreadPoolBaseWorker; friend class ThreadPoolWorker; + friend class ThreadPoolMainWorker; + + // Initialized at startup only. + JSRuntime *const runtime_; + + // Worker threads and the main thread worker have different + // logic. Initialized lazily. + js::Vector<ThreadPoolWorker *, 8, SystemAllocPolicy> workers_; + ThreadPoolMainWorker *mainWorker_; - // Initialized at startup only: - JSRuntime *const runtime_; - js::Vector<ThreadPoolWorker*, 8, SystemAllocPolicy> workers_; + // The number of active workers. Should only access under lock. + uint32_t activeWorkers_; + PRCondVar *joinBarrier_; + + // The current job. + ParallelJob *job_; + +#ifdef DEBUG + // Number of stolen slices in the last parallel job. + mozilla::Atomic<uint32_t, mozilla::ReleaseAcquire> stolenSlices_; +#endif + + // Number of pending slices in the current job. + mozilla::Atomic<uint32_t, mozilla::ReleaseAcquire> pendingSlices_; bool lazyStartWorkers(JSContext *cx); void terminateWorkers(); void terminateWorkersAndReportOOM(JSContext *cx); + void join(); + void waitForWorkers(); public: ThreadPool(JSRuntime *rt); ~ThreadPool(); - // Return number of worker threads in the pool. - size_t numWorkers() const; + bool init(); + + // Return number of worker threads in the pool, not counting the main thread. + uint32_t numWorkers() const; + + // Returns whether we have any pending slices. + bool hasWork() const { return pendingSlices_ != 0; } - // See comment on class: - bool submitAll(JSContext *cx, TaskExecutor *executor); + // Returns the current job. Must have one. + ParallelJob *job() const { + MOZ_ASSERT(job_); + return job_; + } + + // Returns whether or not the scheduler should perform work stealing. + bool workStealing() const; + +#ifdef DEBUG + // Return the number of stolen slices in the last parallel job. + uint16_t stolenSlices() { return stolenSlices_; } +#endif // Wait until all worker threads have finished their current set - // of jobs and then return. 
You must not submit new jobs after + // invoking |terminate()|. - bool terminate(); + void terminate(); + + // Execute the given ParallelJob using the main thread and any available worker. + // Blocks until the job has been fully executed or aborted. + ParallelResult executeJob(JSContext *cx, ParallelJob *job, uint16_t numSlices); + + // Abort the current job. + void abortJob(); }; } // namespace js #endif /* vm_ThreadPool_h */
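From a caller's perspective, the whole header reduces to: implement ParallelJob, pick a slice count, and hand both to executeJob(). The sketch below shows what such a client looks like; the ParallelJob interface is re-declared so the example stands alone, and SumJob plus the sequential driver in main() are illustrative, not code from ForkJoin.cpp.

#include <cstdint>
#include <cstdio>
#include <vector>

// Re-declaration of the interface from ThreadPool.h, trimmed for the sketch.
class ParallelJob
{
  public:
    virtual bool executeFromWorker(uint16_t sliceId, uint32_t workerId, uintptr_t stackLimit) = 0;
    virtual bool executeFromMainThread(uint16_t sliceId) = 0;
};

// Illustrative job: each slice sums one contiguous chunk of |data|.
class SumJob : public ParallelJob
{
    const std::vector<double> &data_;
    std::vector<double> &partials_;   // one partial sum per slice
    uint16_t numSlices_;

    bool runSlice(uint16_t sliceId) {
        size_t sliceWidth = data_.size() / numSlices_;
        size_t from = sliceId * sliceWidth;
        size_t to = (sliceId == numSlices_ - 1) ? data_.size() : from + sliceWidth;
        double sum = 0;
        for (size_t i = from; i < to; i++)
            sum += data_[i];
        partials_[sliceId] = sum;
        return true;   // returning false would make the pool call abortJob()
    }

  public:
    SumJob(const std::vector<double> &data, std::vector<double> &partials, uint16_t numSlices)
      : data_(data), partials_(partials), numSlices_(numSlices) {}

    bool executeFromWorker(uint16_t sliceId, uint32_t, uintptr_t) override { return runSlice(sliceId); }
    bool executeFromMainThread(uint16_t sliceId) override { return runSlice(sliceId); }
};

int main()
{
    std::vector<double> data(1000, 1.0);
    uint16_t numSlices = 8;
    std::vector<double> partials(numSlices, 0);
    SumJob job(data, partials, numSlices);

    // Stand-in for threadPool.executeJob(cx, &job, numSlices): run every
    // slice on the "main thread" so the sketch needs no pool at all.
    for (uint16_t sliceId = 0; sliceId < numSlices; sliceId++)
        job.executeFromMainThread(sliceId);

    double total = 0;
    for (double partial : partials)
        total += partial;
    std::printf("total = %g\n", total);
    return 0;
}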