blob: 0d535708277cd32b5e8db77abe913f7fad156e49 [file] [log] [blame]
/*
* This file is part of the Jikes RVM project (http://jikesrvm.org).
*
* This file is licensed to You under the Eclipse Public License (EPL);
* You may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.opensource.org/licenses/eclipse-1.0.php
*
* See the COPYRIGHT.txt file distributed with this work for information
* regarding copyright ownership.
*/
package org.mmtk.utility.deque;
import org.mmtk.policy.RawPageSpace;
import org.mmtk.policy.Space;
import org.mmtk.utility.Constants;
import org.mmtk.utility.Log;
import org.mmtk.vm.Lock;
import org.mmtk.vm.VM;
import org.vmmagic.pragma.Entrypoint;
import org.vmmagic.pragma.Inline;
import org.vmmagic.pragma.Uninterruptible;
import org.vmmagic.unboxed.Address;
import org.vmmagic.unboxed.Offset;
/**
* This supports <i>unsynchronized</i> enqueuing and dequeuing of buffers
* for shared use. The data can be added to and removed from either end
* of the deque.
*/
@Uninterruptible
public class SharedDeque extends Deque implements Constants {
private static final boolean DISABLE_WAITING = true;
private static final Offset NEXT_OFFSET = Offset.zero();
private static final Offset PREV_OFFSET = Offset.fromIntSignExtend(BYTES_IN_ADDRESS);
private static final boolean TRACE = false;
private static final boolean TRACE_DETAIL = false;
private static final boolean TRACE_BLOCKERS = false;
/****************************************************************************
*
* Public instance methods
*/
/**
* Constructor
*/
public SharedDeque(String name, RawPageSpace rps, int arity) {
this.rps = rps;
this.arity = arity;
this.name = name;
lock = VM.newLock("SharedDeque");
clearCompletionFlag();
head = HEAD_INITIAL_VALUE;
tail = TAIL_INITIAL_VALUE;
}
/** Get the arity (words per entry) of this queue */
@Inline
final int getArity() { return arity; }
/**
* Enqueue a block on the head or tail of the shared queue
*
* @param buf
* @param arity
* @param toTail
*/
final void enqueue(Address buf, int arity, boolean toTail) {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
lock();
if (toTail) {
// Add to the tail of the queue
setNext(buf, Address.zero());
if (tail.EQ(TAIL_INITIAL_VALUE))
head = buf;
else
setNext(tail, buf);
setPrev(buf, tail);
tail = buf;
} else {
// Add to the head of the queue
setPrev(buf, Address.zero());
if (head.EQ(HEAD_INITIAL_VALUE))
tail = buf;
else
setPrev(head, buf);
setNext(buf, head);
head = buf;
}
bufsenqueued++;
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(checkDequeLength(bufsenqueued));
unlock();
}
public final void clearDeque(int arity) {
Address buf = dequeue(arity);
while (!buf.isZero()) {
free(bufferStart(buf));
buf = dequeue(arity);
}
setCompletionFlag();
}
@Inline
final Address dequeue(int arity) {
return dequeue(arity, false);
}
final Address dequeue(int arity, boolean fromTail) {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
return dequeue(false, fromTail);
}
@Inline
final Address dequeueAndWait(int arity) {
return dequeueAndWait(arity, false);
}
final Address dequeueAndWait(int arity, boolean fromTail) {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(arity == this.arity);
Address buf = dequeue(false, fromTail);
if (buf.isZero() && (!complete())) {
buf = dequeue(true, fromTail); // Wait inside dequeue
}
return buf;
}
/**
* Prepare for parallel processing. All active GC threads will
* participate, and pop operations will block until all work
* is complete.
*/
public final void prepare() {
if (DISABLE_WAITING) {
prepareNonBlocking();
} else {
/* This should be the normal mode of operation once performance is fixed */
prepare(VM.collection.activeGCThreads());
}
}
/**
* Prepare for processing where pop operations on the deques
* will never block.
*/
public final void prepareNonBlocking() {
prepare(1);
}
/**
* Prepare for parallel processing where a specific number
* of threads take part.
*
* @param consumers # threads taking part.
*/
private void prepare(int consumers) {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
setNumConsumers(consumers);
clearCompletionFlag();
}
public final void reset() {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(numConsumersWaiting == 0);
clearCompletionFlag();
setNumConsumersWaiting(0);
assertExhausted();
}
public final void assertExhausted() {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero() && tail.isZero());
}
@Inline
final Address alloc() {
Address rtn = rps.acquire(PAGES_PER_BUFFER);
if (rtn.isZero()) {
Space.printUsageMB();
VM.assertions.fail("Failed to allocate space for queue. Is metadata virtual memory exhausted?");
}
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(rtn.EQ(bufferStart(rtn)));
return rtn;
}
@Inline
final void free(Address buf) {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(buf.EQ(bufferStart(buf)) && !buf.isZero());
rps.release(buf);
}
@Inline
public final int enqueuedPages() {
return (int) (bufsenqueued * PAGES_PER_BUFFER);
}
/****************************************************************************
*
* Private instance methods and fields
*/
/** The name of this shared deque - for diagnostics */
private final String name;
/** Raw page space from which to allocate */
private RawPageSpace rps;
/** Number of words per entry */
private final int arity;
/** Completion flag - set when all consumers have arrived at the barrier */
@Entrypoint
private volatile int completionFlag;
/** # active threads - processing is complete when # waiting == this */
@Entrypoint
private volatile int numConsumers;
/** # threads waiting */
@Entrypoint
private volatile int numConsumersWaiting;
/** Head of the shared deque */
@Entrypoint
protected volatile Address head;
/** Tail of the shared deque */
@Entrypoint
protected volatile Address tail;
@Entrypoint
private volatile int bufsenqueued;
private Lock lock;
private static final long WARN_PERIOD = (long)(2*1E9);
private static final long TIMEOUT_PERIOD = 10 * WARN_PERIOD;
/**
* Dequeue a block from the shared pool. If 'waiting' is true, and the
* queue is empty, wait for either a new block to show up or all the
* other consumers to join us.
*
* @param waiting
* @param fromTail
* @return the Address of the block
*/
private Address dequeue(boolean waiting, boolean fromTail) {
lock();
Address rtn = ((fromTail) ? tail : head);
if (rtn.isZero()) {
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero() && head.isZero());
// no buffers available
if (waiting) {
int ordinal = TRACE ? 0 : VM.activePlan.collector().getId();
setNumConsumersWaiting(numConsumersWaiting + 1);
while (rtn.isZero()) {
if (numConsumersWaiting == numConsumers)
setCompletionFlag();
if (TRACE) {
Log.write("-- ("); Log.write(ordinal);
Log.write(") joining wait queue of SharedDeque(");
Log.write(name); Log.write(") ");
Log.write(numConsumersWaiting); Log.write("/");
Log.write(numConsumers);
Log.write(" consumers waiting");
if (complete()) Log.write(" WAIT COMPLETE");
Log.writeln();
if (TRACE_BLOCKERS)
VM.assertions.dumpStack();
}
unlock();
// Spin and wait
spinWait(fromTail);
if (complete()) {
if (TRACE) {
Log.write("-- ("); Log.write(ordinal); Log.writeln(") EXITING");
}
lock();
setNumConsumersWaiting(numConsumersWaiting - 1);
unlock();
return Address.zero();
}
lock();
// Re-get the list head/tail while holding the lock
rtn = ((fromTail) ? tail : head);
}
setNumConsumersWaiting(numConsumersWaiting - 1);
if (TRACE) {
Log.write("-- ("); Log.write(ordinal); Log.write(") resuming work ");
Log.write(" n="); Log.writeln(numConsumersWaiting);
}
} else {
unlock();
return Address.zero();
}
}
if (fromTail) {
// dequeue the tail buffer
setTail(getPrev(tail));
if (head.EQ(rtn)) {
setHead(Address.zero());
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(tail.isZero());
} else {
setNext(tail, Address.zero());
}
} else {
// dequeue the head buffer
setHead(getNext(head));
if (tail.EQ(rtn)) {
setTail(Address.zero());
if (VM.VERIFY_ASSERTIONS) VM.assertions._assert(head.isZero());
} else {
setPrev(head, Address.zero());
}
}
bufsenqueued--;
unlock();
return rtn;
}
/**
* Spinwait for GC work to arrive
*
* @param fromTail Check the head or the tail ?
*/
private void spinWait(boolean fromTail) {
long startNano = 0;
long lastElapsedNano = 0;
while (true) {
long startCycles = VM.statistics.cycles();
long endCycles = startCycles + ((long) 1e9); // a few hundred milliseconds more or less.
long nowCycles;
do {
VM.memory.isync();
Address rtn = ((fromTail) ? tail : head);
if (!rtn.isZero() || complete()) return;
nowCycles = VM.statistics.cycles();
} while (startCycles < nowCycles && nowCycles < endCycles); /* check against both ends to guard against CPU migration */
/*
* According to the cycle counter, we've been spinning for a while.
* Time to check nanoTime and see if we should print a warning and/or fail.
* We lock the deque while doing this to avoid interleaved messages from multiple threads.
*/
lock();
if (startNano == 0) {
startNano = VM.statistics.nanoTime();
} else {
long nowNano = VM.statistics.nanoTime();
long elapsedNano = nowNano - startNano;
if (elapsedNano - lastElapsedNano > WARN_PERIOD) {
Log.write("GC Warning: SharedDeque("); Log.write(name);
Log.write(") wait has reached "); Log.write(VM.statistics.nanosToSecs(elapsedNano));
Log.write(", "); Log.write(numConsumersWaiting); Log.write("/");
Log.write(numConsumers); Log.writeln(" threads waiting");
lastElapsedNano = elapsedNano;
}
if (elapsedNano > TIMEOUT_PERIOD) {
unlock(); // To allow other GC threads to die in turn
VM.assertions.fail("GC Error: SharedDeque Timeout");
}
}
unlock();
}
}
/**
* Set the "next" pointer in a buffer forming the linked buffer chain.
*
* @param buf The buffer whose next field is to be set.
* @param next The reference to which next should point.
*/
private static void setNext(Address buf, Address next) {
buf.store(next, NEXT_OFFSET);
}
/**
* Get the "next" pointer in a buffer forming the linked buffer chain.
*
* @param buf The buffer whose next field is to be returned.
* @return The next field for this buffer.
*/
protected final Address getNext(Address buf) {
return buf.loadAddress(NEXT_OFFSET);
}
/**
* Set the "prev" pointer in a buffer forming the linked buffer chain.
*
* @param buf The buffer whose next field is to be set.
* @param prev The reference to which prev should point.
*/
private void setPrev(Address buf, Address prev) {
buf.store(prev, PREV_OFFSET);
}
/**
* Get the "next" pointer in a buffer forming the linked buffer chain.
*
* @param buf The buffer whose next field is to be returned.
* @return The next field for this buffer.
*/
protected final Address getPrev(Address buf) {
return buf.loadAddress(PREV_OFFSET);
}
/**
* Check the number of buffers in the work queue (for debugging
* purposes).
*
* @param length The number of buffers believed to be in the queue.
* @return True if the length of the queue matches length.
*/
private boolean checkDequeLength(int length) {
Address top = head;
int l = 0;
while (!top.isZero() && l <= length) {
top = getNext(top);
l++;
}
return l == length;
}
/**
* Lock this shared queue. We use one simple low-level lock to
* synchronize access to the shared queue of buffers.
*/
private void lock() {
lock.acquire();
}
/**
* Release the lock. We use one simple low-level lock to synchronize
* access to the shared queue of buffers.
*/
private void unlock() {
lock.release();
}
/**
* Is the current round of processing complete ?
*/
private boolean complete() {
return completionFlag == 1;
}
/**
* Set the completion flag.
*/
@Inline
private void setCompletionFlag() {
if (TRACE_DETAIL) {
Log.writeln("# setCompletionFlag: ");
}
completionFlag = 1;
}
/**
* Clear the completion flag.
*/
@Inline
private void clearCompletionFlag() {
if (TRACE_DETAIL) {
Log.writeln("# clearCompletionFlag: ");
}
completionFlag = 0;
}
@Inline
private void setNumConsumers(int newNumConsumers) {
if (TRACE_DETAIL) {
Log.write("# Num consumers "); Log.writeln(newNumConsumers);
}
numConsumers = newNumConsumers;
}
@Inline
private void setNumConsumersWaiting(int newNCW) {
if (TRACE_DETAIL) {
Log.write("# Num consumers waiting "); Log.writeln(newNCW);
}
numConsumersWaiting = newNCW;
}
@Inline
private void setHead(Address newHead) {
head = newHead;
VM.memory.sync();
}
@Inline
private void setTail(Address newTail) {
tail = newTail;
VM.memory.sync();
}
}