[freenet-cvs] r20189 - in branches/db4o/freenet/src/freenet: client client/async clients/http/bookmark node
toad at freenetproject.org
toad at freenetproject.org
Tue Jun 3 20:49:21 UTC 2008
Author: toad
Date: 2008-06-03 20:49:20 +0000 (Tue, 03 Jun 2008)
New Revision: 20189
Modified:
branches/db4o/freenet/src/freenet/client/FECCodec.java
branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
branches/db4o/freenet/src/freenet/client/async/ClientPutState.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java
branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java
branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java
branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
branches/db4o/freenet/src/freenet/client/async/USKCallback.java
branches/db4o/freenet/src/freenet/client/async/USKChecker.java
branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java
branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java
branches/db4o/freenet/src/freenet/client/async/USKInserter.java
branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java
branches/db4o/freenet/src/freenet/node/BaseSendableGet.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
branches/db4o/freenet/src/freenet/node/RequestScheduler.java
branches/db4o/freenet/src/freenet/node/SendableGet.java
branches/db4o/freenet/src/freenet/node/SendableInsert.java
branches/db4o/freenet/src/freenet/node/SendableRequest.java
branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
Log:
[21:22] * toad_ commits his bomb before it gets any bigger... doesn't compile? who cares! :)
Pass in an ObjectContainer to almost everything doing anything in the client layer. Obviously it will be null for nonpersistent requests.
Add callFailure and callSuccess to RequestScheduler for calling callbacks on the database thread (from e.g. the request starter thread).
Add a dedicated datastore checker thread (also prioritised but serial). Significantly refactor register() for SendableGets. Registration now has three phases: on the db thread, add the pending keys and add a RegisterMe; on the store checker thread, check the datastore for every key; then, back on the db thread, add the request to the queue and remove the RegisterMe. Obviously it is simpler for nonpersistent requests (although the store check still runs off-thread, on the store checker thread).
- The code in SchedCore dealing with this (e.g. on startup) is rewritten somewhat.
Get rid of splitfilefetcher.scheduleOffThread. (Given the above, it is no longer needed; the one-thread-per-priority scheduler threads will go away soon.)
Remove some cooldown error checking which hopefully isn't needed now. :( Can't do that check on the request starter thread.
Move addPendingKeys() to ClientRequestSchedulerBase.
Check whether on the database thread in various places.
Get rid of SimpleSingleFileFetcher.onFailure(FetchException, RequestScheduler). (trivial, use the other one)
Comments/javadocs.
This should probably have been several commits. Of course, each one would have been incomplete, and so is this one...
Modified: branches/db4o/freenet/src/freenet/client/FECCodec.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECCodec.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/FECCodec.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -8,6 +8,7 @@
import java.io.OutputStream;
import java.util.LinkedList;
+import com.db4o.ObjectContainer;
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.util.Buffer;
@@ -478,8 +479,8 @@
*/
public interface StandardOnionFECCodecEncoderCallback {
- public void onEncodedSegment();
+ public void onEncodedSegment(ObjectContainer container);
- public void onDecodedSegment();
+ public void onDecodedSegment(ObjectContainer container);
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/BaseSingleFileFetcher.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.keys.ClientKey;
import freenet.keys.ClientSSK;
@@ -36,24 +38,24 @@
cooldownWakeupTime = -1;
}
- public Object[] allKeys() {
+ public Object[] allKeys(ObjectContainer container) {
return keys;
}
- public Object[] sendableKeys() {
+ public Object[] sendableKeys(ObjectContainer container) {
return keys;
}
- public Object chooseKey(KeysFetchingLocally fetching) {
+ public Object chooseKey(KeysFetchingLocally fetching, ObjectContainer container) {
if(fetching.hasKey(key.getNodeKey())) return null;
return keys[0];
}
- public boolean hasValidKeys(KeysFetchingLocally fetching) {
+ public boolean hasValidKeys(KeysFetchingLocally fetching, ObjectContainer container) {
return !fetching.hasKey(key.getNodeKey());
}
- public ClientKey getKey(Object token) {
+ public ClientKey getKey(Object token, ObjectContainer container) {
return key;
}
@@ -67,7 +69,7 @@
/** Try again - returns true if we can retry
* @param sched */
- protected boolean retry(RequestScheduler sched) {
+ protected boolean retry(RequestScheduler sched, ObjectContainer container) {
retryCount++;
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Attempting to retry... (max "+maxRetries+", current "+retryCount+ ')');
@@ -82,7 +84,7 @@
cooldownWakeupTime = sched.queueCooldown(key, this);
return true; // We will retry, just not yet. See requeueAfterCooldown(Key).
} else {
- schedule();
+ schedule(container);
}
return true;
}
@@ -105,7 +107,7 @@
return ctx.ignoreStore;
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
synchronized(this) {
cancelled = true;
}
@@ -133,7 +135,7 @@
return true;
}
- public void onGotKey(Key key, KeyBlock block, RequestScheduler sched) {
+ public void onGotKey(Key key, KeyBlock block, RequestScheduler sched, ObjectContainer container) {
synchronized(this) {
if(isCancelled()) return;
if(!key.equals(this.key.getNodeKey())) {
@@ -142,7 +144,7 @@
}
}
try {
- onSuccess(Key.createKeyBlock(this.key, block), false, null, sched);
+ onSuccess(Key.createKeyBlock(this.key, block), false, null, sched, container);
} catch (KeyVerifyException e) {
Logger.error(this, "onGotKey("+key+","+block+") got "+e+" for "+this, e);
// FIXME if we get rid of the direct route this must call onFailure()
@@ -150,19 +152,19 @@
}
- public long getCooldownWakeup(Object token) {
+ public long getCooldownWakeup(Object token, ObjectContainer container) {
return cooldownWakeupTime;
}
- public long getCooldownWakeupByKey(Key key) {
+ public long getCooldownWakeupByKey(Key key, ObjectContainer container) {
return cooldownWakeupTime;
}
- public synchronized void resetCooldownTimes() {
+ public synchronized void resetCooldownTimes(ObjectContainer container) {
cooldownWakeupTime = -1;
}
- public void requeueAfterCooldown(Key key, long time) {
+ public void requeueAfterCooldown(Key key, long time, ObjectContainer container) {
if(cooldownWakeupTime > time) {
if(Logger.shouldLog(Logger.MINOR, this)) Logger.minor(this, "Not requeueing as deadline has not passed yet");
return;
@@ -173,7 +175,7 @@
}
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+" for "+this);
- schedule();
+ schedule(container);
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientGetState.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientGetState.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientGetState.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,15 +3,17 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
/**
* A ClientGetState.
* Represents a stage in the fetch process.
*/
public interface ClientGetState {
- public void schedule();
+ public void schedule(ObjectContainer container);
- public void cancel();
+ public void cancel(ObjectContainer container);
public long getToken();
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientPutState.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientPutState.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientPutState.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertException;
import freenet.support.SimpleFieldSet;
@@ -17,10 +19,10 @@
public abstract BaseClientPutter getParent();
/** Cancel the request. */
- public abstract void cancel();
+ public abstract void cancel(ObjectContainer container);
/** Schedule the request. */
- public abstract void schedule() throws InsertException;
+ public abstract void schedule(ObjectContainer container) throws InsertException;
/**
* Get the token, an object which is passed around with the insert and may be
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -19,6 +19,7 @@
import freenet.node.BaseSendableGet;
import freenet.node.KeysFetchingLocally;
import freenet.node.LowLevelGetException;
+import freenet.node.LowLevelPutException;
import freenet.node.Node;
import freenet.node.NodeClientCore;
import freenet.node.RequestScheduler;
@@ -98,6 +99,7 @@
private final CooldownQueue transientCooldownQueue;
private final CooldownQueue persistentCooldownQueue;
final PrioritizedSerialExecutor databaseExecutor;
+ final PrioritizedSerialExecutor datastoreCheckerExecutor;
public static final String PRIORITY_NONE = "NONE";
public static final String PRIORITY_SOFT = "SOFT";
@@ -112,6 +114,7 @@
schedCore.start();
persistentCooldownQueue = schedCore.persistentCooldownQueue;
this.databaseExecutor = core.clientDatabaseExecutor;
+ this.datastoreCheckerExecutor = core.datastoreCheckerExecutor;
this.starter = starter;
this.random = random;
this.node = node;
@@ -158,81 +161,159 @@
if(isInsertScheduler != (req instanceof SendableInsert))
throw new IllegalArgumentException("Expected a SendableInsert: "+req);
if(req instanceof SendableGet) {
- SendableGet getter = (SendableGet)req;
- if(!getter.ignoreStore()) {
- boolean anyValid = false;
- Object[] keyTokens = getter.sendableKeys();
- for(int i=0;i<keyTokens.length;i++) {
- Object tok = keyTokens[i];
- ClientKeyBlock block = null;
- try {
- ClientKey key = getter.getKey(tok);
- if(key == null) {
- if(logMINOR)
- Logger.minor(this, "No key for "+tok+" for "+getter+" - already finished?");
- continue;
- } else {
- if(getter.getContext().blocks != null)
- block = getter.getContext().blocks.get(key);
- if(block == null)
- block = node.fetchKey(key, getter.dontCache());
- if(block == null) {
- if(!persistent) {
- schedTransient.addPendingKey(key, getter);
- } // If persistent, when it is registered (in a later job) the keys will be added first.
- } else {
- if(logMINOR)
- Logger.minor(this, "Got "+block);
+ final SendableGet getter = (SendableGet)req;
+
+ if(persistent && onDatabaseThread) {
+ schedCore.addPendingKeys(getter, selectorContainer);
+ schedCore.queueRegister(getter, databaseExecutor);
+ final Object[] keyTokens = getter.sendableKeys(selectorContainer);
+ final ClientKey[] keys = new ClientKey[keyTokens.length];
+ for(int i=0;i<keyTokens.length;i++)
+ keys[i] = getter.getKey(keyTokens[i], selectorContainer);
+ datastoreCheckerExecutor.execute(new Runnable() {
+
+ public void run() {
+ registerCheckStore(getter, true, keyTokens, keys);
+ }
+
+ }, getter.getPriorityClass(), "Checking datastore");
+ } else if(persistent) {
+ databaseExecutor.execute(new Runnable() {
+
+ public void run() {
+ schedCore.addPendingKeys(getter, selectorContainer);
+ schedCore.queueRegister(getter, databaseExecutor);
+ final Object[] keyTokens = getter.sendableKeys(selectorContainer);
+ final ClientKey[] keys = new ClientKey[keyTokens.length];
+ for(int i=0;i<keyTokens.length;i++)
+ keys[i] = getter.getKey(keyTokens[i], selectorContainer);
+ datastoreCheckerExecutor.execute(new Runnable() {
+
+ public void run() {
+ registerCheckStore(getter, true, keyTokens, keys);
}
- }
- } catch (KeyVerifyException e) {
- // Verify exception, probably bogus at source;
- // verifies at low-level, but not at decode.
- if(logMINOR)
- Logger.minor(this, "Decode failed: "+e, e);
- if(onDatabaseThread)
- getter.onFailure(new LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok, this);
- else {
- final SendableGet g = getter;
- final Object token = tok;
- databaseExecutor.execute(new Runnable() {
- public void run() {
- g.onFailure(new LowLevelGetException(LowLevelGetException.DECODE_FAILED), token, ClientRequestScheduler.this);
- }
- }, NativeThread.NORM_PRIORITY, "Block decode failed");
- }
- continue; // other keys might be valid
+
+ }, getter.getPriorityClass(), "Checking datastore");
}
- if(block != null) {
- if(logMINOR) Logger.minor(this, "Can fulfill "+req+" ("+tok+") immediately from store");
- getter.onSuccess(block, true, tok, this);
- // Even with working thread priorities, we still get very high latency accessing
- // the datastore when background threads are doing it in parallel.
- // So yield() here, unless priority is very high.
- if(req.getPriorityClass() > RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS)
- Thread.yield();
+
+ }, NativeThread.NORM_PRIORITY, "Registering request");
+ } else {
+ // Not persistent
+ schedTransient.addPendingKeys(getter, null);
+ // Check the store off-thread anyway.
+ final Object[] keyTokens = getter.sendableKeys(null);
+ final ClientKey[] keys = new ClientKey[keyTokens.length];
+ for(int i=0;i<keyTokens.length;i++)
+ keys[i] = getter.getKey(keyTokens[i], null);
+ datastoreCheckerExecutor.execute(new Runnable() {
+
+ public void run() {
+ registerCheckStore(getter, false, keyTokens, keys);
+ }
+
+ }, getter.getPriorityClass(), "Checking datastore");
+ }
+ } else {
+ finishRegister(req, persistent, onDatabaseThread);
+ }
+ }
+
+ /**
+ * Check the store for all the keys on the SendableGet. By now the pendingKeys will have
+ * been set up, and this is run on the datastore checker thread. Once completed, this should
+ * (for a persistent request) queue a job on the databaseExecutor and (for a transient
+ * request) finish registering the request immediately.
+ * @param getter
+ */
+ protected void registerCheckStore(SendableGet getter, boolean persistent, Object[] keyTokens, ClientKey[] keys) {
+ boolean anyValid = false;
+ for(int i=0;i<keyTokens.length;i++) {
+ Object tok = keyTokens[i];
+ ClientKeyBlock block = null;
+ try {
+ ClientKey key = keys[i];
+ if(key == null) {
+ if(logMINOR)
+ Logger.minor(this, "No key for "+tok+" for "+getter+" - already finished?");
+ continue;
+ } else {
+ if(getter.getContext().blocks != null)
+ block = getter.getContext().blocks.get(key);
+ if(block == null)
+ block = node.fetchKey(key, getter.dontCache());
+ if(block == null) {
+ if(!persistent) {
+ schedTransient.addPendingKey(key, getter);
+ } // If persistent, when it is registered (in a later job) the keys will be added first.
} else {
- anyValid = true;
+ if(logMINOR)
+ Logger.minor(this, "Got "+block);
}
}
- if(!anyValid) {
- if(logMINOR)
- Logger.minor(this, "No valid keys, returning without registering for "+req);
- return;
+ } catch (KeyVerifyException e) {
+ // Verify exception, probably bogus at source;
+ // verifies at low-level, but not at decode.
+ if(logMINOR)
+ Logger.minor(this, "Decode failed: "+e, e);
+ if(!persistent)
+ getter.onFailure(new LowLevelGetException(LowLevelGetException.DECODE_FAILED), tok, this, persistent ? selectorContainer : null);
+ else {
+ final SendableGet g = getter;
+ final Object token = tok;
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ g.onFailure(new LowLevelGetException(LowLevelGetException.DECODE_FAILED), token, ClientRequestScheduler.this, selectorContainer);
+ selectorContainer.commit();
+ }
+ }, NativeThread.NORM_PRIORITY, "Block decode failed");
}
+ continue; // other keys might be valid
}
+ if(block != null) {
+ if(logMINOR) Logger.minor(this, "Can fulfill "+getter+" ("+tok+") immediately from store");
+ if(!persistent)
+ getter.onSuccess(block, true, tok, this, persistent ? selectorContainer : null);
+ else {
+ final ClientKeyBlock b = block;
+ final Object t = tok;
+ final SendableGet g = getter;
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ g.onSuccess(b, true, t, ClientRequestScheduler.this, selectorContainer);
+ }
+ }, NativeThread.NORM_PRIORITY, "Block found on register");
+ }
+ // Even with working thread priorities, we still get very high latency accessing
+ // the datastore when background threads are doing it in parallel.
+ // So yield() here, unless priority is very high.
+ if(getter.getPriorityClass() > RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS)
+ Thread.yield();
+ } else {
+ anyValid = true;
+ }
}
+ if(!anyValid) {
+ if(logMINOR)
+ Logger.minor(this, "No valid keys, returning without registering for "+getter);
+ return;
+ }
+ finishRegister(getter, persistent, false);
+ }
+
+ private void finishRegister(final SendableRequest req, boolean persistent, boolean onDatabaseThread) {
if(persistent) {
// Add to the persistent registration queue
if(onDatabaseThread) {
if(!databaseExecutor.onThread()) {
throw new IllegalStateException("Not on database thread!");
}
- schedCore.queueRegister(req, databaseExecutor);
+ schedCore.innerRegister(req, random);
+ schedCore.deleteRegisterMe(req);
} else {
databaseExecutor.execute(new Runnable() {
public void run() {
- schedCore.queueRegister(req, databaseExecutor);
+ schedCore.innerRegister(req, random);
+ schedCore.deleteRegisterMe(req);
selectorContainer.commit();
}
}, NativeThread.NORM_PRIORITY, "Add persistent job to queue");
@@ -326,14 +407,14 @@
offeredKeys[i].remove(key);
}
if(transientCooldownQueue != null)
- transientCooldownQueue.removeKey(key, getter, getter.getCooldownWakeupByKey(key), null);
+ transientCooldownQueue.removeKey(key, getter, getter.getCooldownWakeupByKey(key, null), null);
} else {
databaseExecutor.execute(new Runnable() {
public void run() {
try {
schedCore.removePendingKey(getter, complain, key);
if(persistentCooldownQueue != null)
- persistentCooldownQueue.removeKey(key, getter, getter.getCooldownWakeupByKey(key), selectorContainer);
+ persistentCooldownQueue.removeKey(key, getter, getter.getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
selectorContainer.commit();
} catch (Throwable t) {
Logger.error(this, "Caught "+t, t);
@@ -351,11 +432,19 @@
* @param complain
*/
public void removePendingKeys(SendableGet getter, boolean complain) {
- // FIXME should this be a single databaseExecutor thread??
- Object[] keyTokens = getter.allKeys();
+ ObjectContainer container;
+ if(getter.persistent()) {
+ container = selectorContainer;
+ if(!databaseExecutor.onThread()) {
+ throw new IllegalStateException("Not on database thread!");
+ }
+ } else {
+ container = null;
+ }
+ Object[] keyTokens = getter.allKeys(container);
for(int i=0;i<keyTokens.length;i++) {
Object tok = keyTokens[i];
- ClientKey ckey = getter.getKey(tok);
+ ClientKey ckey = getter.getKey(tok, container);
if(ckey == null) {
if(complain)
Logger.error(this, "Key "+tok+" is null for "+getter, new Exception("debug"));
@@ -422,7 +511,7 @@
for(int i=0;i<transientGets.length;i++) {
try {
if(logMINOR) Logger.minor(this, "Calling callback for "+transientGets[i]+" for "+key);
- transientGets[i].onGotKey(key, block, ClientRequestScheduler.this);
+ transientGets[i].onGotKey(key, block, ClientRequestScheduler.this, null);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" running callback "+transientGets[i]+" for "+key);
}
@@ -431,7 +520,7 @@
}, "Running off-thread callbacks for "+block.getKey());
if(transientCooldownQueue != null) {
for(int i=0;i<transientGets.length;i++)
- transientCooldownQueue.removeKey(key, transientGets[i], transientGets[i].getCooldownWakeupByKey(key), null);
+ transientCooldownQueue.removeKey(key, transientGets[i], transientGets[i].getCooldownWakeupByKey(key, null), null);
}
// Now the persistent stuff
@@ -443,7 +532,7 @@
if(gets == null) return;
if(persistentCooldownQueue != null) {
for(int i=0;i<gets.length;i++)
- persistentCooldownQueue.removeKey(key, gets[i], gets[i].getCooldownWakeupByKey(key), selectorContainer);
+ persistentCooldownQueue.removeKey(key, gets[i], gets[i].getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
}
// Call the callbacks on the database executor thread, because the first thing
// they will need to do is access the database to decide whether they need to
@@ -451,7 +540,7 @@
for(int i=0;i<gets.length;i++) {
try {
if(logMINOR) Logger.minor(this, "Calling callback for "+gets[i]+" for "+key);
- gets[i].onGotKey(key, block, ClientRequestScheduler.this);
+ gets[i].onGotKey(key, block, ClientRequestScheduler.this, selectorContainer);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" running callback "+gets[i]+" for "+key);
}
@@ -544,10 +633,10 @@
} else {
if(gets != null)
for(int i=0;i<gets.length;i++)
- gets[i].requeueAfterCooldown(key, now);
+ gets[i].requeueAfterCooldown(key, now, container);
if(transientGets != null)
for(int i=0;i<transientGets.length;i++)
- transientGets[i].requeueAfterCooldown(key, now);
+ transientGets[i].requeueAfterCooldown(key, now, container);
}
}
if(keys.length < MAX_KEYS) return found;
@@ -574,10 +663,28 @@
public void callFailure(final SendableGet get, final LowLevelGetException e, final Object keyNum, int prio, String name) {
databaseExecutor.execute(new Runnable() {
public void run() {
- get.onFailure(e, keyNum, ClientRequestScheduler.this);
+ get.onFailure(e, keyNum, ClientRequestScheduler.this, selectorContainer);
selectorContainer.commit();
}
}, prio, name);
}
+
+ public void callFailure(final SendableInsert put, final LowLevelPutException e, final Object keyNum, int prio, String name) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ put.onFailure(e, keyNum, selectorContainer);
+ selectorContainer.commit();
+ }
+ }, prio, name);
+ }
+ public void callSuccess(final SendableInsert put, final Object keyNum, int prio, String name) {
+ databaseExecutor.execute(new Runnable() {
+ public void run() {
+ put.onSuccess(keyNum, selectorContainer);
+ selectorContainer.commit();
+ }
+ }, prio, name);
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -327,4 +327,19 @@
}
}
+ public void addPendingKeys(SendableGet getter, ObjectContainer container) {
+ Object[] keyTokens = getter.sendableKeys(container);
+ for(int i=0;i<keyTokens.length;i++) {
+ Object tok = keyTokens[i];
+ ClientKey key = getter.getKey(tok, container);
+ if(key == null) {
+ if(logMINOR)
+ Logger.minor(this, "No key for "+tok+" for "+getter+" - already finished?");
+ continue;
+ } else {
+ addPendingKey(key, getter);
+ }
+ }
+ }
+
}
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -178,7 +178,7 @@
// The worry is ... is there any nested locking outside of the hierarchy?
ChosenRequest removeFirst(int fuzz, RandomSource random, OfferedKeysList[] offeredKeys, RequestStarter starter, ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, short maxPrio, int retryCount) {
SendableRequest req = removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount);
- Object token = req.chooseKey(this);
+ Object token = req.chooseKey(this, req.persistent() ? container : null);
if(token == null) {
return null;
} else {
@@ -186,7 +186,7 @@
if(isInsertScheduler)
key = null;
else
- key = ((BaseSendableGet)req).getNodeKey(token);
+ key = ((BaseSendableGet)req).getNodeKey(token, persistent() ? container : null);
PersistentChosenRequest ret = new PersistentChosenRequest(this, req, token, key);
if(req.persistent())
container.set(ret);
@@ -213,7 +213,7 @@
for(;choosenPriorityClass <= RequestStarter.MINIMUM_PRIORITY_CLASS;choosenPriorityClass++) {
if(logMINOR) Logger.minor(this, "Using priority "+choosenPriorityClass);
if(tryOfferedKeys) {
- if(offeredKeys[choosenPriorityClass].hasValidKeys(this))
+ if(offeredKeys[choosenPriorityClass].hasValidKeys(this, null))
return offeredKeys[choosenPriorityClass];
}
SortedVectorByNumber perm = null;
@@ -414,21 +414,16 @@
query.descend("priority").orderAscending();
query.descend("addedTime").orderAscending();
ObjectSet result = query.execute();
- if(result.hasNext()) {
+ while(result.hasNext()) {
RegisterMe reg = (RegisterMe) result.next();
- if(result.hasNext()) {
- databaseExecutor.execute(registerMeRunner, NativeThread.NORM_PRIORITY, "Register request");
- }
container.delete(reg);
// Don't need to activate, fields should exist? FIXME
try {
- if(reg.getter instanceof SendableGet)
- addPendingKeys((SendableGet) reg.getter);
- innerRegister(reg.getter, random);
+ sched.register(reg.getter, true);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" running RegisterMeRunner", t);
// Cancel the request, and commit so it isn't tried again.
- reg.getter.internalError(null, t, sched);
+ reg.getter.internalError(null, t, sched, container);
}
container.commit();
}
@@ -441,24 +436,22 @@
}
RegisterMe reg = new RegisterMe(req, this);
container.set(reg);
- databaseExecutor.execute(registerMeRunner, NativeThread.NORM_PRIORITY, "Register request");
}
- public void addPendingKeys(SendableGet getter) {
- Object[] keyTokens = getter.sendableKeys();
- for(int i=0;i<keyTokens.length;i++) {
- Object tok = keyTokens[i];
- ClientKey key = getter.getKey(tok);
- if(key == null) {
- if(logMINOR)
- Logger.minor(this, "No key for "+tok+" for "+getter+" - already finished?");
- continue;
- } else {
- addPendingKey(key, getter);
+ public void deleteRegisterMe(final SendableRequest req) {
+ ObjectSet result = container.query(new Predicate() {
+ public boolean match(RegisterMe reg) {
+ if(reg.core != ClientRequestSchedulerCore.this) return false;
+ if(reg.getter != req) return false;
+ return true;
}
+ });
+ while(result.hasNext()) {
+ RegisterMe me = (RegisterMe) result.next();
+ container.delete(me);
}
}
-
+
public boolean hasKey(Key key) {
synchronized(keysFetching) {
return keysFetching.contains(key);
@@ -485,7 +478,7 @@
}
}, NativeThread.NORM_PRIORITY, "Remove fetching key");
}
-
+
}
class RegisterMe {
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequester.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequester.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequester.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.keys.FreenetURI;
import freenet.node.RequestClient;
import freenet.support.Logger;
@@ -14,7 +16,7 @@
*/
public abstract class ClientRequester {
- public abstract void onTransition(ClientGetState oldState, ClientGetState newState);
+ public abstract void onTransition(ClientGetState oldState, ClientGetState newState, ObjectContainer container);
// FIXME move the priority classes from RequestStarter here
protected short priorityClass;
Modified: branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/GetCompletionCallback.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchException;
import freenet.client.FetchResult;
@@ -12,21 +14,21 @@
*/
public interface GetCompletionCallback {
- public void onSuccess(FetchResult result, ClientGetState state);
+ public void onSuccess(FetchResult result, ClientGetState state, ObjectContainer container);
- public void onFailure(FetchException e, ClientGetState state);
+ public void onFailure(FetchException e, ClientGetState state, ObjectContainer container);
/** Called when the ClientGetState knows that it knows about
* all the blocks it will need to fetch.
*/
- public void onBlockSetFinished(ClientGetState state);
+ public void onBlockSetFinished(ClientGetState state, ObjectContainer container);
- public void onTransition(ClientGetState oldState, ClientGetState newState);
+ public void onTransition(ClientGetState oldState, ClientGetState newState, ObjectContainer container);
- public void onExpectedSize(long size);
+ public void onExpectedSize(long size, ObjectContainer container);
- public void onExpectedMIME(String mime);
+ public void onExpectedMIME(String mime, ObjectContainer container);
- public void onFinalizedMetadata();
+ public void onFinalizedMetadata(ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/OfferedKeysList.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -6,6 +6,8 @@
import java.util.HashSet;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.crypt.RandomSource;
import freenet.keys.Key;
import freenet.node.BaseSendableGet;
@@ -63,17 +65,17 @@
return keys.isEmpty();
}
- public Object[] allKeys() {
+ public Object[] allKeys(ObjectContainer container) {
// Not supported.
throw new UnsupportedOperationException();
}
- public Object[] sendableKeys() {
+ public Object[] sendableKeys(ObjectContainer container) {
// Not supported.
throw new UnsupportedOperationException();
}
- public synchronized Object chooseKey(KeysFetchingLocally fetching) {
+ public synchronized Object chooseKey(KeysFetchingLocally fetching, ObjectContainer container) {
assert(keysList.size() == keys.size());
if(keys.size() == 1) {
// Shortcut the common case
@@ -99,7 +101,7 @@
return null;
}
- public synchronized boolean hasValidKeys(KeysFetchingLocally fetching) {
+ public synchronized boolean hasValidKeys(KeysFetchingLocally fetching, ObjectContainer container) {
assert(keysList.size() == keys.size());
if(keys.size() == 1) {
// Shortcut the common case
@@ -135,7 +137,7 @@
return 0; // All keys have equal chance even if they've been tried before.
}
- public void internalError(Object keyNum, Throwable t, RequestScheduler sched) {
+ public void internalError(Object keyNum, Throwable t, RequestScheduler sched, ObjectContainer container) {
Logger.error(this, "Internal error: "+t, t);
}
@@ -172,7 +174,7 @@
assert(keysList.size() == keys.size());
}
- public Key getNodeKey(Object token) {
+ public Key getNodeKey(Object token, ObjectContainer container) {
return (Key) token;
}
Modified: branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/PutCompletionCallback.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -1,5 +1,7 @@
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertException;
import freenet.client.Metadata;
import freenet.keys.BaseClientKey;
@@ -9,29 +11,29 @@
*/
public interface PutCompletionCallback {
- public void onSuccess(ClientPutState state);
+ public void onSuccess(ClientPutState state, ObjectContainer container);
- public void onFailure(InsertException e, ClientPutState state);
+ public void onFailure(InsertException e, ClientPutState state, ObjectContainer container);
- public void onEncode(BaseClientKey usk, ClientPutState state);
+ public void onEncode(BaseClientKey usk, ClientPutState state, ObjectContainer container);
- public void onTransition(ClientPutState oldState, ClientPutState newState);
+ public void onTransition(ClientPutState oldState, ClientPutState newState, ObjectContainer container);
/** Only called if explicitly asked for, in which case, generally
* the metadata won't be inserted. Won't be called if there isn't
* any!
*/
- public void onMetadata(Metadata m, ClientPutState state);
+ public void onMetadata(Metadata m, ClientPutState state, ObjectContainer container);
/** Called when enough data has been inserted that the file can be
* retrieved, even if not all data has been inserted yet. Note that this
* is only supported for splitfiles; if you get onSuccess() first, assume
* that onFetchable() isn't coming. */
- public void onFetchable(ClientPutState state);
+ public void onFetchable(ClientPutState state, ObjectContainer container);
/** Called when the ClientPutState knows that it knows about
* all the blocks it will need to put.
*/
- public void onBlockSetFinished(ClientPutState state);
+ public void onBlockSetFinished(ClientPutState state, ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleHealingQueue.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -5,6 +5,8 @@
import java.util.HashMap;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertContext;
import freenet.client.InsertException;
import freenet.client.Metadata;
@@ -49,7 +51,7 @@
runningInserters.put(data, sbi);
}
try {
- sbi.schedule();
+ sbi.schedule(null);
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Started healing insert "+ctr+" for "+data);
return true;
@@ -80,7 +82,7 @@
// Do nothing
}
- public void onSuccess(ClientPutState state) {
+ public void onSuccess(ClientPutState state, ObjectContainer container) {
SingleBlockInserter sbi = (SingleBlockInserter)state;
Bucket data = (Bucket) sbi.getToken();
synchronized(this) {
@@ -91,7 +93,7 @@
data.free();
}
- public void onFailure(InsertException e, ClientPutState state) {
+ public void onFailure(InsertException e, ClientPutState state, ObjectContainer container) {
SingleBlockInserter sbi = (SingleBlockInserter)state;
Bucket data = (Bucket) sbi.getToken();
synchronized(this) {
@@ -102,29 +104,29 @@
data.free();
}
- public void onEncode(BaseClientKey usk, ClientPutState state) {
+ public void onEncode(BaseClientKey usk, ClientPutState state, ObjectContainer container) {
// Ignore
}
- public void onTransition(ClientPutState oldState, ClientPutState newState) {
+ public void onTransition(ClientPutState oldState, ClientPutState newState, ObjectContainer container) {
// Should never happen
Logger.error(this, "impossible: onTransition on SimpleHealingQueue from "+oldState+" to "+newState, new Exception("debug"));
}
- public void onMetadata(Metadata m, ClientPutState state) {
+ public void onMetadata(Metadata m, ClientPutState state, ObjectContainer container) {
// Should never happen
Logger.error(this, "Got metadata on SimpleHealingQueue from "+state+": "+m, new Exception("debug"));
}
- public void onBlockSetFinished(ClientPutState state) {
+ public void onBlockSetFinished(ClientPutState state, ObjectContainer container) {
// Ignore
}
- public void onFetchable(ClientPutState state) {
+ public void onFetchable(ClientPutState state, ObjectContainer container) {
// Ignore
}
- public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ public void onTransition(ClientGetState oldState, ClientGetState newState, ObjectContainer container) {
// Ignore
}
Modified: branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SimpleSingleFileFetcher.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -5,6 +5,8 @@
import java.io.IOException;
+import com.db4o.ObjectContainer;
+
import freenet.client.ClientMetadata;
import freenet.client.FetchContext;
import freenet.client.FetchException;
@@ -41,51 +43,47 @@
final long token;
// Translate it, then call the real onFailure
- public void onFailure(LowLevelGetException e, Object reqTokenIgnored, RequestScheduler sched) {
+ public void onFailure(LowLevelGetException e, Object reqTokenIgnored, RequestScheduler sched, ObjectContainer container) {
switch(e.code) {
case LowLevelGetException.DATA_NOT_FOUND:
- onFailure(new FetchException(FetchException.DATA_NOT_FOUND), sched);
+ onFailure(new FetchException(FetchException.DATA_NOT_FOUND), false, sched, container);
return;
case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
- onFailure(new FetchException(FetchException.DATA_NOT_FOUND), sched);
+ onFailure(new FetchException(FetchException.DATA_NOT_FOUND), false, sched, container);
return;
case LowLevelGetException.RECENTLY_FAILED:
- onFailure(new FetchException(FetchException.RECENTLY_FAILED), sched);
+ onFailure(new FetchException(FetchException.RECENTLY_FAILED), false, sched, container);
return;
case LowLevelGetException.DECODE_FAILED:
- onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), sched);
+ onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), false, sched, container);
return;
case LowLevelGetException.INTERNAL_ERROR:
- onFailure(new FetchException(FetchException.INTERNAL_ERROR), sched);
+ onFailure(new FetchException(FetchException.INTERNAL_ERROR), false, sched, container);
return;
case LowLevelGetException.REJECTED_OVERLOAD:
- onFailure(new FetchException(FetchException.REJECTED_OVERLOAD), sched);
+ onFailure(new FetchException(FetchException.REJECTED_OVERLOAD), false, sched, container);
return;
case LowLevelGetException.ROUTE_NOT_FOUND:
- onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND), sched);
+ onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND), false, sched, container);
return;
case LowLevelGetException.TRANSFER_FAILED:
- onFailure(new FetchException(FetchException.TRANSFER_FAILED), sched);
+ onFailure(new FetchException(FetchException.TRANSFER_FAILED), false, sched, container);
return;
case LowLevelGetException.VERIFY_FAILED:
- onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), sched);
+ onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), false, sched, container);
return;
case LowLevelGetException.CANCELLED:
- onFailure(new FetchException(FetchException.CANCELLED), sched);
+ onFailure(new FetchException(FetchException.CANCELLED), false, sched, container);
return;
default:
Logger.error(this, "Unknown LowLevelGetException code: "+e.code);
- onFailure(new FetchException(FetchException.INTERNAL_ERROR), sched);
+ onFailure(new FetchException(FetchException.INTERNAL_ERROR), false, sched, container);
return;
}
}
- final void onFailure(FetchException e, RequestScheduler sched) {
- onFailure(e, false, sched);
- }
-
// Real onFailure
- protected void onFailure(FetchException e, boolean forceFatal, RequestScheduler sched) {
+ protected void onFailure(FetchException e, boolean forceFatal, RequestScheduler sched, ObjectContainer container) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "onFailure( "+e+" , "+forceFatal+")", e);
if(parent.isCancelled() || cancelled) {
@@ -94,7 +92,7 @@
forceFatal = true;
}
if(!(e.isFatal() || forceFatal) ) {
- if(retry(sched)) {
+ if(retry(sched, container)) {
if(logMINOR) Logger.minor(this, "Retrying");
return;
}
@@ -105,50 +103,50 @@
parent.fatallyFailedBlock();
else
parent.failedBlock();
- rcb.onFailure(e, this);
+ rcb.onFailure(e, this, container);
}
/** Will be overridden by SingleFileFetcher */
- protected void onSuccess(FetchResult data, RequestScheduler sched) {
+ protected void onSuccess(FetchResult data, RequestScheduler sched, ObjectContainer container) {
unregister(false);
if(parent.isCancelled()) {
data.asBucket().free();
- onFailure(new FetchException(FetchException.CANCELLED), sched);
+ onFailure(new FetchException(FetchException.CANCELLED), false, sched, container);
return;
}
- rcb.onSuccess(data, this);
+ rcb.onSuccess(data, this, container);
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object reqTokenIgnored, RequestScheduler sched) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object reqTokenIgnored, RequestScheduler sched, ObjectContainer container) {
if(parent instanceof ClientGetter)
((ClientGetter)parent).addKeyToBinaryBlob(block);
- Bucket data = extract(block, sched);
+ Bucket data = extract(block, sched, container);
if(data == null) return; // failed
if(!block.isMetadata()) {
- onSuccess(new FetchResult((ClientMetadata)null, data), sched);
+ onSuccess(new FetchResult((ClientMetadata)null, data), sched, container);
} else {
- onFailure(new FetchException(FetchException.INVALID_METADATA, "Metadata where expected data"), sched);
+ onFailure(new FetchException(FetchException.INVALID_METADATA, "Metadata where expected data"), false, sched, container);
}
}
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it via onFailure
* and return null.
*/
- protected Bucket extract(ClientKeyBlock block, RequestScheduler sched) {
+ protected Bucket extract(ClientKeyBlock block, RequestScheduler sched, ObjectContainer container) {
Bucket data;
try {
data = block.decode(ctx.bucketFactory, (int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
} catch (KeyDecodeException e1) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Decode failure: "+e1, e1);
- onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), sched);
+ onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), false, sched, container);
return null;
} catch (TooBigException e) {
- onFailure(new FetchException(FetchException.TOO_BIG, e), sched);
+ onFailure(new FetchException(FetchException.TOO_BIG, e), false, sched, container);
return null;
} catch (IOException e) {
Logger.error(this, "Could not capture data - disk full?: "+e, e);
- onFailure(new FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return null;
}
return data;
Modified: branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SingleBlockInserter.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.lang.ref.SoftReference;
import java.net.MalformedURLException;
+import com.db4o.ObjectContainer;
+
import freenet.client.FailureCodeTracker;
import freenet.client.InsertContext;
import freenet.client.InsertException;
@@ -25,6 +27,7 @@
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
import freenet.support.api.Bucket;
+import freenet.support.io.NativeThread;
/**
* Insert *ONE KEY*.
@@ -107,7 +110,7 @@
}
}
- protected ClientKeyBlock encode() throws InsertException {
+ protected ClientKeyBlock encode(ObjectContainer container) throws InsertException {
ClientKeyBlock block;
boolean shouldSend;
synchronized(this) {
@@ -122,7 +125,7 @@
resultingURI = block.getClientKey().getURI();
}
if(shouldSend && !dontSendEncoded)
- cb.onEncode(block.getClientKey(), this);
+ cb.onEncode(block.getClientKey(), this, container);
return block;
}
@@ -138,15 +141,15 @@
return retries;
}
- public void onFailure(LowLevelPutException e, Object keyNum) {
+ public void onFailure(LowLevelPutException e, Object keyNum, ObjectContainer container) {
if(parent.isCancelled()) {
- fail(new InsertException(InsertException.CANCELLED));
+ fail(new InsertException(InsertException.CANCELLED), container);
return;
}
switch(e.code) {
case LowLevelPutException.COLLISION:
- fail(new InsertException(InsertException.COLLISION));
+ fail(new InsertException(InsertException.COLLISION), container);
break;
case LowLevelPutException.INTERNAL_ERROR:
errors.inc(InsertException.INTERNAL_ERROR);
@@ -169,7 +172,7 @@
if(logMINOR) Logger.minor(this, "Consecutive RNFs: "+consecutiveRNFs+" / "+ctx.consecutiveRNFsCountAsSuccess);
if(consecutiveRNFs == ctx.consecutiveRNFsCountAsSuccess) {
if(logMINOR) Logger.minor(this, "Consecutive RNFs: "+consecutiveRNFs+" - counting as success");
- onSuccess(keyNum);
+ onSuccess(keyNum, container);
return;
}
} else
@@ -177,17 +180,17 @@
if(logMINOR) Logger.minor(this, "Failed: "+e);
retries++;
if((retries > ctx.maxInsertRetries) && (ctx.maxInsertRetries != -1)) {
- fail(InsertException.construct(errors));
+ fail(InsertException.construct(errors), container);
return;
}
getScheduler().register(this);
}
- private void fail(InsertException e) {
- fail(e, false);
+ private void fail(InsertException e, ObjectContainer container) {
+ fail(e, false, container);
}
- private void fail(InsertException e, boolean forceFatal) {
+ private void fail(InsertException e, boolean forceFatal, ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
@@ -196,7 +199,7 @@
parent.fatallyFailedBlock();
else
parent.failedBlock();
- cb.onFailure(e, this);
+ cb.onFailure(e, this, container);
}
public ClientKeyBlock getBlock() {
@@ -215,15 +218,15 @@
}
}
- public void schedule() throws InsertException {
+ public void schedule(ObjectContainer container) throws InsertException {
synchronized(this) {
if(finished) return;
}
if(getCHKOnly) {
- ClientKeyBlock block = encode();
- cb.onEncode(block.getClientKey(), this);
+ ClientKeyBlock block = encode(container);
+ cb.onEncode(block.getClientKey(), this, container);
parent.completedBlock(false);
- cb.onSuccess(this);
+ cb.onSuccess(this, container);
finished = true;
} else {
getScheduler().register(this);
@@ -255,30 +258,30 @@
return resultingURI;
}
- public void onSuccess(Object keyNum) {
+ public void onSuccess(Object keyNum, ObjectContainer container) {
if(logMINOR) Logger.minor(this, "Succeeded ("+this+"): "+token);
if(parent.isCancelled()) {
- fail(new InsertException(InsertException.CANCELLED));
+ fail(new InsertException(InsertException.CANCELLED), container);
return;
}
synchronized(this) {
finished = true;
}
parent.completedBlock(false);
- cb.onSuccess(this);
+ cb.onSuccess(this, container);
}
public BaseClientPutter getParent() {
return parent;
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
}
super.unregister(false);
- cb.onFailure(new InsertException(InsertException.CANCELLED), this);
+ cb.onFailure(new InsertException(InsertException.CANCELLED), this, container);
}
public synchronized boolean isEmpty() {
@@ -304,18 +307,18 @@
}
}
if(parent.isCancelled())
- fail(new InsertException(InsertException.CANCELLED));
+ fail(new InsertException(InsertException.CANCELLED), null);
else
- fail(new InsertException(InsertException.BUCKET_ERROR, "Empty block", null));
+ fail(new InsertException(InsertException.BUCKET_ERROR, "Empty block", null), null);
return false;
}
} catch (LowLevelPutException e) {
- onFailure(e, keyNum);
+ sched.callFailure((SendableInsert) this, e, keyNum, NativeThread.NORM_PRIORITY, "Insert failed");
if(logMINOR) Logger.minor(this, "Request failed: "+this+" for "+e);
return true;
}
if(logMINOR) Logger.minor(this, "Request succeeded: "+this);
- onSuccess(keyNum);
+ sched.callSuccess(this, keyNum, NativeThread.NORM_PRIORITY, "Insert succeeded");
return true;
}
@@ -352,18 +355,18 @@
return true;
}
- public synchronized Object[] sendableKeys() {
+ public synchronized Object[] sendableKeys(ObjectContainer container) {
if(finished)
return new Object[] {};
else
return new Object[] { new Integer(0) };
}
- public synchronized Object[] allKeys() {
- return sendableKeys();
+ public synchronized Object[] allKeys(ObjectContainer container) {
+ return sendableKeys(container);
}
- public synchronized Object chooseKey(KeysFetchingLocally ignored) {
+ public synchronized Object chooseKey(KeysFetchingLocally ignored, ObjectContainer container) {
if(finished) return null;
else return new Integer(0);
}
Modified: branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SingleFileFetcher.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.net.MalformedURLException;
import java.util.LinkedList;
+import com.db4o.ObjectContainer;
+
import freenet.client.ArchiveContext;
import freenet.client.ArchiveExtractCallback;
import freenet.client.ArchiveFailureException;
@@ -121,7 +123,7 @@
// Process the completed data. May result in us going to a
// splitfile, or another SingleFileFetcher, etc.
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched, ObjectContainer container) {
this.sched = sched;
if(parent instanceof ClientGetter)
((ClientGetter)parent).addKeyToBinaryBlob(block);
@@ -132,7 +134,7 @@
Logger.error(this, "block is null! fromStore="+fromStore+", token="+token, new Exception("error"));
return;
}
- Bucket data = extract(block, sched);
+ Bucket data = extract(block, sched, container);
if(data == null) {
if(logMINOR)
Logger.minor(this, "No data");
@@ -142,43 +144,43 @@
if(logMINOR)
Logger.minor(this, "Block "+(block.isMetadata() ? "is metadata" : "is not metadata")+" on "+this);
if(!block.isMetadata()) {
- onSuccess(new FetchResult(clientMetadata, data), sched);
+ onSuccess(new FetchResult(clientMetadata, data), sched, container);
} else {
if(!ctx.followRedirects) {
- onFailure(new FetchException(FetchException.INVALID_METADATA, "Told me not to follow redirects (splitfile block??)"), sched);
+ onFailure(new FetchException(FetchException.INVALID_METADATA, "Told me not to follow redirects (splitfile block??)"), false, sched, container);
return;
}
if(parent.isCancelled()) {
- onFailure(new FetchException(FetchException.CANCELLED), sched);
+ onFailure(new FetchException(FetchException.CANCELLED), false, sched, container);
return;
}
if(data.size() > ctx.maxMetadataSize) {
- onFailure(new FetchException(FetchException.TOO_BIG_METADATA), sched);
+ onFailure(new FetchException(FetchException.TOO_BIG_METADATA), false, sched, container);
return;
}
// Parse metadata
try {
metadata = Metadata.construct(data);
} catch (MetadataParseException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched, container);
return;
} catch (IOException e) {
// Bucket error?
- onFailure(new FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
}
- wrapHandleMetadata(false);
+ wrapHandleMetadata(false, container);
}
}
- protected void onSuccess(FetchResult result, RequestScheduler sched) {
+ protected void onSuccess(FetchResult result, RequestScheduler sched, ObjectContainer container) {
this.sched = sched;
unregister(false);
if(parent.isCancelled()) {
if(logMINOR)
Logger.minor(this, "Parent is cancelled");
result.asBucket().free();
- onFailure(new FetchException(FetchException.CANCELLED), sched);
+ onFailure(new FetchException(FetchException.CANCELLED), false, sched, container);
return;
}
if(!decompressors.isEmpty()) {
@@ -189,10 +191,10 @@
long maxLen = Math.max(ctx.maxTempLength, ctx.maxOutputLength);
data = c.decompress(data, ctx.bucketFactory, maxLen, maxLen * 4, decompressors.isEmpty() ? returnBucket : null);
} catch (IOException e) {
- onFailure(new FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
} catch (CompressionOutputSizeException e) {
- onFailure(new FetchException(FetchException.TOO_BIG, e.estimatedSize, (rcb == parent), result.getMimeType()), sched);
+ onFailure(new FetchException(FetchException.TOO_BIG, e.estimatedSize, (rcb == parent), result.getMimeType()), false, sched, container);
return;
}
}
@@ -205,7 +207,7 @@
// It would be useful to be able to fetch the data ...
// On the other hand such inserts could cause unpredictable results?
// Would be useful to make a redirect to the key we actually fetched.
- rcb.onFailure(new FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many path components in redirects", thisKey), this);
+ rcb.onFailure(new FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many path components in redirects", thisKey), this, container);
} else {
// TOO_MANY_PATH_COMPONENTS
// report to user
@@ -214,15 +216,15 @@
}
FreenetURI tryURI = uri;
tryURI = tryURI.dropLastMetaStrings(metaStrings.size());
- rcb.onFailure(new FetchException(FetchException.TOO_MANY_PATH_COMPONENTS, result.size(), (rcb == parent), result.getMimeType(), tryURI), this);
+ rcb.onFailure(new FetchException(FetchException.TOO_MANY_PATH_COMPONENTS, result.size(), (rcb == parent), result.getMimeType(), tryURI), this, container);
}
result.asBucket().free();
return;
} else if(result.size() > ctx.maxOutputLength) {
- rcb.onFailure(new FetchException(FetchException.TOO_BIG, result.size(), (rcb == parent), result.getMimeType()), this);
+ rcb.onFailure(new FetchException(FetchException.TOO_BIG, result.size(), (rcb == parent), result.getMimeType()), this, container);
result.asBucket().free();
} else {
- rcb.onSuccess(result, this);
+ rcb.onSuccess(result, this, container);
}
}
@@ -235,7 +237,7 @@
* @throws ArchiveFailureException
* @throws ArchiveRestartException
*/
- private synchronized void handleMetadata() throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ private synchronized void handleMetadata(final ObjectContainer container) throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
while(true) {
if(metadata.isSimpleManifest()) {
if(logMINOR) Logger.minor(this, "Is simple manifest");
@@ -288,19 +290,19 @@
metadata = Metadata.construct(data);
} catch (MetadataParseException e) {
// Invalid metadata
- onFailure(new FetchException(FetchException.INVALID_METADATA, e), sched);
+ onFailure(new FetchException(FetchException.INVALID_METADATA, e), false, sched, container);
return;
} catch (IOException e) {
// Bucket error?
- onFailure(new FetchException(FetchException.BUCKET_ERROR, e), sched);
+ onFailure(new FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
}
- wrapHandleMetadata(true);
+ wrapHandleMetadata(true, container);
}
public void notInArchive() {
- onFailure(new FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot happen as ArchiveManager should synthesise some!"), sched);
+ onFailure(new FetchException(FetchException.INTERNAL_ERROR, "No metadata in container! Cannot happen as ArchiveManager should synthesise some!"), false, sched, container);
}
- }); // will result in this function being called again
+ }, container); // will result in this function being called again
return;
}
continue;
@@ -308,7 +310,7 @@
if(logMINOR) Logger.minor(this, "Is archive-internal redirect");
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata());
String mime = clientMetadata.getMIMEType();
- if(mime != null) rcb.onExpectedMIME(mime);
+ if(mime != null) rcb.onExpectedMIME(mime, container);
if(metaStrings.isEmpty() && isFinal && clientMetadata.getMIMETypeNoParams() != null && ctx.allowedMIMETypes != null &&
!ctx.allowedMIMETypes.contains(clientMetadata.getMIMETypeNoParams())) {
throw new FetchException(FetchException.WRONG_MIME_TYPE, -1, false, clientMetadata.getMIMEType());
@@ -337,7 +339,7 @@
// Return the data
ctx.executor.execute(new Runnable() {
public void run() {
- onSuccess(new FetchResult(clientMetadata, out), sched);
+ onSuccess(new FetchResult(clientMetadata, out), sched, container);
}
}, "SingleFileFetcher onSuccess callback for "+this);
@@ -361,16 +363,16 @@
out = data;
}
} catch (IOException e) {
- onFailure(new FetchException(FetchException.BUCKET_ERROR), sched);
+ onFailure(new FetchException(FetchException.BUCKET_ERROR), false, sched, container);
return;
}
// Return the data
- onSuccess(new FetchResult(clientMetadata, out), sched);
+ onSuccess(new FetchResult(clientMetadata, out), sched, container);
}
public void notInArchive() {
- onFailure(new FetchException(FetchException.NOT_IN_ARCHIVE), sched);
+ onFailure(new FetchException(FetchException.NOT_IN_ARCHIVE), false, sched, container);
}
- });
+ }, container);
// Will call back into this function when it has been fetched.
return;
}
@@ -383,7 +385,7 @@
this.metadata = null;
ctx.ticker.queueTimedJob(new Runnable() {
public void run() {
- f.wrapHandleMetadata(true);
+ f.wrapHandleMetadata(true, container);
}
}, 0);
return;
@@ -391,7 +393,7 @@
if(logMINOR) Logger.minor(this, "Is single-file redirect");
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
String mime = clientMetadata.getMIMEType();
- if(mime != null) rcb.onExpectedMIME(mime);
+ if(mime != null) rcb.onExpectedMIME(mime, container);
String mimeType = clientMetadata.getMIMETypeNoParams();
if(mimeType != null && ArchiveManager.isUsableArchiveType(mimeType) && metaStrings.size() > 0) {
@@ -438,7 +440,7 @@
final SingleFileFetcher f = new SingleFileFetcher(parent, rcb, clientMetadata, redirectedKey, metaStrings, this.uri, addedMetaStrings, ctx, actx, ah, archiveMetadata, maxRetries, recursionLevel, false, token, true, returnBucket, isFinal);
if((redirectedKey instanceof ClientCHK) && !((ClientCHK)redirectedKey).isMetadata())
- rcb.onBlockSetFinished(this);
+ rcb.onBlockSetFinished(this, container);
if(metadata.isCompressed()) {
Compressor codec = Compressor.getCompressionAlgorithmByMetadataID(metadata.getCompressionCodec());
f.addDecompressor(codec);
@@ -446,7 +448,7 @@
parent.onTransition(this, f);
ctx.slowSerialExecutor[parent.priorityClass].execute(new Runnable() {
public void run() {
- f.schedule();
+ f.schedule(container);
}
}, "Schedule "+this);
// All done! No longer our problem!
@@ -456,7 +458,7 @@
clientMetadata.mergeNoOverwrite(metadata.getClientMetadata()); // even splitfiles can have mime types!
String mime = clientMetadata.getMIMEType();
- if(mime != null) rcb.onExpectedMIME(mime);
+ if(mime != null) rcb.onExpectedMIME(mime, container);
String mimeType = clientMetadata.getMIMETypeNoParams();
if(mimeType != null && ArchiveManager.isUsableArchiveType(mimeType) && metaStrings.size() > 0) {
@@ -488,13 +490,13 @@
// It would be useful to be able to fetch the data ...
// On the other hand such inserts could cause unpredictable results?
// Would be useful to make a redirect to the key we actually fetched.
- rcb.onFailure(new FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many path components in redirects", thisKey), this);
+ rcb.onFailure(new FetchException(FetchException.INVALID_METADATA, "Invalid metadata: too many path components in redirects", thisKey), this, container);
} else {
// TOO_MANY_PATH_COMPONENTS
// report to user
FreenetURI tryURI = uri;
tryURI = tryURI.dropLastMetaStrings(metaStrings.size());
- rcb.onFailure(new FetchException(FetchException.TOO_MANY_PATH_COMPONENTS, metadata.uncompressedDataLength(), (rcb == parent), clientMetadata.getMIMEType(), tryURI), this);
+ rcb.onFailure(new FetchException(FetchException.TOO_MANY_PATH_COMPONENTS, metadata.uncompressedDataLength(), (rcb == parent), clientMetadata.getMIMEType(), tryURI), this, container);
}
return;
}
@@ -512,10 +514,10 @@
}
SplitFileFetcher sf = new SplitFileFetcher(metadata, rcb, parent, ctx,
- decompressors, clientMetadata, actx, recursionLevel, returnBucket, token);
+ decompressors, clientMetadata, actx, recursionLevel, returnBucket, token, container);
parent.onTransition(this, sf);
- sf.scheduleOffThread();
- rcb.onBlockSetFinished(this);
+ sf.schedule(container);
+ rcb.onBlockSetFinished(this, container);
// Clear our own metadata, we won't need it any more.
// For multi-level metadata etc see above.
metadata = null;
@@ -541,7 +543,7 @@
decompressors.addLast(codec);
}
- private void fetchArchive(boolean forData, Metadata meta, String element, ArchiveExtractCallback callback) throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
+ private void fetchArchive(boolean forData, Metadata meta, String element, ArchiveExtractCallback callback, final ObjectContainer container) throws FetchException, MetadataParseException, ArchiveFailureException, ArchiveRestartException {
if(logMINOR) Logger.minor(this, "fetchArchive()");
// Fetch the archive
// How?
@@ -559,7 +561,7 @@
// Fetch the archive. The archive fetcher callback will unpack it, and either call the element
// callback, or just go back around handleMetadata() on this, which will see that the data is now
// available.
- f.wrapHandleMetadata(true);
+ f.wrapHandleMetadata(true, container);
}
}, "Fetching archive for "+this);
}
@@ -567,19 +569,19 @@
/**
* Call handleMetadata(), and deal with any resulting exceptions
*/
- private void wrapHandleMetadata(boolean notFinalizedSize) {
+ private void wrapHandleMetadata(boolean notFinalizedSize, ObjectContainer container) {
try {
- handleMetadata();
+ handleMetadata(container);
} catch (MetadataParseException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched, container);
} catch (FetchException e) {
if(notFinalizedSize)
e.setNotFinalizedSize();
- onFailure(e, sched);
+ onFailure(e, false, sched, container);
} catch (ArchiveFailureException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched, container);
} catch (ArchiveRestartException e) {
- onFailure(new FetchException(e), sched);
+ onFailure(new FetchException(e), false, sched, container);
}
}
@@ -595,44 +597,44 @@
this.callback = cb;
}
- public void onSuccess(FetchResult result, ClientGetState state) {
+ public void onSuccess(FetchResult result, ClientGetState state, ObjectContainer container) {
try {
ah.extractToCache(result.asBucket(), actx, element, callback);
} catch (ArchiveFailureException e) {
- SingleFileFetcher.this.onFailure(new FetchException(e), sched);
+ SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, container);
return;
} catch (ArchiveRestartException e) {
- SingleFileFetcher.this.onFailure(new FetchException(e), sched);
+ SingleFileFetcher.this.onFailure(new FetchException(e), false, sched, container);
return;
}
if(callback != null) return;
- wrapHandleMetadata(true);
+ wrapHandleMetadata(true, container);
}
- public void onFailure(FetchException e, ClientGetState state) {
+ public void onFailure(FetchException e, ClientGetState state, ObjectContainer container) {
// Force fatal as the fetcher is presumed to have made a reasonable effort.
- SingleFileFetcher.this.onFailure(e, true, sched);
+ SingleFileFetcher.this.onFailure(e, true, sched, container);
}
- public void onBlockSetFinished(ClientGetState state) {
+ public void onBlockSetFinished(ClientGetState state, ObjectContainer container) {
if(wasFetchingFinalData) {
- rcb.onBlockSetFinished(SingleFileFetcher.this);
+ rcb.onBlockSetFinished(SingleFileFetcher.this, container);
}
}
- public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ public void onTransition(ClientGetState oldState, ClientGetState newState, ObjectContainer container) {
// Ignore
}
- public void onExpectedMIME(String mime) {
+ public void onExpectedMIME(String mime, ObjectContainer container) {
// Ignore
}
- public void onExpectedSize(long size) {
- rcb.onExpectedSize(size);
+ public void onExpectedSize(long size, ObjectContainer container) {
+ rcb.onExpectedSize(size, container);
}
- public void onFinalizedMetadata() {
+ public void onFinalizedMetadata(ObjectContainer container) {
// Ignore
}
@@ -640,42 +642,42 @@
class MultiLevelMetadataCallback implements GetCompletionCallback {
- public void onSuccess(FetchResult result, ClientGetState state) {
+ public void onSuccess(FetchResult result, ClientGetState state, ObjectContainer container) {
try {
metadata = Metadata.construct(result.asBucket());
} catch (MetadataParseException e) {
- SingleFileFetcher.this.onFailure(new FetchException(FetchException.INVALID_METADATA, e), sched);
+ SingleFileFetcher.this.onFailure(new FetchException(FetchException.INVALID_METADATA, e), false, sched, container);
return;
} catch (IOException e) {
// Bucket error?
- SingleFileFetcher.this.onFailure(new FetchException(FetchException.BUCKET_ERROR, e), sched);
+ SingleFileFetcher.this.onFailure(new FetchException(FetchException.BUCKET_ERROR, e), false, sched, container);
return;
}
- wrapHandleMetadata(true);
+ wrapHandleMetadata(true, container);
}
- public void onFailure(FetchException e, ClientGetState state) {
+ public void onFailure(FetchException e, ClientGetState state, ObjectContainer container) {
// Pass it on; fetcher is assumed to have retried as appropriate already, so this is fatal.
- SingleFileFetcher.this.onFailure(e, true, sched);
+ SingleFileFetcher.this.onFailure(e, true, sched, container);
}
- public void onBlockSetFinished(ClientGetState state) {
+ public void onBlockSetFinished(ClientGetState state, ObjectContainer container) {
// Ignore as we are fetching metadata here
}
- public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ public void onTransition(ClientGetState oldState, ClientGetState newState, ObjectContainer container) {
// Ignore
}
- public void onExpectedMIME(String mime) {
+ public void onExpectedMIME(String mime, ObjectContainer container) {
// Ignore
}
- public void onExpectedSize(long size) {
- rcb.onExpectedSize(size);
+ public void onExpectedSize(long size, ObjectContainer container) {
+ rcb.onExpectedSize(size, container);
}
- public void onFinalizedMetadata() {
+ public void onFinalizedMetadata(ObjectContainer container) {
// Ignore
}
@@ -691,7 +693,7 @@
public static ClientGetState create(ClientRequester requester, GetCompletionCallback cb,
ClientMetadata clientMetadata, FreenetURI uri, FetchContext ctx, ArchiveContext actx,
int maxRetries, int recursionLevel, boolean dontTellClientGet, long l, boolean isEssential,
- Bucket returnBucket, boolean isFinal) throws MalformedURLException, FetchException {
+ Bucket returnBucket, boolean isFinal, ObjectContainer container) throws MalformedURLException, FetchException {
BaseClientKey key = BaseClientKey.getBaseKey(uri);
if((clientMetadata == null || clientMetadata.isTrivial()) && (!uri.hasMetaStrings()) &&
ctx.allowSplitfiles == false && ctx.followRedirects == false &&
@@ -700,11 +702,11 @@
if(key instanceof ClientKey)
return new SingleFileFetcher(requester, cb, clientMetadata, (ClientKey)key, uri.listMetaStrings(), uri, 0, ctx, actx, null, null, maxRetries, recursionLevel, dontTellClientGet, l, isEssential, returnBucket, isFinal);
else {
- return uskCreate(requester, cb, clientMetadata, (USK)key, uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel, dontTellClientGet, l, isEssential, returnBucket, isFinal);
+ return uskCreate(requester, cb, clientMetadata, (USK)key, uri.listMetaStrings(), ctx, actx, maxRetries, recursionLevel, dontTellClientGet, l, isEssential, returnBucket, isFinal, container);
}
}
- private static ClientGetState uskCreate(ClientRequester requester, GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList metaStrings, FetchContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, long l, boolean isEssential, Bucket returnBucket, boolean isFinal) throws FetchException {
+ private static ClientGetState uskCreate(ClientRequester requester, GetCompletionCallback cb, ClientMetadata clientMetadata, USK usk, LinkedList metaStrings, FetchContext ctx, ArchiveContext actx, int maxRetries, int recursionLevel, boolean dontTellClientGet, long l, boolean isEssential, Bucket returnBucket, boolean isFinal, ObjectContainer container) throws FetchException {
if(usk.suggestedEdition >= 0) {
// Return the latest known version but at least suggestedEdition.
long edition = ctx.uskManager.lookup(usk);
@@ -714,7 +716,7 @@
edition = ctx.uskManager.lookup(usk);
if(edition > usk.suggestedEdition) {
if(logMINOR) Logger.minor(SingleFileFetcher.class, "Redirecting to edition "+edition);
- cb.onFailure(new FetchException(FetchException.PERMANENT_REDIRECT, usk.copy(edition).getURI().addMetaStrings(metaStrings)), null);
+ cb.onFailure(new FetchException(FetchException.PERMANENT_REDIRECT, usk.copy(edition).getURI().addMetaStrings(metaStrings)), null, container);
return null;
} else {
// Transition to SingleFileFetcher
@@ -728,7 +730,7 @@
return sf;
}
} else {
- cb.onFailure(new FetchException(FetchException.PERMANENT_REDIRECT, usk.copy(edition).getURI().addMetaStrings(metaStrings)), null);
+ cb.onFailure(new FetchException(FetchException.PERMANENT_REDIRECT, usk.copy(edition).getURI().addMetaStrings(metaStrings)), null, container);
return null;
}
} else {
@@ -772,27 +774,27 @@
this.returnBucket = returnBucket;
}
- public void onFoundEdition(long l, USK newUSK) {
+ public void onFoundEdition(long l, USK newUSK, ObjectContainer container) {
ClientSSK key = usk.getSSK(l);
try {
if(l == usk.suggestedEdition) {
SingleFileFetcher sf = new SingleFileFetcher(parent, cb, clientMetadata, key, metaStrings, key.getURI().addMetaStrings(metaStrings),
0, ctx, actx, null, null, maxRetries, recursionLevel+1, dontTellClientGet, token, false, returnBucket, true);
- sf.schedule();
+ sf.schedule(container);
} else {
- cb.onFailure(new FetchException(FetchException.PERMANENT_REDIRECT, newUSK.getURI().addMetaStrings(metaStrings)), null);
+ cb.onFailure(new FetchException(FetchException.PERMANENT_REDIRECT, newUSK.getURI().addMetaStrings(metaStrings)), null, container);
}
} catch (FetchException e) {
- cb.onFailure(e, null);
+ cb.onFailure(e, null, container);
}
}
- public void onFailure() {
- cb.onFailure(new FetchException(FetchException.DATA_NOT_FOUND, "No USK found"), null);
+ public void onFailure(ObjectContainer container) {
+ cb.onFailure(new FetchException(FetchException.DATA_NOT_FOUND, "No USK found"), null, container);
}
- public void onCancelled() {
- cb.onFailure(new FetchException(FetchException.CANCELLED, (String)null), null);
+ public void onCancelled(ObjectContainer container) {
+ cb.onFailure(new FetchException(FetchException.CANCELLED, (String)null), null, container);
}
public short getPollingPriorityNormal() {
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.io.OutputStream;
import java.util.LinkedList;
+import com.db4o.ObjectContainer;
+
import freenet.client.ArchiveContext;
import freenet.client.ClientMetadata;
import freenet.client.FetchContext;
@@ -59,7 +61,7 @@
public SplitFileFetcher(Metadata metadata, GetCompletionCallback rcb, ClientRequester parent2,
FetchContext newCtx, LinkedList decompressors, ClientMetadata clientMetadata,
- ArchiveContext actx, int recursionLevel, Bucket returnBucket, long token2) throws FetchException, MetadataParseException {
+ ArchiveContext actx, int recursionLevel, Bucket returnBucket, long token2, ObjectContainer container) throws FetchException, MetadataParseException {
this.finished = false;
this.returnBucket = returnBucket;
this.fetchContext = newCtx;
@@ -86,12 +88,12 @@
finalLength = overrideLength;
}
long eventualLength = Math.max(overrideLength, metadata.uncompressedDataLength());
- cb.onExpectedSize(eventualLength);
+ cb.onExpectedSize(eventualLength, container);
String mimeType = metadata.getMIMEType();
if(mimeType != null)
- cb.onExpectedMIME(mimeType);
+ cb.onExpectedMIME(mimeType, container);
if(metadata.uncompressedDataLength() > 0)
- cb.onFinalizedMetadata();
+ cb.onFinalizedMetadata(container);
if(eventualLength > 0 && newCtx.maxOutputLength > 0 && eventualLength > newCtx.maxOutputLength)
throw new FetchException(FetchException.TOO_BIG, eventualLength, true, clientMetadata.getMIMEType());
@@ -219,7 +221,7 @@
return output;
}
- public void segmentFinished(SplitFileFetcherSegment segment) {
+ public void segmentFinished(SplitFileFetcherSegment segment, ObjectContainer container) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Finished segment: "+segment);
boolean finish = false;
@@ -240,10 +242,10 @@
}
notifyAll();
}
- if(finish) finish();
+ if(finish) finish(container);
}
- private void finish() {
+ private void finish(ObjectContainer container) {
try {
synchronized(this) {
if(finished) {
@@ -262,47 +264,39 @@
if(!decompressors.isEmpty()) out = null;
data = c.decompress(data, fetchContext.bucketFactory, maxLen, maxLen * 4, out);
} catch (IOException e) {
- cb.onFailure(new FetchException(FetchException.BUCKET_ERROR, e), this);
+ cb.onFailure(new FetchException(FetchException.BUCKET_ERROR, e), this, container);
return;
} catch (CompressionOutputSizeException e) {
- cb.onFailure(new FetchException(FetchException.TOO_BIG, e.estimatedSize, false /* FIXME */, clientMetadata.getMIMEType()), this);
+ cb.onFailure(new FetchException(FetchException.TOO_BIG, e.estimatedSize, false /* FIXME */, clientMetadata.getMIMEType()), this, container);
return;
}
}
- cb.onSuccess(new FetchResult(clientMetadata, data), this);
+ cb.onSuccess(new FetchResult(clientMetadata, data), this, container);
} catch (FetchException e) {
- cb.onFailure(e, this);
+ cb.onFailure(e, this, container);
} catch (OutOfMemoryError e) {
OOMHandler.handleOOM(e);
System.err.println("Failing above attempted fetch...");
- cb.onFailure(new FetchException(FetchException.INTERNAL_ERROR, e), this);
+ cb.onFailure(new FetchException(FetchException.INTERNAL_ERROR, e), this, container);
} catch (Throwable t) {
- cb.onFailure(new FetchException(FetchException.INTERNAL_ERROR, t), this);
+ cb.onFailure(new FetchException(FetchException.INTERNAL_ERROR, t), this, container);
}
}
- public void schedule() {
+ public void schedule(ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this)) Logger.minor(this, "Scheduling "+this);
for(int i=0;i<segments.length;i++) {
- segments[i].schedule();
+ segments[i].schedule(container);
}
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
for(int i=0;i<segments.length;i++)
- segments[i].cancel();
+ segments[i].cancel(container);
}
public long getToken() {
return token;
}
- public void scheduleOffThread() {
- fetchContext.slowSerialExecutor[parent.priorityClass].execute(new Runnable() {
- public void run() {
- schedule();
- }
- }, "Splitfile scheduler thread for "+this);
- }
-
}
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.io.OutputStream;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.client.ArchiveContext;
import freenet.client.FECCodec;
import freenet.client.FECJob;
@@ -151,7 +153,7 @@
return fatallyFailedBlocks;
}
- public void onSuccess(Bucket data, int blockNo, SplitFileFetcherSubSegment seg, ClientKeyBlock block) {
+ public void onSuccess(Bucket data, int blockNo, SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer container) {
boolean decodeNow = false;
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on "+seg);
@@ -214,7 +216,7 @@
}
}
- public void onDecodedSegment() {
+ public void onDecodedSegment(ObjectContainer container) {
try {
if(isCollectingBinaryBlob()) {
for(int i=0;i<dataBuckets.length;i++) {
@@ -222,7 +224,7 @@
try {
maybeAddToBinaryBlob(data, i, false);
} catch (FetchException e) {
- fail(e);
+ fail(e, container);
return;
}
}
@@ -241,14 +243,14 @@
// Otherwise a race is possible that might result in it not seeing our finishing.
finished = true;
if(codec == null || !isCollectingBinaryBlob())
- parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+ parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container);
} catch (IOException e) {
Logger.normal(this, "Caught bucket error?: "+e, e);
synchronized(this) {
finished = true;
failureException = new FetchException(FetchException.BUCKET_ERROR);
}
- parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+ parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container);
return;
}
@@ -265,7 +267,7 @@
}
}
- public void onEncodedSegment() {
+ public void onEncodedSegment(ObjectContainer container) {
synchronized(this) {
// Now insert *ALL* blocks on which we had at least one failure, and didn't eventually succeed
for(int i=0;i<dataBuckets.length;i++) {
@@ -288,7 +290,7 @@
try {
maybeAddToBinaryBlob(data, i, true);
} catch (FetchException e) {
- fail(e);
+ fail(e, container);
return;
}
if(checkRetries[i] > 0)
@@ -304,7 +306,7 @@
}
// Defer the completion until we have generated healing blocks if we are collecting binary blobs.
if(isCollectingBinaryBlob())
- parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+ parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container);
}
boolean isCollectingBinaryBlob() {
@@ -337,8 +339,9 @@
fetchContext.healingQueue.queue(data);
}
- /** This is after any retries and therefore is either out-of-retries or fatal */
- public synchronized void onFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg) {
+ /** This is after any retries and therefore is either out-of-retries or fatal
+ * @param container */
+ public synchronized void onFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, ObjectContainer container) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Permanently failed block: "+blockNo+" on "+this+" : "+e, e);
boolean allFailed;
@@ -373,13 +376,14 @@
allFailed = failedBlocks + fatallyFailedBlocks > (dataKeys.length + checkKeys.length - minFetched);
}
if(allFailed)
- fail(new FetchException(FetchException.SPLITFILE_ERROR, errors));
+ fail(new FetchException(FetchException.SPLITFILE_ERROR, errors), container);
else
seg.possiblyRemoveFromParent();
}
- /** A request has failed non-fatally, so the block may be retried */
- public void onNonFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, RequestScheduler sched) {
+ /** A request has failed non-fatally, so the block may be retried
+ * @param container */
+ public void onNonFatalFailure(FetchException e, int blockNo, SplitFileFetcherSubSegment seg, RequestScheduler sched, ObjectContainer container) {
int tries;
int maxTries = blockFetchContext.maxNonSplitfileRetries;
boolean failed = false;
@@ -422,7 +426,7 @@
}
}
if(failed) {
- onFatalFailure(e, blockNo, seg);
+ onFatalFailure(e, blockNo, seg, container);
if(logMINOR)
Logger.minor(this, "Not retrying block "+blockNo+" on "+this+" : tries="+tries+"/"+maxTries);
return;
@@ -437,7 +441,7 @@
seg.unregisterKey(key.getNodeKey());
if(logMINOR)
Logger.minor(this, "Retrying block "+blockNo+" on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
- sub.add(blockNo, false);
+ sub.add(blockNo, false, container);
}
}
@@ -454,7 +458,7 @@
return sub;
}
- private void fail(FetchException e) {
+ private void fail(FetchException e, ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
@@ -481,16 +485,16 @@
}
}
removeSubSegments();
- parentFetcher.segmentFinished(this);
+ parentFetcher.segmentFinished(this, container);
}
- public void schedule() {
+ public void schedule(ObjectContainer container) {
try {
SplitFileFetcherSubSegment seg = getSubSegment(0);
for(int i=0;i<dataRetries.length+checkRetries.length;i++)
- seg.add(i, true);
+ seg.add(i, true, container);
- seg.schedule();
+ seg.schedule(container);
synchronized(this) {
scheduled = true;
}
@@ -499,12 +503,12 @@
Logger.minor(this, "scheduling "+seg+" : "+seg.blockNums);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" scheduling "+this, t);
- fail(new FetchException(FetchException.INTERNAL_ERROR, t));
+ fail(new FetchException(FetchException.INTERNAL_ERROR, t), container);
}
}
- public void cancel() {
- fail(new FetchException(FetchException.CANCELLED));
+ public void cancel(ObjectContainer container) {
+ fail(new FetchException(FetchException.CANCELLED), container);
}
public void onBlockSetFinished(ClientGetState state) {
@@ -582,7 +586,7 @@
return checkCooldownTimes[blockNum - dataKeys.length];
}
- public void requeueAfterCooldown(Key key, long time) {
+ public void requeueAfterCooldown(Key key, long time, ObjectContainer container) {
Vector v = null;
boolean notFound = true;
synchronized(this) {
@@ -601,7 +605,7 @@
if(logMINOR)
Logger.minor(this, "Retrying after cooldown on "+this+": data block "+i+" on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
if(v == null) v = new Vector();
- sub.add(i, true);
+ sub.add(i, true, container);
if(!v.contains(sub)) v.add(sub);
notFound = false;
}
@@ -619,7 +623,7 @@
if(logMINOR)
Logger.minor(this, "Retrying after cooldown on "+this+": check block "+i+" on "+this+" : tries="+tries+"/"+maxTries+" : "+sub);
if(v == null) v = new Vector();
- sub.add(i+dataKeys.length, true);
+ sub.add(i+dataKeys.length, true, container);
if(!v.contains(sub)) v.add(sub);
notFound = false;
}
@@ -630,7 +634,7 @@
}
if(v != null) {
for(int i=0;i<v.size();i++) {
- ((SplitFileFetcherSubSegment) v.get(i)).schedule();
+ ((SplitFileFetcherSubSegment) v.get(i)).schedule(container);
}
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSubSegment.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
import java.io.IOException;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.client.FetchException;
import freenet.keys.CHKBlock;
@@ -64,12 +66,12 @@
return ctx;
}
- public Object chooseKey(KeysFetchingLocally keys) {
+ public Object chooseKey(KeysFetchingLocally keys, ObjectContainer container) {
if(cancelled) return null;
return removeRandomBlockNum(keys);
}
- public ClientKey getKey(Object token) {
+ public ClientKey getKey(Object token, ObjectContainer container) {
synchronized(segment) {
if(cancelled) {
if(logMINOR)
@@ -94,14 +96,14 @@
* Fetch the array from the segment because we need to include *ALL* keys, especially
* those on cooldown queues. This is important when unregistering.
*/
- public Object[] allKeys() {
+ public Object[] allKeys(ObjectContainer container) {
return segment.getKeyNumbersAtRetryLevel(retryCount);
}
/**
* Just those keys which are eligible to be started now.
*/
- public Object[] sendableKeys() {
+ public Object[] sendableKeys(ObjectContainer container) {
return blockNums.toArray();
}
@@ -136,7 +138,7 @@
}
}
- public boolean hasValidKeys(KeysFetchingLocally keys) {
+ public boolean hasValidKeys(KeysFetchingLocally keys, ObjectContainer container) {
synchronized(segment) {
for(int i=0;i<10;i++) {
Object ret;
@@ -165,49 +167,49 @@
// Translate it, then call the real onFailure
// FIXME refactor this out to a common method; see SimpleSingleFileFetcher
- public void onFailure(LowLevelGetException e, Object token, RequestScheduler sched) {
+ public void onFailure(LowLevelGetException e, Object token, RequestScheduler sched, ObjectContainer container) {
if(logMINOR)
Logger.minor(this, "onFailure("+e+" , "+token);
switch(e.code) {
case LowLevelGetException.DATA_NOT_FOUND:
- onFailure(new FetchException(FetchException.DATA_NOT_FOUND), token, sched);
+ onFailure(new FetchException(FetchException.DATA_NOT_FOUND), token, sched, container);
return;
case LowLevelGetException.DATA_NOT_FOUND_IN_STORE:
- onFailure(new FetchException(FetchException.DATA_NOT_FOUND), token, sched);
+ onFailure(new FetchException(FetchException.DATA_NOT_FOUND), token, sched, container);
return;
case LowLevelGetException.RECENTLY_FAILED:
- onFailure(new FetchException(FetchException.RECENTLY_FAILED), token, sched);
+ onFailure(new FetchException(FetchException.RECENTLY_FAILED), token, sched, container);
return;
case LowLevelGetException.DECODE_FAILED:
- onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched);
+ onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched, container);
return;
case LowLevelGetException.INTERNAL_ERROR:
- onFailure(new FetchException(FetchException.INTERNAL_ERROR), token, sched);
+ onFailure(new FetchException(FetchException.INTERNAL_ERROR), token, sched, container);
return;
case LowLevelGetException.REJECTED_OVERLOAD:
- onFailure(new FetchException(FetchException.REJECTED_OVERLOAD), token, sched);
+ onFailure(new FetchException(FetchException.REJECTED_OVERLOAD), token, sched, container);
return;
case LowLevelGetException.ROUTE_NOT_FOUND:
- onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND), token, sched);
+ onFailure(new FetchException(FetchException.ROUTE_NOT_FOUND), token, sched, container);
return;
case LowLevelGetException.TRANSFER_FAILED:
- onFailure(new FetchException(FetchException.TRANSFER_FAILED), token, sched);
+ onFailure(new FetchException(FetchException.TRANSFER_FAILED), token, sched, container);
return;
case LowLevelGetException.VERIFY_FAILED:
- onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched);
+ onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR), token, sched, container);
return;
case LowLevelGetException.CANCELLED:
- onFailure(new FetchException(FetchException.CANCELLED), token, sched);
+ onFailure(new FetchException(FetchException.CANCELLED), token, sched, container);
return;
default:
Logger.error(this, "Unknown LowLevelGetException code: "+e.code);
- onFailure(new FetchException(FetchException.INTERNAL_ERROR), token, sched);
+ onFailure(new FetchException(FetchException.INTERNAL_ERROR), token, sched, container);
return;
}
}
// Real onFailure
- protected void onFailure(FetchException e, Object token, RequestScheduler sched) {
+ protected void onFailure(FetchException e, Object token, RequestScheduler sched, ObjectContainer container) {
boolean forceFatal = false;
if(parent.isCancelled()) {
if(Logger.shouldLog(Logger.MINOR, this))
@@ -217,14 +219,14 @@
}
segment.errors.inc(e.getMode());
if(e.isFatal() || forceFatal) {
- segment.onFatalFailure(e, ((Integer)token).intValue(), this);
+ segment.onFatalFailure(e, ((Integer)token).intValue(), this, container);
} else {
- segment.onNonFatalFailure(e, ((Integer)token).intValue(), this, sched);
+ segment.onNonFatalFailure(e, ((Integer)token).intValue(), this, sched, container);
}
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched) {
- Bucket data = extract(block, token, sched);
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched, ObjectContainer container) {
+ Bucket data = extract(block, token, sched, container);
if(fromStore) {
// Normally when this method is called the block number has already
// been removed. However if fromStore=true, it won't have been, so
@@ -242,39 +244,39 @@
}
}
if(!block.isMetadata()) {
- onSuccess(data, fromStore, (Integer)token, ((Integer)token).intValue(), block, sched);
+ onSuccess(data, fromStore, (Integer)token, ((Integer)token).intValue(), block, sched, container);
} else {
- onFailure(new FetchException(FetchException.INVALID_METADATA, "Metadata where expected data"), token, sched);
+ onFailure(new FetchException(FetchException.INVALID_METADATA, "Metadata where expected data"), token, sched, container);
}
}
- protected void onSuccess(Bucket data, boolean fromStore, Integer token, int blockNo, ClientKeyBlock block, RequestScheduler sched) {
+ protected void onSuccess(Bucket data, boolean fromStore, Integer token, int blockNo, ClientKeyBlock block, RequestScheduler sched, ObjectContainer container) {
if(parent.isCancelled()) {
data.free();
- onFailure(new FetchException(FetchException.CANCELLED), token, sched);
+ onFailure(new FetchException(FetchException.CANCELLED), token, sched, container);
return;
}
- segment.onSuccess(data, blockNo, this, block);
+ segment.onSuccess(data, blockNo, this, block, container);
}
/** Convert a ClientKeyBlock to a Bucket. If an error occurs, report it via onFailure
* and return null.
*/
- protected Bucket extract(ClientKeyBlock block, Object token, RequestScheduler sched) {
+ protected Bucket extract(ClientKeyBlock block, Object token, RequestScheduler sched, ObjectContainer container) {
Bucket data;
try {
data = block.decode(ctx.bucketFactory, (int)(Math.min(ctx.maxOutputLength, Integer.MAX_VALUE)), false);
} catch (KeyDecodeException e1) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Decode failure: "+e1, e1);
- onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), token, sched);
+ onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e1.getMessage()), token, sched, container);
return null;
} catch (TooBigException e) {
- onFailure(new FetchException(FetchException.TOO_BIG, e.getMessage()), token, sched);
+ onFailure(new FetchException(FetchException.TOO_BIG, e.getMessage()), token, sched, container);
return null;
} catch (IOException e) {
Logger.error(this, "Could not capture data - disk full?: "+e, e);
- onFailure(new FetchException(FetchException.BUCKET_ERROR, e), token, sched);
+ onFailure(new FetchException(FetchException.BUCKET_ERROR, e), token, sched, container);
return null;
}
if(Logger.shouldLog(Logger.MINOR, this))
@@ -327,7 +329,7 @@
return false;
}
- public void add(int blockNo, boolean dontSchedule) {
+ public void add(int blockNo, boolean dontSchedule, ObjectContainer container) {
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Adding block "+blockNo+" to "+this+" dontSchedule="+dontSchedule);
if(blockNo < 0) throw new IllegalArgumentException();
@@ -357,7 +359,7 @@
schedule = false;
}
}
- if(schedule) schedule();
+ if(schedule) schedule(container);
else if(!dontSchedule)
// Already scheduled, however this key may not be registered.
getScheduler().addPendingKey(segment.getBlockKey(blockNo), this);
@@ -380,7 +382,7 @@
unregister(false);
}
- public void onGotKey(Key key, KeyBlock block, RequestScheduler sched) {
+ public void onGotKey(Key key, KeyBlock block, RequestScheduler sched, ObjectContainer container) {
if(logMINOR) Logger.minor(this, "onGotKey("+key+")");
// Find and remove block if it is on this subsegment. However it may have been
// removed already.
@@ -407,15 +409,15 @@
try {
cb = new ClientCHKBlock((CHKBlock)block, ckey);
} catch (CHKVerifyException e) {
- onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e), token, sched);
+ onFailure(new FetchException(FetchException.BLOCK_DECODE_ERROR, e), token, sched, container);
return;
}
- Bucket data = extract(cb, token, sched);
+ Bucket data = extract(cb, token, sched, container);
if(!cb.isMetadata()) {
- onSuccess(data, false, (Integer)token, ((Integer)token).intValue(), cb, sched);
+ onSuccess(data, false, (Integer)token, ((Integer)token).intValue(), cb, sched, container);
} else {
- onFailure(new FetchException(FetchException.INVALID_METADATA, "Metadata where expected data"), token, sched);
+ onFailure(new FetchException(FetchException.INVALID_METADATA, "Metadata where expected data"), token, sched, container);
}
}
@@ -435,21 +437,21 @@
}
}
- public long getCooldownWakeup(Object token) {
+ public long getCooldownWakeup(Object token, ObjectContainer container) {
return segment.getCooldownWakeup(((Integer)token).intValue());
}
- public void requeueAfterCooldown(Key key, long time) {
+ public void requeueAfterCooldown(Key key, long time, ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Requeueing after cooldown "+key+" for "+this);
- segment.requeueAfterCooldown(key, time);
+ segment.requeueAfterCooldown(key, time, container);
}
- public long getCooldownWakeupByKey(Key key) {
+ public long getCooldownWakeupByKey(Key key, ObjectContainer container) {
return segment.getCooldownWakeupByKey(key);
}
- public void resetCooldownTimes() {
+ public void resetCooldownTimes(ObjectContainer container) {
synchronized(segment) {
segment.resetCooldownTimes((Integer[])blockNums.toArray(new Integer[blockNums.size()]));
}
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileInserter.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -6,6 +6,8 @@
import java.io.IOException;
import java.util.Vector;
+import com.db4o.ObjectContainer;
+
import freenet.client.ClientMetadata;
import freenet.client.FECCodec;
import freenet.client.FailureCodeTracker;
@@ -228,9 +230,9 @@
return (SplitFileInserterSegment[]) segs.toArray(new SplitFileInserterSegment[segs.size()]);
}
- public void start() throws InsertException {
+ public void start(ObjectContainer container) throws InsertException {
for(int i=0;i<segments.length;i++)
- segments[i].start();
+ segments[i].start(container);
if(countDataBlocks > 32)
parent.onMajorProgress();
@@ -359,7 +361,7 @@
return parent;
}
- public void segmentFinished(SplitFileInserterSegment segment) {
+ public void segmentFinished(SplitFileInserterSegment segment, ObjectContainer container) {
if(logMINOR) Logger.minor(this, "Segment finished: "+segment, new Exception("debug"));
boolean allGone = true;
if(countDataBlocks > 32)
@@ -379,7 +381,7 @@
InsertException e = segment.getException();
if((e != null) && e.isFatal()) {
- cancel();
+ cancel(container);
} else {
if(!allGone) return;
}
@@ -431,7 +433,7 @@
}
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
synchronized(this) {
if(finished) return;
finished = true;
@@ -440,8 +442,8 @@
segments[i].cancel();
}
- public void schedule() throws InsertException {
- start();
+ public void schedule(ObjectContainer container) throws InsertException {
+ start(container);
}
public Object getToken() {
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -2,6 +2,8 @@
import java.net.MalformedURLException;
+import com.db4o.ObjectContainer;
+
import freenet.client.FECCodec;
import freenet.client.FECJob;
import freenet.client.FailureCodeTracker;
@@ -393,7 +395,7 @@
return fs;
}
- public void start() throws InsertException {
+ public void start(ObjectContainer container) throws InsertException {
if (logMINOR)
Logger.minor(this, "Starting segment " + segNo + " of " + parent
+ " (" + parent.dataLength + "): " + this + " ( finished="
@@ -444,7 +446,7 @@
} else
parent.parent.completedBlock(true);
}
- onEncodedSegment();
+ onEncodedSegment(container);
}
if (hasURIs) {
parent.segmentHasURIs(this);
@@ -458,13 +460,13 @@
if (fin)
finish();
if (finished) {
- parent.segmentFinished(this);
+ parent.segmentFinished(this, container);
}
}
- public void onDecodedSegment() {} // irrevelant
+ public void onDecodedSegment(ObjectContainer container) {} // irrevelant
- public void onEncodedSegment() {
+ public void onEncodedSegment(ObjectContainer container) {
// Start the inserts
try {
for (int i = 0; i < checkBlockInserters.length; i++) {
Modified: branches/db4o/freenet/src/freenet/client/async/USKCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKCallback.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKCallback.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.keys.USK;
/**
@@ -14,7 +16,7 @@
/** Found the latest edition.
* @param l The edition number.
* @param key The key. */
- void onFoundEdition(long l, USK key);
+ void onFoundEdition(long l, USK key, ObjectContainer container);
/**
* Priority at which the polling should run normally.
Modified: branches/db4o/freenet/src/freenet/client/async/USKChecker.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKChecker.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKChecker.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.keys.ClientKey;
import freenet.keys.ClientKeyBlock;
@@ -26,12 +28,12 @@
this.cb = cb;
}
- public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched) {
+ public void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched, ObjectContainer container) {
unregister(false);
cb.onSuccess((ClientSSKBlock)block);
}
- public void onFailure(LowLevelGetException e, Object token, RequestScheduler sched) {
+ public void onFailure(LowLevelGetException e, Object token, RequestScheduler sched, ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "onFailure: "+e+" for "+this);
// Firstly, can we retry?
@@ -61,7 +63,7 @@
canRetry = true;
}
- if(canRetry && retry(sched)) return;
+ if(canRetry && retry(sched, container)) return;
// Ran out of retries.
unregister(false);
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcherCallback.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
/**
* Callback interface for USK fetches. If you submit a USK fetch via
* USKManager.getFetcher, then register yourself on it as a listener, then you
@@ -11,8 +13,8 @@
public interface USKFetcherCallback extends USKCallback {
/** Failed to find any edition at all (later than or equal to the specified hint) */
- void onFailure();
+ void onFailure(ObjectContainer container);
- void onCancelled();
+ void onCancelled(ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKFetcherWrapper.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client.async;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchException;
import freenet.client.FetchResult;
import freenet.keys.FreenetURI;
@@ -33,19 +35,19 @@
// Do nothing
}
- public void onSuccess(FetchResult result, ClientGetState state) {
+ public void onSuccess(FetchResult result, ClientGetState state, ObjectContainer container) {
// Ignore; we don't do anything with it because we are running in the background.
}
- public void onFailure(FetchException e, ClientGetState state) {
+ public void onFailure(FetchException e, ClientGetState state, ObjectContainer container) {
// Ignore
}
- public void onBlockSetFinished(ClientGetState state) {
+ public void onBlockSetFinished(ClientGetState state, ObjectContainer container) {
// Ignore
}
- public void onTransition(ClientGetState oldState, ClientGetState newState) {
+ public void onTransition(ClientGetState oldState, ClientGetState newState, ObjectContainer container) {
// Ignore
}
@@ -53,15 +55,15 @@
return super.toString()+ ':' +usk;
}
- public void onExpectedMIME(String mime) {
+ public void onExpectedMIME(String mime, ObjectContainer container) {
// Ignore
}
- public void onExpectedSize(long size) {
+ public void onExpectedSize(long size, ObjectContainer container) {
// Ignore
}
- public void onFinalizedMetadata() {
+ public void onFinalizedMetadata(ObjectContainer container) {
// Ignore
}
}
Modified: branches/db4o/freenet/src/freenet/client/async/USKInserter.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/USKInserter.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/client/async/USKInserter.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -7,6 +7,8 @@
import java.net.MalformedURLException;
import java.util.Arrays;
+import com.db4o.ObjectContainer;
+
import freenet.client.InsertContext;
import freenet.client.InsertException;
import freenet.client.Metadata;
@@ -51,7 +53,7 @@
/** After attempting inserts on this many slots, go back to the Fetcher */
private static final long MAX_TRIED_SLOTS = 10;
- public void schedule() throws InsertException {
+ public void schedule(ObjectContainer container) throws InsertException {
// Caller calls schedule()
// schedule() calls scheduleFetcher()
// scheduleFetcher() creates a Fetcher (set up to tell us about author-errors as well as valid inserts)
@@ -79,7 +81,7 @@
fetcher.schedule();
}
- public void onFoundEdition(long l, USK key) {
+ public void onFoundEdition(long l, USK key, ObjectContainer container) {
boolean alreadyInserted = false;
synchronized(this) {
edition = Math.max(l, edition);
@@ -111,11 +113,11 @@
parent.completedBlock(true);
cb.onSuccess(this);
} else {
- scheduleInsert();
+ scheduleInsert(container);
}
}
- private void scheduleInsert() {
+ private void scheduleInsert(ObjectContainer container) {
long edNo = Math.max(edition, ctx.uskManager.lookup(pubUSK)+1);
synchronized(this) {
if(finished) return;
@@ -126,7 +128,7 @@
ctx, this, isMetadata, sourceLength, token, getCHKOnly, false, true /* we don't use it */, tokenObject);
}
try {
- sbi.schedule();
+ sbi.schedule(container);
} catch (InsertException e) {
cb.onFailure(e, this);
}
@@ -149,7 +151,7 @@
// FINISHED!!!! Yay!!!
}
- public synchronized void onFailure(InsertException e, ClientPutState state) {
+ public synchronized void onFailure(InsertException e, ClientPutState state, ObjectContainer container) {
sbi = null;
if(e.getMode() == InsertException.COLLISION) {
// Try the next slot
@@ -157,7 +159,7 @@
if(consecutiveCollisions++ > MAX_TRIED_SLOTS)
scheduleFetcher();
else
- scheduleInsert();
+ scheduleInsert(container);
} else {
cb.onFailure(e, state);
}
@@ -190,11 +192,11 @@
return parent;
}
- public void cancel() {
+ public void cancel(ObjectContainer container) {
if(fetcher != null)
fetcher.cancel();
if(sbi != null)
- sbi.cancel();
+ sbi.cancel(container);
synchronized(this) {
finished = true;
}
Modified: branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java
===================================================================
--- branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/clients/http/bookmark/BookmarkManager.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -10,6 +10,9 @@
import java.net.MalformedURLException;
import java.util.Date;
import java.util.HashMap;
+
+import com.db4o.ObjectContainer;
+
import freenet.client.async.USKCallback;
import freenet.keys.FreenetURI;
import freenet.keys.USK;
@@ -91,7 +94,7 @@
private class USKUpdatedCallback implements USKCallback {
- public void onFoundEdition(long edition, USK key) {
+ public void onFoundEdition(long edition, USK key, ObjectContainer container) {
BookmarkItems items = MAIN_CATEGORY.getAllItems();
for(int i = 0; i < items.size(); i++) {
if(!"USK".equals(items.get(i).getKeyType()))
Modified: branches/db4o/freenet/src/freenet/node/BaseSendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/BaseSendableGet.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/BaseSendableGet.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -1,12 +1,14 @@
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.keys.Key;
public abstract class BaseSendableGet extends SendableRequest {
/** Get a numbered key to fetch. */
- public abstract Key getNodeKey(Object token);
+ public abstract Key getNodeKey(Object token, ObjectContainer container);
- public abstract boolean hasValidKeys(KeysFetchingLocally fetching);
+ public abstract boolean hasValidKeys(KeysFetchingLocally fetching, ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -113,8 +113,14 @@
* - Only one weak-reference cache for the database.
* - No need to refresh live objects.
* - Deactivation is simpler.
+ * Note that the priorities are thread priorities, not request priorities.
*/
public final PrioritizedSerialExecutor clientDatabaseExecutor;
+ /**
+ * Whenever a new request is added, we have to check the datastore. We funnel all such access
+ * through this thread. Note that the priorities are request priorities, not thread priorities.
+ */
+ public final PrioritizedSerialExecutor datastoreCheckerExecutor;
public static int maxBackgroundUSKFetchers;
@@ -141,6 +147,7 @@
clientSlowSerialExecutor[i] = new SerialExecutor(prio);
}
clientDatabaseExecutor = new PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY, NativeThread.MAX_PRIORITY+1, NativeThread.NORM_PRIORITY);
+ datastoreCheckerExecutor = new PrioritizedSerialExecutor(NativeThread.NORM_PRIORITY, RequestStarter.NUMBER_OF_PRIORITY_CLASSES, 0);
byte[] pwdBuf = new byte[16];
random.nextBytes(pwdBuf);
this.formPassword = Base64.encode(pwdBuf);
Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -58,4 +58,8 @@
public void callFailure(final SendableGet get, final LowLevelGetException e, final Object keyNum, int prio, String name);
+ public void callFailure(final SendableInsert put, final LowLevelPutException e, final Object keyNum, int prio, String name);
+
+ public void callSuccess(final SendableInsert put, final Object keyNum, int prio, String name);
+
}
Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.client.FetchContext;
import freenet.client.async.ClientRequestScheduler;
import freenet.client.async.ClientRequester;
@@ -25,10 +27,10 @@
public final ClientRequester parent;
/** Get a numbered key to fetch. */
- public abstract ClientKey getKey(Object token);
+ public abstract ClientKey getKey(Object token, ObjectContainer container);
- public Key getNodeKey(Object token) {
- ClientKey key = getKey(token);
+ public Key getNodeKey(Object token, ObjectContainer container) {
+ ClientKey key = getKey(token, container);
if(key == null) return null;
return key.getNodeKey();
}
@@ -37,10 +39,10 @@
public abstract FetchContext getContext();
/** Called when/if the low-level request succeeds. */
- public abstract void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched);
+ public abstract void onSuccess(ClientKeyBlock block, boolean fromStore, Object token, RequestScheduler sched, ObjectContainer container);
/** Called when/if the low-level request fails. */
- public abstract void onFailure(LowLevelGetException e, Object token, RequestScheduler sched);
+ public abstract void onFailure(LowLevelGetException e, Object token, RequestScheduler sched, ObjectContainer container);
/** Should the request ignore the datastore? */
public abstract boolean ignoreStore();
@@ -58,7 +60,7 @@
* @return True if a request was executed. False if caller should try to find another request, and remove
* this one from the queue. */
public boolean send(NodeClientCore core, final RequestScheduler sched, final Object keyNum) {
- ClientKey key = getKey(keyNum);
+ ClientKey key = getKey(keyNum, container);
if(key == null) {
Logger.error(this, "Key is null in send(): keyNum = "+keyNum+" for "+this);
return false;
@@ -66,11 +68,6 @@
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Sending get for key "+keyNum+" : "+key);
FetchContext ctx = getContext();
- long now = System.currentTimeMillis();
- if(getCooldownWakeupByKey(key.getNodeKey()) > now) {
- Logger.error(this, "Key is still on the cooldown queue in send() for "+this+" - key = "+key, new Exception("error"));
- return false;
- }
boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(isCancelled()) {
if(logMINOR) Logger.minor(this, "Cancelled: "+this);
@@ -99,7 +96,7 @@
return true;
}
- public void schedule() {
+ public void schedule(ObjectContainer container) {
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Scheduling "+this);
getScheduler().register(this);
@@ -118,7 +115,7 @@
* @param block
* @param sched
*/
- public abstract void onGotKey(Key key, KeyBlock block, RequestScheduler sched);
+ public abstract void onGotKey(Key key, KeyBlock block, RequestScheduler sched, ObjectContainer container);
/**
* Get the time at which the key specified by the given token will wake up from the
@@ -126,12 +123,12 @@
* @param token
* @return
*/
- public abstract long getCooldownWakeup(Object token);
+ public abstract long getCooldownWakeup(Object token, ObjectContainer container);
- public abstract long getCooldownWakeupByKey(Key key);
+ public abstract long getCooldownWakeupByKey(Key key, ObjectContainer container);
/** Reset the cooldown times when the request is reregistered. */
- public abstract void resetCooldownTimes();
+ public abstract void resetCooldownTimes(ObjectContainer container);
public final void unregister(boolean staySubscribed) {
if(!staySubscribed)
@@ -143,7 +140,7 @@
getScheduler().removePendingKey(this, false, key);
}
- public void internalError(final Object keyNum, final Throwable t, final RequestScheduler sched) {
+ public void internalError(final Object keyNum, final Throwable t, final RequestScheduler sched, ObjectContainer container) {
sched.callFailure(this, new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR, t.getMessage(), t), keyNum, NativeThread.MAX_PRIORITY, "Internal error");
}
@@ -152,6 +149,6 @@
* Only requeue if our requeue time is less than or equal to the given time.
* @param key
*/
- public abstract void requeueAfterCooldown(Key key, long time);
+ public abstract void requeueAfterCooldown(Key key, long time, ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/node/SendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableInsert.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SendableInsert.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import com.db4o.ObjectContainer;
+
/**
* Callback interface for a low level insert, which is immediately sendable. These
* should be registered on the ClientRequestScheduler when we want to send them. It will
@@ -12,13 +14,13 @@
public abstract class SendableInsert extends SendableRequest {
/** Called when we successfully insert the data */
- public abstract void onSuccess(Object keyNum);
+ public abstract void onSuccess(Object keyNum, ObjectContainer container);
/** Called when we don't! */
- public abstract void onFailure(LowLevelPutException e, Object keyNum);
+ public abstract void onFailure(LowLevelPutException e, Object keyNum, ObjectContainer container);
- public void internalError(Object keyNum, Throwable t, RequestScheduler sched) {
- onFailure(new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), keyNum);
+ public void internalError(Object keyNum, Throwable t, RequestScheduler sched, ObjectContainer container) {
+ onFailure(new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR, t.getMessage(), t), keyNum, container);
}
}
Modified: branches/db4o/freenet/src/freenet/node/SendableRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SendableRequest.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -1,5 +1,7 @@
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.client.async.ClientRequester;
import freenet.support.Logger;
import freenet.support.RandomGrabArray;
@@ -25,15 +27,15 @@
* (but not the key itself, implementors must have a separate queue of block
* numbers and mapping of block numbers to keys).
* @return An object identifying a specific key. null indicates no keys available. */
- public abstract Object chooseKey(KeysFetchingLocally keys);
+ public abstract Object chooseKey(KeysFetchingLocally keys, ObjectContainer container);
/** All key identifiers. Including those not currently eligible to be sent because
* they are on a cooldown queue, requests for them are in progress, etc. */
- public abstract Object[] allKeys();
+ public abstract Object[] allKeys(ObjectContainer container);
/** All key identifiers currently eligible to be sent. Does not include those
* currently running, on the cooldown queue etc. */
- public abstract Object[] sendableKeys();
+ public abstract Object[] sendableKeys(ObjectContainer container);
/** ONLY called by RequestStarter. Start the actual request using the NodeClientCore
* provided, and the key and key number earlier got from chooseKey().
@@ -85,6 +87,6 @@
}
/** Requeue after an internal error */
- public abstract void internalError(Object keyNum, Throwable t, RequestScheduler sched);
+ public abstract void internalError(Object keyNum, Throwable t, RequestScheduler sched, ObjectContainer container);
}
Modified: branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java 2008-06-02 15:53:31 UTC (rev 20188)
+++ branches/db4o/freenet/src/freenet/node/SimpleSendableInsert.java 2008-06-03 20:49:20 UTC (rev 20189)
@@ -3,6 +3,8 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.node;
+import com.db4o.ObjectContainer;
+
import freenet.client.async.ClientRequestScheduler;
import freenet.client.async.ClientRequester;
import freenet.keys.CHKBlock;
@@ -42,13 +44,13 @@
this.scheduler = scheduler;
}
- public void onSuccess(Object keyNum) {
+ public void onSuccess(Object keyNum, ObjectContainer container) {
// Yay!
if(Logger.shouldLog(Logger.MINOR, this))
Logger.minor(this, "Finished insert of "+block);
}
- public void onFailure(LowLevelPutException e, Object keyNum) {
+ public void onFailure(LowLevel