[freenet-cvs] r13166 - trunk/freenet/src/freenet/store
juiceman at freenetproject.org
juiceman at freenetproject.org
Mon May 7 19:55:36 UTC 2007
Author: juiceman
Date: 2007-05-07 19:55:36 +0000 (Mon, 07 May 2007)
New Revision: 13166
Modified:
trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
Log:
Spaces -> tabs
Modified: trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java
===================================================================
--- trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2007-05-07 19:44:38 UTC (rev 13165)
+++ trunk/freenet/src/freenet/store/BerkeleyDBFreenetStore.java 2007-05-07 19:55:36 UTC (rev 13166)
@@ -46,21 +46,21 @@
import freenet.support.Logger;
import freenet.support.SortedLongSet;
-/**
- * Freenet datastore based on BerkelyDB Java Edition by sleepycat software
- * More info at http://www.sleepycat.com/products/bdbje.html
- *
- * @author tubbie
- *
- * TODO: Fix ugly Exception handling
- *
- */
+/**
+* Freenet datastore based on BerkelyDB Java Edition by sleepycat software
+* More info at http://www.sleepycat.com/products/bdbje.html
+*
+* @author tubbie
+*
+* TODO: Fix ugly Exception handling
+*
+*/
public class BerkeleyDBFreenetStore implements FreenetStore {
private static boolean logMINOR;
- final int dataBlockSize;
- final int headerBlockSize;
+ final int dataBlockSize;
+ final int headerBlockSize;
private final Environment environment;
private final TupleBinding storeBlockTupleBinding;
@@ -85,25 +85,25 @@
private boolean reallyClosed;
private final static byte[] dummy = new byte[0];
- public static BerkeleyDBFreenetStore construct(int lastVersion, File baseStoreDir, boolean isStore,
+ public static BerkeleyDBFreenetStore construct(int lastVersion, File baseStoreDir, boolean isStore,
String suffix, long maxStoreKeys, int blockSize, int headerSize, boolean throwOnTooFewKeys, short type, Environment storeEnvironment, RandomSource random, SemiOrderedShutdownHook storeShutdownHook) throws DatabaseException, IOException {
/**
- * Migration strategy:
- *
- * If nothing exists, create a new database in the storeEnvironment and store files of new names.
- * Else
- * If the old store directories exist:
- * If the old store file does not exist, delete the old store directories, and create a new database in the storeEnvironment and store files of new names.
- * Try to load the old database.
- * If successful
- * Migrate to the new database.
- * Move the files.
- * If not successful
- * Reconstruct the new database from the old file.
- * Move the old file to the new location.
- *
- */
+ * Migration strategy:
+ *
+ * If nothing exists, create a new database in the storeEnvironment and store files of new names.
+ * Else
+ * If the old store directories exist:
+ * If the old store file does not exist, delete the old store directories, and create a new database in the storeEnvironment and store files of new names.
+ * Try to load the old database.
+ * If successful
+ * Migrate to the new database.
+ * Move the files.
+ * If not successful
+ * Reconstruct the new database from the old file.
+ * Move the old file to the new location.
+ *
+ */
// Location of old directory.
String oldDirName = oldTypeName(type) + (isStore ? "store" : "cache") + suffix;
@@ -251,9 +251,9 @@
System.err.println("Migrating data from old Environment to new Environment");
/** Reads from old database */
- Cursor c = null;
- /** Writes to new store */
- Transaction t = null;
+ Cursor c = null;
+ /** Writes to new store */
+ Transaction t = null;
try {
// Read from old database
t = newStore.environment.beginTransaction(null, null);
@@ -328,12 +328,12 @@
}
- private static BerkeleyDBFreenetStore openStore(Environment storeEnvironment, String newDBPrefix, File newStoreFile,
- File newFixSecondaryFile, long maxStoreKeys, int blockSize, int headerSize, boolean throwOnTooFewKeys,
+ private static BerkeleyDBFreenetStore openStore(Environment storeEnvironment, String newDBPrefix, File newStoreFile,
+ File newFixSecondaryFile, long maxStoreKeys, int blockSize, int headerSize, boolean throwOnTooFewKeys,
boolean noCheck, int lastVersion, short type, boolean wipe, SemiOrderedShutdownHook storeShutdownHook) throws DatabaseException, IOException {
try {
- return new BerkeleyDBFreenetStore(storeEnvironment, newDBPrefix, newStoreFile, newFixSecondaryFile,
+ return new BerkeleyDBFreenetStore(storeEnvironment, newDBPrefix, newStoreFile, newFixSecondaryFile,
maxStoreKeys, blockSize, headerSize, throwOnTooFewKeys, noCheck, wipe, storeShutdownHook);
} catch (DatabaseException e) {
@@ -420,14 +420,14 @@
}
/**
- * Initializes database
- * @param noCheck If true, don't check for holes etc.
- * @param wipe If true, wipe the database first.
- * @param the directory where the store is located
- * @throws IOException
- * @throws DatabaseException
- * @throws FileNotFoundException if the dir does not exist and could not be created
- */
+ * Initializes database
+ * @param noCheck If true, don't check for holes etc.
+ * @param wipe If true, wipe the database first.
+ * @param the directory where the store is located
+ * @throws IOException
+ * @throws DatabaseException
+ * @throws FileNotFoundException if the dir does not exist and could not be created
+ */
public BerkeleyDBFreenetStore(Environment env, String prefix, File storeFile, File fixSecondaryFile, long maxChkBlocks, int blockSize, int headerSize, boolean throwOnTooFewKeys, boolean noCheck, boolean wipe, SemiOrderedShutdownHook storeShutdownHook) throws IOException, DatabaseException {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
this.dataBlockSize = blockSize;
@@ -458,7 +458,7 @@
Logger.error(this, "This may take some time...");
System.err.println("Recreating secondary databases");
System.err.println("This may take some time...");
- WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000+chkDB.count()*100)));
+ WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000+chkDB.count()*100)));
// Of course it's not a solution but a quick fix
// Integer.MAX_VALUE seems to trigger an overflow or whatever ...
// Either we find out what the maximum value is and we do a static method somewhere ensuring
@@ -489,7 +489,7 @@
secDbConfig.setTransactional(true);
secDbConfig.setAllowPopulate(false);
storeBlockTupleBinding = new StoreBlockTupleBinding();
- AccessTimeKeyCreator accessTimeKeyCreator =
+ AccessTimeKeyCreator accessTimeKeyCreator =
new AccessTimeKeyCreator(storeBlockTupleBinding);
secDbConfig.setKeyCreator(accessTimeKeyCreator);
try {
@@ -505,7 +505,7 @@
// throw new DatabaseException("Needs repopulation");
// }
} catch (DatabaseException e) {
- WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000+chkDB.count()*100)));
+ WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000+chkDB.count()*100)));
// Of course it's not a solution but a quick fix
// Integer.MAX_VALUE seems to trigger an overflow or whatever ...
// Either we find out what the maximum value is and we do a static method somewhere ensuring
@@ -545,7 +545,7 @@
blockNoDbConfig.setAllowPopulate(false);
blockNoDbConfig.setTransactional(true);
- BlockNumberKeyCreator bnkc =
+ BlockNumberKeyCreator bnkc =
new BlockNumberKeyCreator(storeBlockTupleBinding);
blockNoDbConfig.setKeyCreator(bnkc);
SecondaryDatabase blockNums = null;
@@ -562,7 +562,7 @@
// throw new DatabaseException("Needs repopulation");
// }
} catch (DatabaseException e) {
- WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000+chkDB.count()*100)));
+ WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000+chkDB.count()*100)));
// Of course it's not a solution but a quick fix
// Integer.MAX_VALUE seems to trigger an overflow or whatever ...
// Either we find out what the maximum value is and we do a static method somewhere ensuring
@@ -674,7 +674,7 @@
DatabaseEntry found = new DatabaseEntry();
LongBinding.longToEntry(i, blockNumEntry);
- OperationStatus success =
+ OperationStatus success =
chkDB_blockNum.get(null, blockNumEntry, found, LockMode.DEFAULT);
if(success.equals(OperationStatus.NOTFOUND)) {
@@ -709,28 +709,28 @@
private boolean shrinking = false;
/**
- * Do an offline shrink, if necessary. Will not return until completed.
- * @param dontCheckForHoles If true, don't check for holes.
- * @throws DatabaseException
- * @throws IOException
- */
+ * Do an offline shrink, if necessary. Will not return until completed.
+ * @param dontCheckForHoles If true, don't check for holes.
+ * @throws DatabaseException
+ * @throws IOException
+ */
private void maybeOfflineShrink(boolean dontCheckForHoles) throws DatabaseException, IOException {
if(chkBlocksInStore <= maxChkBlocks) return;
maybeSlowShrink(dontCheckForHoles, true);
}
/**
- * Do an online shrink, if necessary. Non-blocking i.e. it will do the shrink on another thread.
- * @param forceBigOnlineShrinks If true, force the node to shrink the store immediately even if
- * it is a major (more than 10%) shrink. Normally this is not allowed because online shrinks do not
- * preserve the most recently used data; the best thing to do is to restart the node and let it do
- * an offline shrink.
- * @throws DatabaseException If a database error occurs.
- * @throws IOException If an I/O error occurs.
- * @return True if the database will be shrunk in the background (or the database is already small
- * enough), false if it is not possible to shrink it because a large shrink was requested and we
- * don't want to do a large online shrink.
- */
+ * Do an online shrink, if necessary. Non-blocking i.e. it will do the shrink on another thread.
+ * @param forceBigOnlineShrinks If true, force the node to shrink the store immediately even if
+ * it is a major (more than 10%) shrink. Normally this is not allowed because online shrinks do not
+ * preserve the most recently used data; the best thing to do is to restart the node and let it do
+ * an offline shrink.
+ * @throws DatabaseException If a database error occurs.
+ * @throws IOException If an I/O error occurs.
+ * @return True if the database will be shrunk in the background (or the database is already small
+ * enough), false if it is not possible to shrink it because a large shrink was requested and we
+ * don't want to do a large online shrink.
+ */
private boolean maybeOnlineShrink(boolean forceBigOnlineShrinks) throws DatabaseException, IOException {
synchronized(this) {
if(chkBlocksInStore <= maxChkBlocks) return true;
@@ -764,24 +764,24 @@
Vector unwantedMove = new Vector(); // content is not wanted, but is in the part of the store we will keep
Vector alreadyDropped = new Vector(); // any blocks past the end which have already been truncated, but which there are still database blocks pointing to
- Cursor c = null;
- Transaction t = null;
+ Cursor c = null;
+ Transaction t = null;
- long newSize = maxChkBlocks;
- if(chkBlocksInStore < maxChkBlocks) return;
-
- System.err.println("Shrinking from "+chkBlocksInStore+" to "+maxChkBlocks+" (from db "+chkDB.count()+" from file "+countCHKBlocksFromFile()+ ')');
-
- if(!dontCheckForHoles)
- checkForHoles(maxChkBlocks, true);
-
- WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000 + chkBlocksInStore * 100))); // 10 per second
-
- long realSize = countCHKBlocksFromFile();
-
- long highestBlock = 0;
-
- try {
+ long newSize = maxChkBlocks;
+ if(chkBlocksInStore < maxChkBlocks) return;
+
+ System.err.println("Shrinking from "+chkBlocksInStore+" to "+maxChkBlocks+" (from db "+chkDB.count()+" from file "+countCHKBlocksFromFile()+ ')');
+
+ if(!dontCheckForHoles)
+ checkForHoles(maxChkBlocks, true);
+
+ WrapperManager.signalStarting((int)(Math.min(Integer.MAX_VALUE, 5*60*1000 + chkBlocksInStore * 100))); // 10 per second
+
+ long realSize = countCHKBlocksFromFile();
+
+ long highestBlock = 0;
+
+ try {
c = chkDB_accessTime.openCursor(null,null);
DatabaseEntry keyDBE = new DatabaseEntry();
@@ -799,7 +799,7 @@
//Logger.minor(this, "Found first key");
int x = 0;
while(true) {
- StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
+ StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
long block = storeBlock.offset;
if(block > highestBlock) highestBlock = block;
if(storeBlock.offset > Integer.MAX_VALUE) {
@@ -855,148 +855,148 @@
}
}
- } finally {
- if(c != null)
- c.close();
- }
-
- Integer[] wantedKeepNums = (Integer[]) wantedKeep.toArray(new Integer[wantedKeep.size()]);
- Integer[] unwantedIgnoreNums = (Integer[]) unwantedIgnore.toArray(new Integer[unwantedIgnore.size()]);
- Integer[] wantedMoveNums = (Integer[]) wantedMove.toArray(new Integer[wantedMove.size()]);
- Integer[] unwantedMoveNums = (Integer[]) unwantedMove.toArray(new Integer[unwantedMove.size()]);
- long[] freeEarlySlots = freeBlocks.toArray();
- Arrays.sort(wantedKeepNums);
- Arrays.sort(unwantedIgnoreNums);
- Arrays.sort(wantedMoveNums);
- Arrays.sort(unwantedMoveNums);
-
- for(int i=0;i<newSize;i++) {
- Integer ii = new Integer(i);
- if(Arrays.binarySearch(wantedKeepNums, ii) >= 0) continue;
- if(Arrays.binarySearch(unwantedIgnoreNums, ii) >= 0) continue;
- if(Arrays.binarySearch(wantedMoveNums, ii) >= 0) continue;
- if(Arrays.binarySearch(unwantedMoveNums, ii) >= 0) continue;
- unwantedMove.add(ii);
- }
- unwantedMoveNums = (Integer[]) unwantedMove.toArray(new Integer[unwantedMove.size()]);
-
- System.err.println("Keys to keep where they are: "+wantedKeepNums.length);
- System.err.println("Keys which will be wiped anyway: "+unwantedIgnoreNums.length);
- System.err.println("Keys to move: "+wantedMoveNums.length);
- System.err.println("Keys to be moved over: "+unwantedMoveNums.length);
- System.err.println("Free slots to be moved over: "+freeEarlySlots.length);
-
- // Now move all the wantedMove blocks onto the corresponding unwantedMove's.
-
- WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE, (5*60*1000 + wantedMoveNums.length*1000L + alreadyDropped.size() * 100L))); // 1 per second
-
- byte[] buf = new byte[headerBlockSize + dataBlockSize];
- t = null;
- try {
- t = environment.beginTransaction(null,null);
- if(alreadyDropped.size() > 0) {
- System.err.println("Deleting "+alreadyDropped.size()+" blocks beyond the length of the file");
- for(int i=0;i<alreadyDropped.size();i++) {
- int unwantedBlock = ((Integer) alreadyDropped.get(i)).intValue();
- DatabaseEntry unwantedBlockEntry = new DatabaseEntry();
- LongBinding.longToEntry(unwantedBlock, unwantedBlockEntry);
- chkDB_blockNum.delete(t, unwantedBlockEntry);
- if(i % 1024 == 0) {
- t.commit();
- t = environment.beginTransaction(null,null);
- }
- }
- if(alreadyDropped.size() % 1024 != 0) {
- t.commit();
- t = environment.beginTransaction(null,null);
- }
- }
- for(int i=0;i<wantedMoveNums.length;i++) {
- Integer wantedBlock = wantedMoveNums[i];
-
- Integer unwantedBlock;
-
- // Can we move over an empty slot?
- if(i < freeEarlySlots.length) {
- // Don't need to delete old block
- unwantedBlock = new Integer((int) freeEarlySlots[i]); // will fit in an int
- } else if(unwantedMoveNums.length + freeEarlySlots.length > i) {
- unwantedBlock = unwantedMoveNums[i-freeEarlySlots.length];
- // Delete unwantedBlock from the store
- DatabaseEntry unwantedBlockEntry = new DatabaseEntry();
- LongBinding.longToEntry(unwantedBlock.longValue(), unwantedBlockEntry);
- // Delete the old block from the database.
- chkDB_blockNum.delete(t, unwantedBlockEntry);
- } else {
- System.err.println("Keys to move but no keys to move over! Moved "+i);
- t.commit();
- t = null;
- return;
- }
- // Move old data to new location
-
- DatabaseEntry wantedBlockEntry = new DatabaseEntry();
- LongBinding.longToEntry(wantedBlock.longValue(), wantedBlockEntry);
- long seekTo = wantedBlock.longValue() * (headerBlockSize + dataBlockSize);
- try {
- chkStore.seek(seekTo);
- chkStore.readFully(buf);
- } catch (EOFException e) {
- System.err.println("Was reading "+wantedBlock+" to write to "+unwantedBlock);
- System.err.println(e);
- e.printStackTrace();
- throw e;
- }
- seekTo = unwantedBlock.longValue() * (headerBlockSize + dataBlockSize);
- chkStore.seek(seekTo);
- chkStore.write(buf);
-
- // Update the database w.r.t. the old block.
-
- DatabaseEntry routingKeyDBE = new DatabaseEntry();
- DatabaseEntry blockDBE = new DatabaseEntry();
- chkDB_blockNum.get(t, wantedBlockEntry, routingKeyDBE, blockDBE, LockMode.RMW);
- StoreBlock block = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
- block.offset = unwantedBlock.longValue();
- storeBlockTupleBinding.objectToEntry(block, blockDBE);
- chkDB.put(t, routingKeyDBE, blockDBE);
-
- // Think about committing the transaction.
-
- if((i+1) % 2048 == 0) {
- t.commit();
- t = environment.beginTransaction(null,null);
- System.out.println("Moving blocks: "+(i*100/wantedMove.size())+ "% ( "+i+ '/' +wantedMove.size()+ ')');
- }
- //System.err.println("Moved "+wantedBlock+" to "+unwantedBlock);
- }
- System.out.println("Moved all "+wantedMove.size()+" blocks");
- if(t != null) {
- t.commit();
- t = null;
- }
- } finally {
- if(t != null)
- t.abort();
- t = null;
- }
- System.out.println("Completing shrink"); // FIXME remove
-
- int totalUnwantedBlocks = unwantedMoveNums.length+freeEarlySlots.length;
- WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE, 5*60*1000 + (totalUnwantedBlocks-wantedMoveNums.length) * 100));
- // If there are any slots left over, they must be free.
- freeBlocks.clear();
+ } finally {
+ if(c != null)
+ c.close();
+ }
+
+ Integer[] wantedKeepNums = (Integer[]) wantedKeep.toArray(new Integer[wantedKeep.size()]);
+ Integer[] unwantedIgnoreNums = (Integer[]) unwantedIgnore.toArray(new Integer[unwantedIgnore.size()]);
+ Integer[] wantedMoveNums = (Integer[]) wantedMove.toArray(new Integer[wantedMove.size()]);
+ Integer[] unwantedMoveNums = (Integer[]) unwantedMove.toArray(new Integer[unwantedMove.size()]);
+ long[] freeEarlySlots = freeBlocks.toArray();
+ Arrays.sort(wantedKeepNums);
+ Arrays.sort(unwantedIgnoreNums);
+ Arrays.sort(wantedMoveNums);
+ Arrays.sort(unwantedMoveNums);
+
+ for(int i=0;i<newSize;i++) {
+ Integer ii = new Integer(i);
+ if(Arrays.binarySearch(wantedKeepNums, ii) >= 0) continue;
+ if(Arrays.binarySearch(unwantedIgnoreNums, ii) >= 0) continue;
+ if(Arrays.binarySearch(wantedMoveNums, ii) >= 0) continue;
+ if(Arrays.binarySearch(unwantedMoveNums, ii) >= 0) continue;
+ unwantedMove.add(ii);
+ }
+ unwantedMoveNums = (Integer[]) unwantedMove.toArray(new Integer[unwantedMove.size()]);
+
+ System.err.println("Keys to keep where they are: "+wantedKeepNums.length);
+ System.err.println("Keys which will be wiped anyway: "+unwantedIgnoreNums.length);
+ System.err.println("Keys to move: "+wantedMoveNums.length);
+ System.err.println("Keys to be moved over: "+unwantedMoveNums.length);
+ System.err.println("Free slots to be moved over: "+freeEarlySlots.length);
+
+ // Now move all the wantedMove blocks onto the corresponding unwantedMove's.
+
+ WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE, (5*60*1000 + wantedMoveNums.length*1000L + alreadyDropped.size() * 100L))); // 1 per second
+
+ byte[] buf = new byte[headerBlockSize + dataBlockSize];
+ t = null;
+ try {
+ t = environment.beginTransaction(null,null);
+ if(alreadyDropped.size() > 0) {
+ System.err.println("Deleting "+alreadyDropped.size()+" blocks beyond the length of the file");
+ for(int i=0;i<alreadyDropped.size();i++) {
+ int unwantedBlock = ((Integer) alreadyDropped.get(i)).intValue();
+ DatabaseEntry unwantedBlockEntry = new DatabaseEntry();
+ LongBinding.longToEntry(unwantedBlock, unwantedBlockEntry);
+ chkDB_blockNum.delete(t, unwantedBlockEntry);
+ if(i % 1024 == 0) {
+ t.commit();
+ t = environment.beginTransaction(null,null);
+ }
+ }
+ if(alreadyDropped.size() % 1024 != 0) {
+ t.commit();
+ t = environment.beginTransaction(null,null);
+ }
+ }
+ for(int i=0;i<wantedMoveNums.length;i++) {
+ Integer wantedBlock = wantedMoveNums[i];
+
+ Integer unwantedBlock;
+
+ // Can we move over an empty slot?
+ if(i < freeEarlySlots.length) {
+ // Don't need to delete old block
+ unwantedBlock = new Integer((int) freeEarlySlots[i]); // will fit in an int
+ } else if(unwantedMoveNums.length + freeEarlySlots.length > i) {
+ unwantedBlock = unwantedMoveNums[i-freeEarlySlots.length];
+ // Delete unwantedBlock from the store
+ DatabaseEntry unwantedBlockEntry = new DatabaseEntry();
+ LongBinding.longToEntry(unwantedBlock.longValue(), unwantedBlockEntry);
+ // Delete the old block from the database.
+ chkDB_blockNum.delete(t, unwantedBlockEntry);
+ } else {
+ System.err.println("Keys to move but no keys to move over! Moved "+i);
+ t.commit();
+ t = null;
+ return;
+ }
+ // Move old data to new location
+
+ DatabaseEntry wantedBlockEntry = new DatabaseEntry();
+ LongBinding.longToEntry(wantedBlock.longValue(), wantedBlockEntry);
+ long seekTo = wantedBlock.longValue() * (headerBlockSize + dataBlockSize);
+ try {
+ chkStore.seek(seekTo);
+ chkStore.readFully(buf);
+ } catch (EOFException e) {
+ System.err.println("Was reading "+wantedBlock+" to write to "+unwantedBlock);
+ System.err.println(e);
+ e.printStackTrace();
+ throw e;
+ }
+ seekTo = unwantedBlock.longValue() * (headerBlockSize + dataBlockSize);
+ chkStore.seek(seekTo);
+ chkStore.write(buf);
+
+ // Update the database w.r.t. the old block.
+
+ DatabaseEntry routingKeyDBE = new DatabaseEntry();
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ chkDB_blockNum.get(t, wantedBlockEntry, routingKeyDBE, blockDBE, LockMode.RMW);
+ StoreBlock block = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
+ block.offset = unwantedBlock.longValue();
+ storeBlockTupleBinding.objectToEntry(block, blockDBE);
+ chkDB.put(t, routingKeyDBE, blockDBE);
+
+ // Think about committing the transaction.
+
+ if((i+1) % 2048 == 0) {
+ t.commit();
+ t = environment.beginTransaction(null,null);
+ System.out.println("Moving blocks: "+(i*100/wantedMove.size())+ "% ( "+i+ '/' +wantedMove.size()+ ')');
+ }
+ //System.err.println("Moved "+wantedBlock+" to "+unwantedBlock);
+ }
+ System.out.println("Moved all "+wantedMove.size()+" blocks");
+ if(t != null) {
+ t.commit();
+ t = null;
+ }
+ } finally {
+ if(t != null)
+ t.abort();
+ t = null;
+ }
+ System.out.println("Completing shrink"); // FIXME remove
+
+ int totalUnwantedBlocks = unwantedMoveNums.length+freeEarlySlots.length;
+ WrapperManager.signalStarting((int)Math.min(Integer.MAX_VALUE, 5*60*1000 + (totalUnwantedBlocks-wantedMoveNums.length) * 100));
+ // If there are any slots left over, they must be free.
+ freeBlocks.clear();
t = environment.beginTransaction(null,null);
- for(int i=wantedMoveNums.length;i<totalUnwantedBlocks;i++) {
- long blockNo;
- String reason;
- if(i < freeEarlySlots.length) {
- blockNo = freeEarlySlots[i];
- reason = "early slot "+i;
- } else {
- blockNo = unwantedMoveNums[i-freeEarlySlots.length].longValue();
- reason = "unwanted "+(i-freeEarlySlots.length);
- }
+ for(int i=wantedMoveNums.length;i<totalUnwantedBlocks;i++) {
+ long blockNo;
+ String reason;
+ if(i < freeEarlySlots.length) {
+ blockNo = freeEarlySlots[i];
+ reason = "early slot "+i;
+ } else {
+ blockNo = unwantedMoveNums[i-freeEarlySlots.length].longValue();
+ reason = "unwanted "+(i-freeEarlySlots.length);
+ }
DatabaseEntry unwantedBlockEntry = new DatabaseEntry();
LongBinding.longToEntry(blockNo, unwantedBlockEntry);
chkDB_blockNum.delete(t, unwantedBlockEntry);
@@ -1009,26 +1009,26 @@
t = environment.beginTransaction(null,null);
}
addFreeBlock(blockNo, true, reason);
- }
- if(t != null) t.commit();
+ }
+ if(t != null) t.commit();
t = null;
-
- System.out.println("Finishing shrink"); // FIXME remove
-
- chkStore.setLength(newSize * (dataBlockSize + headerBlockSize));
-
- synchronized(this) {
- chkBlocksInStore = newSize;
- }
- System.err.println("Shrunk store, now have "+chkBlocksInStore+" of "+maxChkBlocks);
+
+ System.out.println("Finishing shrink"); // FIXME remove
+
+ chkStore.setLength(newSize * (dataBlockSize + headerBlockSize));
+
+ synchronized(this) {
+ chkBlocksInStore = newSize;
+ }
+ System.err.println("Shrunk store, now have "+chkBlocksInStore+" of "+maxChkBlocks);
}
/**
- * Shrink the store, on the fly/quickly.
- * @param offline If false, keep going until the store has shrunk enough.
- * @throws DatabaseException
- * @throws IOException
- */
+ * Shrink the store, on the fly/quickly.
+ * @param offline If false, keep going until the store has shrunk enough.
+ * @throws DatabaseException
+ * @throws IOException
+ */
private void maybeQuickShrink(boolean offline) throws DatabaseException, IOException {
// long's are not atomic.
long maxBlocks;
@@ -1045,15 +1045,15 @@
}
/**
- * @param curBlocks The current number of blocks in the file. (From the file length).
- * @param maxBlocks The target number of blocks in the file. (The file will be truncated to this length in blocks).
- * @param offline If true, innerQuickShrink will run once. If false, after the first run, if
- * the store is still over its required size, it will shrink it again, and so on until the store
- * is within its required size.
- * If false, innerQuickShrink will repeat itself until it deletes no more blocks, This is to handle
- * @throws DatabaseException If a database error occurs.
- * @throws IOException If an I/O error occurs.
- */
+ * @param curBlocks The current number of blocks in the file. (From the file length).
+ * @param maxBlocks The target number of blocks in the file. (The file will be truncated to this length in blocks).
+ * @param offline If true, innerQuickShrink will run once. If false, after the first run, if
+ * the store is still over its required size, it will shrink it again, and so on until the store
+ * is within its required size.
+ * If false, innerQuickShrink will repeat itself until it deletes no more blocks, This is to handle
+ * @throws DatabaseException If a database error occurs.
+ * @throws IOException If an I/O error occurs.
+ */
private void innerQuickShrink(long curBlocks, long maxBlocks, boolean offline) throws DatabaseException, IOException {
long oldCurBlocks = curBlocks;
try {
@@ -1079,14 +1079,14 @@
DatabaseEntry blockNumEntry = new DatabaseEntry();
LongBinding.longToEntry(i, blockNumEntry);
- OperationStatus result =
+ OperationStatus result =
chkDB_blockNum.delete(t, blockNumEntry);
if(result.equals(OperationStatus.SUCCESS))
deleted++;
if((curBlocks-i) % 2048 == 0) {
t.commit();
- t = null;
+ t = null;
}
freeBlocks.remove(i);
@@ -1130,12 +1130,12 @@
public static final short TYPE_SSK = 2;
/**
- * Recreate the index from the data file. Call this when the index has been corrupted.
- * @param the directory where the store is located
- * @throws DatabaseException If the store cannot be opened because of a database problem.
- * @throws IOException If the store cannot be opened because of a filesystem problem.
- * @throws FileNotFoundException if the dir does not exist and could not be created
- */
+ * Recreate the index from the data file. Call this when the index has been corrupted.
+ * @param the directory where the store is located
+ * @throws DatabaseException If the store cannot be opened because of a database problem.
+ * @throws IOException If the store cannot be opened because of a filesystem problem.
+ * @throws FileNotFoundException if the dir does not exist and could not be created
+ */
public BerkeleyDBFreenetStore(Environment env, String prefix, File storeFile, File fixSecondaryFile, long maxChkBlocks, int blockSize, int headerSize, short type, boolean noCheck, SemiOrderedShutdownHook storeShutdownHook) throws DatabaseException, IOException {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
this.dataBlockSize = blockSize;
@@ -1169,7 +1169,7 @@
secDbConfig.setTransactional(true);
secDbConfig.setAllowPopulate(true);
storeBlockTupleBinding = new StoreBlockTupleBinding();
- AccessTimeKeyCreator accessTimeKeyCreator =
+ AccessTimeKeyCreator accessTimeKeyCreator =
new AccessTimeKeyCreator(storeBlockTupleBinding);
secDbConfig.setKeyCreator(accessTimeKeyCreator);
chkDB_accessTime = environment.openSecondaryDatabase
@@ -1182,7 +1182,7 @@
blockNoDbConfig.setAllowPopulate(true);
blockNoDbConfig.setTransactional(true);
- BlockNumberKeyCreator bnkc =
+ BlockNumberKeyCreator bnkc =
new BlockNumberKeyCreator(storeBlockTupleBinding);
blockNoDbConfig.setKeyCreator(bnkc);
System.err.println("Creating block db index");
@@ -1314,388 +1314,388 @@
}
/**
- * Retrieve a block.
- * @param dontPromote If true, don't promote data if fetched.
- * @return null if there is no such block stored, otherwise the block.
- */
- public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException {
- synchronized(this) {
- if(closed)
- return null;
- }
-
- byte[] routingkey = chk.getRoutingKey();
- DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
- DatabaseEntry blockDBE = new DatabaseEntry();
- Cursor c = null;
- Transaction t = null;
- try{
- t = environment.beginTransaction(null,null);
- c = chkDB.openCursor(t,null);
+ * Retrieve a block.
+ * @param dontPromote If true, don't promote data if fetched.
+ * @return null if there is no such block stored, otherwise the block.
+ */
+ public CHKBlock fetch(NodeCHK chk, boolean dontPromote) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return null;
+ }
+
+ byte[] routingkey = chk.getRoutingKey();
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ Cursor c = null;
+ Transaction t = null;
+ try{
+ t = environment.beginTransaction(null,null);
+ c = chkDB.openCursor(t,null);
- if(logMINOR) Logger.minor(this, "Fetching "+chk+" dontPromote="+dontPromote);
- /**
- * We will have to write, unless both dontPromote and the key is valid.
- * The lock only applies to this record, so it's not a big problem for our use.
- * What *IS* a big problem is that if we take a LockMode.DEFAULT, and two threads
- * access the same key, they will both take the read lock, and then both try to
- * take the write lock. Neither can relinquish the read in order for the other to
- * take the write, so we're screwed.
- */
- if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
- !=OperationStatus.SUCCESS) {
- c.close();
- c = null;
- t.abort();
- t = null;
- synchronized(this) {
- misses++;
- }
- return null;
- }
+ if(logMINOR) Logger.minor(this, "Fetching "+chk+" dontPromote="+dontPromote);
+ /**
+ * We will have to write, unless both dontPromote and the key is valid.
+ * The lock only applies to this record, so it's not a big problem for our use.
+ * What *IS* a big problem is that if we take a LockMode.DEFAULT, and two threads
+ * access the same key, they will both take the read lock, and then both try to
+ * take the write lock. Neither can relinquish the read in order for the other to
+ * take the write, so we're screwed.
+ */
+ if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
+ !=OperationStatus.SUCCESS) {
+ c.close();
+ c = null;
+ t.abort();
+ t = null;
+ synchronized(this) {
+ misses++;
+ }
+ return null;
+ }
- StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
-
- CHKBlock block = null;
- try{
- byte[] header = new byte[headerBlockSize];
- byte[] data = new byte[dataBlockSize];
- try {
- synchronized(chkStore) {
- long seekTarget = storeBlock.offset*(long)(dataBlockSize+headerBlockSize);
- try {
- chkStore.seek(seekTarget);
- } catch (IOException ioe) {
- if(seekTarget > (2l*1024*1024*1024)) {
- Logger.error(this, "Environment does not support files bigger than 2 GB?");
- System.out.println("Environment does not support files bigger than 2 GB? (exception to follow)");
- }
- Logger.error(this, "Caught IOException on chkStore.seek("+seekTarget+ ')');
- throw ioe;
- }
- chkStore.readFully(header);
- chkStore.readFully(data);
- }
- } catch (EOFException e) {
- Logger.error(this, "No block");
- c.close();
- c = null;
- chkDB.delete(t, routingkeyDBE);
- t.commit();
- t = null;
- addFreeBlock(storeBlock.offset, true, "Data off end of store file");
- return null;
- }
-
-
- block = new CHKBlock(data,header,chk);
-
- if(!dontPromote)
- {
- storeBlock.updateRecentlyUsed();
- DatabaseEntry updateDBE = new DatabaseEntry();
- storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
- c.putCurrent(updateDBE);
- c.close();
- c = null;
- t.commit();
- t = null;
- }else{
- c.close();
- c = null;
- t.abort();
- t = null;
- }
-
- if(logMINOR) {
- Logger.minor(this, "Get key: " + chk);
- Logger.minor(this, "Headers: " + header.length+" bytes, hash " + Fields.hashCode(header));
- Logger.minor(this, "Data: " + data.length + " bytes, hash " + Fields.hashCode(data) + " fetching " + chk);
- }
-
- }catch(CHKVerifyException ex){
- Logger.error(this, "CHKBlock: Does not verify ("+ex+"), setting accessTime to 0 for : "+chk);
- System.err.println("Does not verify (CHK block "+storeBlock.offset+ ')');
- c.close();
- c = null;
- chkDB.delete(t, routingkeyDBE);
- t.commit();
- t = null;
- addFreeBlock(storeBlock.offset, true, "CHK does not verify");
- synchronized(this) {
- misses++;
- }
- return null;
- }
- synchronized(this) {
- hits++;
- }
- return block;
- }catch(Throwable ex) { // FIXME: ugly
- if(c!=null) {
- try{c.close();}catch(DatabaseException ex2){}
- }
- if(t!=null)
- try{t.abort();}catch(DatabaseException ex2){}
- Logger.error(this, "Caught "+ex, ex);
- ex.printStackTrace();
- checkSecondaryDatabaseError(ex);
- IOException e = new IOException(ex.getMessage());
- e.initCause(ex);
- throw e;
- }
-
+ StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
+
+ CHKBlock block = null;
+ try{
+ byte[] header = new byte[headerBlockSize];
+ byte[] data = new byte[dataBlockSize];
+ try {
+ synchronized(chkStore) {
+ long seekTarget = storeBlock.offset*(long)(dataBlockSize+headerBlockSize);
+ try {
+ chkStore.seek(seekTarget);
+ } catch (IOException ioe) {
+ if(seekTarget > (2l*1024*1024*1024)) {
+ Logger.error(this, "Environment does not support files bigger than 2 GB?");
+ System.out.println("Environment does not support files bigger than 2 GB? (exception to follow)");
+ }
+ Logger.error(this, "Caught IOException on chkStore.seek("+seekTarget+ ')');
+ throw ioe;
+ }
+ chkStore.readFully(header);
+ chkStore.readFully(data);
+ }
+ } catch (EOFException e) {
+ Logger.error(this, "No block");
+ c.close();
+ c = null;
+ chkDB.delete(t, routingkeyDBE);
+ t.commit();
+ t = null;
+ addFreeBlock(storeBlock.offset, true, "Data off end of store file");
+ return null;
+ }
+
+
+ block = new CHKBlock(data,header,chk);
+
+ if(!dontPromote)
+ {
+ storeBlock.updateRecentlyUsed();
+ DatabaseEntry updateDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
+ c.putCurrent(updateDBE);
+ c.close();
+ c = null;
+ t.commit();
+ t = null;
+ }else{
+ c.close();
+ c = null;
+ t.abort();
+ t = null;
+ }
+
+ if(logMINOR) {
+ Logger.minor(this, "Get key: " + chk);
+ Logger.minor(this, "Headers: " + header.length+" bytes, hash " + Fields.hashCode(header));
+ Logger.minor(this, "Data: " + data.length + " bytes, hash " + Fields.hashCode(data) + " fetching " + chk);
+ }
+
+ }catch(CHKVerifyException ex){
+ Logger.error(this, "CHKBlock: Does not verify ("+ex+"), setting accessTime to 0 for : "+chk);
+ System.err.println("Does not verify (CHK block "+storeBlock.offset+ ')');
+ c.close();
+ c = null;
+ chkDB.delete(t, routingkeyDBE);
+ t.commit();
+ t = null;
+ addFreeBlock(storeBlock.offset, true, "CHK does not verify");
+ synchronized(this) {
+ misses++;
+ }
+ return null;
+ }
+ synchronized(this) {
+ hits++;
+ }
+ return block;
+ }catch(Throwable ex) { // FIXME: ugly
+ if(c!=null) {
+ try{c.close();}catch(DatabaseException ex2){}
+ }
+ if(t!=null)
+ try{t.abort();}catch(DatabaseException ex2){}
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ checkSecondaryDatabaseError(ex);
+ IOException e = new IOException(ex.getMessage());
+ e.initCause(ex);
+ throw e;
+ }
+
// return null;
- }
+ }
/**
- * Retrieve a block.
- * @param dontPromote If true, don't promote data if fetched.
- * @return null if there is no such block stored, otherwise the block.
- */
- public SSKBlock fetch(NodeSSK chk, boolean dontPromote) throws IOException {
- synchronized(this) {
- if(closed)
- return null;
- }
-
- byte[] routingkey = chk.getRoutingKey();
- DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
- DatabaseEntry blockDBE = new DatabaseEntry();
- Cursor c = null;
- Transaction t = null;
- try{
- t = environment.beginTransaction(null,null);
- c = chkDB.openCursor(t,null);
-
- // Explanation of locking is in fetchPubKey.
- // Basically, locking the whole element saves us all sorts of trouble, especially
- // since we will usually be writing here if only to promote it.
- if(logMINOR) Logger.minor(this, "Fetching "+chk+" dontPromote="+dontPromote);
- if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
- !=OperationStatus.SUCCESS) {
- c.close();
- c = null;
- t.abort();
- t = null;
- synchronized(this) {
- misses++;
- }
- return null;
- }
+ * Retrieve a block.
+ * @param dontPromote If true, don't promote data if fetched.
+ * @return null if there is no such block stored, otherwise the block.
+ */
+ public SSKBlock fetch(NodeSSK chk, boolean dontPromote) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return null;
+ }
+
+ byte[] routingkey = chk.getRoutingKey();
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ Cursor c = null;
+ Transaction t = null;
+ try{
+ t = environment.beginTransaction(null,null);
+ c = chkDB.openCursor(t,null);
+
+ // Explanation of locking is in fetchPubKey.
+ // Basically, locking the whole element saves us all sorts of trouble, especially
+ // since we will usually be writing here if only to promote it.
+ if(logMINOR) Logger.minor(this, "Fetching "+chk+" dontPromote="+dontPromote);
+ if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
+ !=OperationStatus.SUCCESS) {
+ c.close();
+ c = null;
+ t.abort();
+ t = null;
+ synchronized(this) {
+ misses++;
+ }
+ return null;
+ }
- StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
-
- SSKBlock block = null;
- try{
- byte[] header = new byte[headerBlockSize];
- byte[] data = new byte[dataBlockSize];
- try {
- synchronized(chkStore) {
- chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
- chkStore.readFully(header);
- chkStore.readFully(data);
- }
- } catch (EOFException e) {
- Logger.error(this, "No block");
- c.close();
- c = null;
- chkDB.delete(t, routingkeyDBE);
- t.commit();
- t = null;
- addFreeBlock(storeBlock.offset, true, "Data off end of store file");
- return null;
- }
-
-
- block = new SSKBlock(data,header,chk, true);
-
- if(!dontPromote) {
- storeBlock.updateRecentlyUsed();
- DatabaseEntry updateDBE = new DatabaseEntry();
- storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
- c.putCurrent(updateDBE);
- c.close();
- c = null;
- t.commit();
- t = null;
- }else{
- c.close();
- c = null;
- t.abort();
- t = null;
- }
-
- if(logMINOR) {
- Logger.minor(this, "Headers: " + header.length+" bytes, hash " + Fields.hashCode(header));
- Logger.minor(this, "Data: " + data.length + " bytes, hash " + Fields.hashCode(data) + " fetching " + chk);
- }
-
- }catch(SSKVerifyException ex){
- Logger.normal(this, "SSKBlock: Does not verify ("+ex+"), setting accessTime to 0 for : "+chk, ex);
- chkDB.delete(t, routingkeyDBE);
- c.close();
- c = null;
- t.commit();
- t = null;
- addFreeBlock(storeBlock.offset, true, "SSK does not verify");
- synchronized(this) {
- misses++;
- }
- return null;
- }
- synchronized(this) {
- hits++;
- }
- return block;
- }catch(Throwable ex) { // FIXME: ugly
- if(c!=null) {
- try{c.close();}catch(DatabaseException ex2){}
- }
- if(t!=null) {
- try{t.abort();}catch(DatabaseException ex2){}
- }
- checkSecondaryDatabaseError(ex);
- Logger.error(this, "Caught "+ex, ex);
- ex.printStackTrace();
- throw new IOException(ex.getMessage());
- }
-
+ StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
+
+ SSKBlock block = null;
+ try{
+ byte[] header = new byte[headerBlockSize];
+ byte[] data = new byte[dataBlockSize];
+ try {
+ synchronized(chkStore) {
+ chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
+ chkStore.readFully(header);
+ chkStore.readFully(data);
+ }
+ } catch (EOFException e) {
+ Logger.error(this, "No block");
+ c.close();
+ c = null;
+ chkDB.delete(t, routingkeyDBE);
+ t.commit();
+ t = null;
+ addFreeBlock(storeBlock.offset, true, "Data off end of store file");
+ return null;
+ }
+
+
+ block = new SSKBlock(data,header,chk, true);
+
+ if(!dontPromote) {
+ storeBlock.updateRecentlyUsed();
+ DatabaseEntry updateDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
+ c.putCurrent(updateDBE);
+ c.close();
+ c = null;
+ t.commit();
+ t = null;
+ }else{
+ c.close();
+ c = null;
+ t.abort();
+ t = null;
+ }
+
+ if(logMINOR) {
+ Logger.minor(this, "Headers: " + header.length+" bytes, hash " + Fields.hashCode(header));
+ Logger.minor(this, "Data: " + data.length + " bytes, hash " + Fields.hashCode(data) + " fetching " + chk);
+ }
+
+ }catch(SSKVerifyException ex){
+ Logger.normal(this, "SSKBlock: Does not verify ("+ex+"), setting accessTime to 0 for : "+chk, ex);
+ chkDB.delete(t, routingkeyDBE);
+ c.close();
+ c = null;
+ t.commit();
+ t = null;
+ addFreeBlock(storeBlock.offset, true, "SSK does not verify");
+ synchronized(this) {
+ misses++;
+ }
+ return null;
+ }
+ synchronized(this) {
+ hits++;
+ }
+ return block;
+ }catch(Throwable ex) { // FIXME: ugly
+ if(c!=null) {
+ try{c.close();}catch(DatabaseException ex2){}
+ }
+ if(t!=null) {
+ try{t.abort();}catch(DatabaseException ex2){}
+ }
+ checkSecondaryDatabaseError(ex);
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ throw new IOException(ex.getMessage());
+ }
+
// return null;
- }
+ }
- // FIXME do this with interfaces etc.
-
- public DSAPublicKey fetchPubKey(byte[] hash, boolean dontPromote) throws IOException {
- return fetchPubKey(hash, null, dontPromote);
- }
-
+ // FIXME do this with interfaces etc.
+
+ public DSAPublicKey fetchPubKey(byte[] hash, boolean dontPromote) throws IOException {
+ return fetchPubKey(hash, null, dontPromote);
+ }
+
/**
- * Retrieve a block.
- * @param dontPromote If true, don't promote data if fetched.
- * @param replacement If non-null, and the data exists but is corrupt, replace it with this.
- * @return null if there is no such block stored, otherwise the block.
- */
- public DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey replacement, boolean dontPromote) throws IOException {
- synchronized(this) {
- if(closed)
- return null;
- }
-
- DatabaseEntry routingkeyDBE = new DatabaseEntry(hash);
- DatabaseEntry blockDBE = new DatabaseEntry();
- Cursor c = null;
- Transaction t = null;
- try{
- if(logMINOR) Logger.minor(this, "Fetching pubkey: "+HexUtil.bytesToHex(hash));
- t = environment.beginTransaction(null,null);
- c = chkDB.openCursor(t,null);
+ * Retrieve a block.
+ * @param dontPromote If true, don't promote data if fetched.
+ * @param replacement If non-null, and the data exists but is corrupt, replace it with this.
+ * @return null if there is no such block stored, otherwise the block.
+ */
+ public DSAPublicKey fetchPubKey(byte[] hash, DSAPublicKey replacement, boolean dontPromote) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return null;
+ }
+
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(hash);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ Cursor c = null;
+ Transaction t = null;
+ try{
+ if(logMINOR) Logger.minor(this, "Fetching pubkey: "+HexUtil.bytesToHex(hash));
+ t = environment.beginTransaction(null,null);
+ c = chkDB.openCursor(t,null);
- // Lock the records as soon as we find them.
- // RMW - nobody else may access this key until we are finished.
- // This is advantageous as we will usually promote it and we may replace its content;
- // if two readers accessed it at once both might try to. Also IIRC we can deadlock
- // if we don't.
- if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
- !=OperationStatus.SUCCESS) {
- c.close();
- c = null;
- t.abort();
- t = null;
- synchronized(this) {
- misses++;
- }
- return null;
- }
+ // Lock the records as soon as we find them.
+ // RMW - nobody else may access this key until we are finished.
+ // This is advantageous as we will usually promote it and we may replace its content;
+ // if two readers accessed it at once both might try to. Also IIRC we can deadlock
+ // if we don't.
+ if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
+ !=OperationStatus.SUCCESS) {
+ c.close();
+ c = null;
+ t.abort();
+ t = null;
+ synchronized(this) {
+ misses++;
+ }
+ return null;
+ }
- StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
-
- // Promote the key (we can always demote it later; promoting it here means it shouldn't be deallocated
- // FIXME the locking/concurrency in this class is a bit dodgy!
-
- if(!dontPromote) {
- storeBlock.updateRecentlyUsed();
- DatabaseEntry updateDBE = new DatabaseEntry();
- storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
- c.putCurrent(updateDBE);
- }
-
- DSAPublicKey block = null;
-
- byte[] data = new byte[dataBlockSize];
- if(logMINOR) Logger.minor(this, "Reading from store... "+storeBlock.offset+" ("+storeBlock.recentlyUsed+ ')');
- // When will java have pread/pwrite? :(
- try {
- synchronized(chkStore) {
- chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
- chkStore.readFully(data);
- }
+ StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
+
+ // Promote the key (we can always demote it later; promoting it here means it shouldn't be deallocated
+ // FIXME the locking/concurrency in this class is a bit dodgy!
+
+ if(!dontPromote) {
+ storeBlock.updateRecentlyUsed();
+ DatabaseEntry updateDBE = new DatabaseEntry();
+ storeBlockTupleBinding.objectToEntry(storeBlock, updateDBE);
+ c.putCurrent(updateDBE);
+ }
+
+ DSAPublicKey block = null;
+
+ byte[] data = new byte[dataBlockSize];
+ if(logMINOR) Logger.minor(this, "Reading from store... "+storeBlock.offset+" ("+storeBlock.recentlyUsed+ ')');
+ // When will java have pread/pwrite? :(
+ try {
+ synchronized(chkStore) {
+ chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
+ chkStore.readFully(data);
+ }
} catch (EOFException e) {
Logger.error(this, "No block");
- c.close();
- c = null;
- chkDB.delete(t, routingkeyDBE);
- t.commit();
- t = null;
- addFreeBlock(storeBlock.offset, true, "Data off end of store file");
- return null;
+ c.close();
+ c = null;
+ chkDB.delete(t, routingkeyDBE);
+ t.commit();
+ t = null;
+ addFreeBlock(storeBlock.offset, true, "Data off end of store file");
+ return null;
}
- if(logMINOR) Logger.minor(this, "Read");
-
- try {
- block = DSAPublicKey.create(data);
- } catch (CryptFormatException e) {
- Logger.error(this, "Could not read key: "+e, e);
- finishKey(storeBlock, c, t, routingkeyDBE, hash, replacement);
- return replacement;
- }
-
- if(!Arrays.equals(block.asBytesHash(), hash)) {
- finishKey(storeBlock, c, t, routingkeyDBE, hash, replacement);
- return replacement;
- }
-
- // Finished, commit.
- c.close();
- c = null;
- t.commit();
- t = null;
-
- if(logMINOR) {
- Logger.minor(this, "Data: " + data.length + " bytes, hash " + Fields.hashCode(data) + " fetching "+HexUtil.bytesToHex(hash));
- }
-
- synchronized(this) {
- hits++;
- }
- return block;
- } catch(Throwable ex) { // FIXME: ugly
- // Clean up.
- // Reports of wierd NPEs when aborting a transaction, deal with it
- if(c!=null) {
- try {
- c.close();
- } catch(Throwable ex2) {
- Logger.error(this, "Caught "+ex2+" closing in finally block", ex2);
- }
- }
- if(t!=null) {
- try {
- t.abort();
- } catch(Throwable ex2) {
- Logger.error(this, "Caught "+ex2+" aborting in finally block", ex2);
- }
- }
- checkSecondaryDatabaseError(ex);
- Logger.error(this, "Caught "+ex, ex);
- ex.printStackTrace();
- throw new IOException(ex.getMessage());
- }
-
+ if(logMINOR) Logger.minor(this, "Read");
+
+ try {
+ block = DSAPublicKey.create(data);
+ } catch (CryptFormatException e) {
+ Logger.error(this, "Could not read key: "+e, e);
+ finishKey(storeBlock, c, t, routingkeyDBE, hash, replacement);
+ return replacement;
+ }
+
+ if(!Arrays.equals(block.asBytesHash(), hash)) {
+ finishKey(storeBlock, c, t, routingkeyDBE, hash, replacement);
+ return replacement;
+ }
+
+ // Finished, commit.
+ c.close();
+ c = null;
+ t.commit();
+ t = null;
+
+ if(logMINOR) {
+ Logger.minor(this, "Data: " + data.length + " bytes, hash " + Fields.hashCode(data) + " fetching "+HexUtil.bytesToHex(hash));
+ }
+
+ synchronized(this) {
+ hits++;
+ }
+ return block;
+ } catch(Throwable ex) { // FIXME: ugly
+ // Clean up.
+ // Reports of wierd NPEs when aborting a transaction, deal with it
+ if(c!=null) {
+ try {
+ c.close();
+ } catch(Throwable ex2) {
+ Logger.error(this, "Caught "+ex2+" closing in finally block", ex2);
+ }
+ }
+ if(t!=null) {
+ try {
+ t.abort();
+ } catch(Throwable ex2) {
+ Logger.error(this, "Caught "+ex2+" aborting in finally block", ex2);
+ }
+ }
+ checkSecondaryDatabaseError(ex);
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ throw new IOException(ex.getMessage());
+ }
+
// return null;
- }
+ }
- private boolean finishKey(StoreBlock storeBlock, Cursor c, Transaction t, DatabaseEntry routingkeyDBE, byte[] hash, DSAPublicKey replacement) throws IOException, DatabaseException {
+ private boolean finishKey(StoreBlock storeBlock, Cursor c, Transaction t, DatabaseEntry routingkeyDBE, byte[] hash, DSAPublicKey replacement) throws IOException, DatabaseException {
if(replacement != null) {
Logger.normal(this, "Replacing corrupt DSAPublicKey ("+HexUtil.bytesToHex(hash));
synchronized(chkStore) {
@@ -1722,16 +1722,16 @@
}
private void addFreeBlock(long offset, boolean loud, String reason) {
- if(freeBlocks.push(offset)) {
- if(loud) {
- System.err.println("Freed block "+offset+" ("+reason+ ')');
- Logger.normal(this, "Freed block "+offset+" ("+reason+ ')');
- } else {
- if(logMINOR) Logger.minor(this, "Freed block "+offset+" ("+reason+ ')');
- }
- } else {
- if(logMINOR) Logger.minor(this, "Already freed block "+offset+" ("+reason+ ')');
- }
+ if(freeBlocks.push(offset)) {
+ if(loud) {
+ System.err.println("Freed block "+offset+" ("+reason+ ')');
+ Logger.normal(this, "Freed block "+offset+" ("+reason+ ')');
+ } else {
+ if(logMINOR) Logger.minor(this, "Freed block "+offset+" ("+reason+ ')');
+ }
+ } else {
+ if(logMINOR) Logger.minor(this, "Already freed block "+offset+" ("+reason+ ')');
+ }
}
public void put(CHKBlock b) throws IOException {
@@ -1740,9 +1740,9 @@
if(oldBlock != null)
return;
innerPut(b);
- }
-
- public void put(SSKBlock b, boolean overwrite) throws IOException, KeyCollisionException {
+ }
+
+ public void put(SSKBlock b, boolean overwrite) throws IOException, KeyCollisionException {
NodeSSK ssk = (NodeSSK) b.getKey();
SSKBlock oldBlock = fetch(ssk, false);
if(oldBlock != null) {
@@ -1756,150 +1756,150 @@
} else {
innerPut(b);
}
- }
-
- private boolean overwrite(SSKBlock b) throws IOException {
- synchronized(this) {
- if(closed)
- return false;
- }
-
- NodeSSK chk = (NodeSSK) b.getKey();
- byte[] routingkey = chk.getRoutingKey();
- DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
- DatabaseEntry blockDBE = new DatabaseEntry();
- Cursor c = null;
- Transaction t = null;
- try{
- t = environment.beginTransaction(null,null);
- c = chkDB.openCursor(t,null);
+ }
+
+ private boolean overwrite(SSKBlock b) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return false;
+ }
+
+ NodeSSK chk = (NodeSSK) b.getKey();
+ byte[] routingkey = chk.getRoutingKey();
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+ DatabaseEntry blockDBE = new DatabaseEntry();
+ Cursor c = null;
+ Transaction t = null;
+ try{
+ t = environment.beginTransaction(null,null);
+ c = chkDB.openCursor(t,null);
- // Lock the record.
- if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
- !=OperationStatus.SUCCESS) {
- c.close();
- c = null;
- t.abort();
- t = null;
- return false;
- }
+ // Lock the record.
+ if(c.getSearchKey(routingkeyDBE,blockDBE,LockMode.RMW)
+ !=OperationStatus.SUCCESS) {
+ c.close();
+ c = null;
+ t.abort();
+ t = null;
+ return false;
+ }
- StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
-
- byte[] header = b.getRawHeaders();
- byte[] data = b.getRawData();
- synchronized(chkStore) {
- chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
- chkStore.write(header);
- chkStore.write(data);
- }
-
- // Unlock record.
- c.close();
- c = null;
- t.commit();
- t = null;
-
- } catch(Throwable ex) { // FIXME: ugly
- checkSecondaryDatabaseError(ex);
- Logger.error(this, "Caught "+ex, ex);
- ex.printStackTrace();
- throw new IOException(ex.getMessage());
- } finally {
- if(c!=null) {
- try{c.close();}catch(DatabaseException ex2){}
-
- }
- if(t!=null) {
- try{t.abort();}catch(DatabaseException ex2){}
- }
-
- }
-
- return true;
+ StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(blockDBE);
+
+ byte[] header = b.getRawHeaders();
+ byte[] data = b.getRawData();
+ synchronized(chkStore) {
+ chkStore.seek(storeBlock.offset*(long)(dataBlockSize+headerBlockSize));
+ chkStore.write(header);
+ chkStore.write(data);
+ }
+
+ // Unlock record.
+ c.close();
+ c = null;
+ t.commit();
+ t = null;
+
+ } catch(Throwable ex) { // FIXME: ugly
+ checkSecondaryDatabaseError(ex);
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ throw new IOException(ex.getMessage());
+ } finally {
+ if(c!=null) {
+ try{c.close();}catch(DatabaseException ex2){}
+
+ }
+ if(t!=null) {
+ try{t.abort();}catch(DatabaseException ex2){}
+ }
+
+ }
+
+ return true;
}
/**
- * Store a block.
- */
- private void innerPut(KeyBlock block) throws IOException {
- synchronized(this) {
- if(closed)
- return;
- }
-
- byte[] routingkey = block.getKey().getRoutingKey();
- byte[] data = block.getRawData();
- byte[] header = block.getRawHeaders();
-
- if(data.length!=dataBlockSize) {
- Logger.error(this, "This data is "+data.length+" bytes. Should be "+dataBlockSize);
- return;
- }
- if(header.length!=headerBlockSize) {
- Logger.error(this, "This header is "+data.length+" bytes. Should be "+headerBlockSize);
- return;
- }
-
- Transaction t = null;
-
- try{
- t = environment.beginTransaction(null,null);
- DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
-
- DatabaseEntry blockDBE = new DatabaseEntry();
-
- // Check whether it already exists
-
- if(logMINOR) Logger.minor(this, "Putting key "+block+" - checking whether it exists first");
- OperationStatus result = chkDB.get(t, routingkeyDBE, blockDBE, LockMode.RMW);
-
- if(result == OperationStatus.SUCCESS || result == OperationStatus.KEYEXIST) {
- if(logMINOR) Logger.minor(this, "Key already exists");
- // Key already exists!
- // But is it valid?
- t.abort();
- if(fetchKey(block.getKey(), false) != null) return; // old key was valid, we are not overwriting
- // If we are here, it was corrupt, or it was just deleted, so we can replace it.
- if(logMINOR) Logger.minor(this, "Old key was invalid, adding anyway");
- innerPut(block);
- return;
- } else if(result == OperationStatus.KEYEMPTY) {
- Logger.error(this, "Got KEYEMPTY - record deleted? Shouldn't be possible with record locking...!");
- // Put it in anyway
- } else if(result == OperationStatus.NOTFOUND) {
- // Good
- } else
- throw new IllegalStateException("Unknown operation status: "+result);
-
- writeBlock(header, data, t, routingkeyDBE);
-
- t.commit();
- t = null;
-
- if(logMINOR) {
- Logger.minor(this, "Headers: "+header.length+" bytes, hash "+Fields.hashCode(header));
- Logger.minor(this, "Data: "+data.length+" bytes, hash "+Fields.hashCode(data)+" putting "+block.getKey());
- }
-
- }catch(Throwable ex) { // FIXME: ugly
- if(t!=null){
- try{t.abort();}catch(DatabaseException ex2){};
- }
- checkSecondaryDatabaseError(ex);
- Logger.error(this, "Caught "+ex, ex);
- ex.printStackTrace();
- if(ex instanceof IOException) throw (IOException) ex;
- else throw new IOException(ex.getMessage());
- }
- }
-
- private KeyBlock fetchKey(Key key, boolean b) throws IOException {
- if(key instanceof NodeCHK)
- return fetch((NodeCHK)key, b);
- else
- return fetch((NodeSSK)key, b);
+ * Store a block.
+ */
+ private void innerPut(KeyBlock block) throws IOException {
+ synchronized(this) {
+ if(closed)
+ return;
+ }
+
+ byte[] routingkey = block.getKey().getRoutingKey();
+ byte[] data = block.getRawData();
+ byte[] header = block.getRawHeaders();
+
+ if(data.length!=dataBlockSize) {
+ Logger.error(this, "This data is "+data.length+" bytes. Should be "+dataBlockSize);
+ return;
+ }
+ if(header.length!=headerBlockSize) {
+ Logger.error(this, "This header is "+data.length+" bytes. Should be "+headerBlockSize);
+ return;
+ }
+
+ Transaction t = null;
+
+ try{
+ t = environment.beginTransaction(null,null);
+ DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+
+ DatabaseEntry blockDBE = new DatabaseEntry();
+
+ // Check whether it already exists
+
+ if(logMINOR) Logger.minor(this, "Putting key "+block+" - checking whether it exists first");
+ OperationStatus result = chkDB.get(t, routingkeyDBE, blockDBE, LockMode.RMW);
+
+ if(result == OperationStatus.SUCCESS || result == OperationStatus.KEYEXIST) {
+ if(logMINOR) Logger.minor(this, "Key already exists");
+ // Key already exists!
+ // But is it valid?
+ t.abort();
+ if(fetchKey(block.getKey(), false) != null) return; // old key was valid, we are not overwriting
+ // If we are here, it was corrupt, or it was just deleted, so we can replace it.
+ if(logMINOR) Logger.minor(this, "Old key was invalid, adding anyway");
+ innerPut(block);
+ return;
+ } else if(result == OperationStatus.KEYEMPTY) {
+ Logger.error(this, "Got KEYEMPTY - record deleted? Shouldn't be possible with record locking...!");
+ // Put it in anyway
+ } else if(result == OperationStatus.NOTFOUND) {
+ // Good
+ } else
+ throw new IllegalStateException("Unknown operation status: "+result);
+
+ writeBlock(header, data, t, routingkeyDBE);
+
+ t.commit();
+ t = null;
+
+ if(logMINOR) {
+ Logger.minor(this, "Headers: "+header.length+" bytes, hash "+Fields.hashCode(header));
+ Logger.minor(this, "Data: "+data.length+" bytes, hash "+Fields.hashCode(data)+" putting "+block.getKey());
+ }
+
+ }catch(Throwable ex) { // FIXME: ugly
+ if(t!=null){
+ try{t.abort();}catch(DatabaseException ex2){};
+ }
+ checkSecondaryDatabaseError(ex);
+ Logger.error(this, "Caught "+ex, ex);
+ ex.printStackTrace();
+ if(ex instanceof IOException) throw (IOException) ex;
+ else throw new IOException(ex.getMessage());
+ }
}
+
+ private KeyBlock fetchKey(Key key, boolean b) throws IOException {
+ if(key instanceof NodeCHK)
+ return fetch((NodeCHK)key, b);
+ else
+ return fetch((NodeSSK)key, b);
+ }
private void overwriteLRUBlock(byte[] header, byte[] data, Transaction t, DatabaseEntry routingkeyDBE) throws DatabaseException, IOException {
// Overwrite an other block
@@ -1935,7 +1935,7 @@
DatabaseEntry found = new DatabaseEntry();
LongBinding.longToEntry(blockNum, blockNumEntry);
- OperationStatus success =
+ OperationStatus success =
chkDB_blockNum.get(t, blockNumEntry, found, LockMode.DEFAULT);
if(success == OperationStatus.KEYEXIST || success == OperationStatus.SUCCESS) {
@@ -1970,259 +1970,259 @@
+ /**
+ * Inspect an exception for the two known "corrupt secondary database" messages.
+ * If ex is a DatabaseException whose message contains either of them, create the
+ * flag file fixSecondaryFile, report the corruption to the log and stderr, and exit
+ * the JVM with Node.EXIT_DATABASE_REQUIRES_RESTART. If the flag file cannot be
+ * created, report that instead and return. Any other exception is ignored.
+ * @param ex The exception to inspect; its message may be null.
+ */
private void checkSecondaryDatabaseError(Throwable ex) {
String msg = ex.getMessage();
- if((ex instanceof DatabaseException) && (msg != null && (msg.indexOf("missing key in the primary database") > -1) ||
- msg.indexOf("the primary record contains a key that is not present in the secondary") > -1)) {
- try {
+ // Throwable.getMessage() may return null. Test for that once, ahead of BOTH substring
+ // checks: previously the null test only guarded the first one, so a DatabaseException
+ // without a message threw NullPointerException from inside the callers' catch blocks.
+ if((ex instanceof DatabaseException) && (msg != null) &&
+ ((msg.indexOf("missing key in the primary database") > -1) ||
+ (msg.indexOf("the primary record contains a key that is not present in the secondary") > -1))) {
+ try {
fixSecondaryFile.createNewFile();
} catch (IOException e) {
Logger.error(this, "Corrupt secondary database ("+getName()+") but could not create flag file "+fixSecondaryFile);
System.err.println("Corrupt secondary database ("+getName()+") but could not create flag file "+fixSecondaryFile);
return; // Not sure what else we can do
}
- Logger.error(this, "Corrupt secondary database ("+getName()+"). Should be cleaned up on restart.");
- System.err.println("Corrupt secondary database ("+getName()+"). Should be cleaned up on restart.");
- System.exit(freenet.node.Node.EXIT_DATABASE_REQUIRES_RESTART);
- }
+ Logger.error(this, "Corrupt secondary database ("+getName()+"). Should be cleaned up on restart.");
+ System.err.println("Corrupt secondary database ("+getName()+"). Should be cleaned up on restart.");
+ System.exit(freenet.node.Node.EXIT_DATABASE_REQUIRES_RESTART);
+ }
}
- /**
- * Store a pubkey.
- */
- public void put(byte[] hash, DSAPublicKey key) throws IOException {
- innerPut(hash, key);
- }
+ /**
+ * Store a pubkey.
+ * Delegates directly to innerPut(hash, key).
+ * @param hash Routing key under which the pubkey is stored. innerPut compares it with
+ * key.asBytesHash() and logs an error on mismatch, but carries on with the put.
+ * @param key The public key to store.
+ * @throws IOException If innerPut throws it.
+ */
+ public void put(byte[] hash, DSAPublicKey key) throws IOException {
+ innerPut(hash, key);
+ }
/**
- * Store a block.
- */
- private void innerPut(byte[] hash, DSAPublicKey key) throws IOException {
- synchronized(this) {
- if(closed)
- return;
- }
-
- byte[] routingkey = hash;
- byte[] data = key.asPaddedBytes();
-
- if(!(Arrays.equals(hash, key.asBytesHash()))) {
- Logger.error(this, "Invalid hash!: " + HexUtil.bytesToHex(hash) + " : " + HexUtil.bytesToHex(key.asBytesHash()));
- }
-
- if(data.length!=dataBlockSize) {
- Logger.error(this, "This data is "+data.length+" bytes. Should be "+dataBlockSize);
- return;
- }
-
- Transaction t = null;
-
- try{
- t = environment.beginTransaction(null,null);
- DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
- DatabaseEntry blockDBE = new DatabaseEntry();
-
- // Check whether it already exists
-
- if(logMINOR) Logger.minor(this, "Putting key: "+HexUtil.bytesToHex(hash)+" : "+key+" - checking whether it exists already...");
- OperationStatus result = chkDB.get(t, routingkeyDBE, blockDBE, LockMode.RMW);
-
- if(result == OperationStatus.SUCCESS || result == OperationStatus.KEYEXIST) {
- // Key already exists!
- // But is it valid?
- if(logMINOR)
- Logger.minor(this, "Putting "+HexUtil.bytesToHex(hash)+" : already exists - aborting transaction");
- t.abort();
- if(logMINOR)
- Logger.minor(this, "Fetching (replacing) key");
- if(fetchPubKey(hash, key, false) != null) {
- if(logMINOR) Logger.minor(this, "Fetch/replace succeeded");
- return; // replaced key
- }
- if(logMINOR) Logger.minor(this, "Fetch failed after key already exists");
- // If we are here, it was corrupt, and it got deleted before it could be replaced.
- innerPut(hash, key);
- return;
- } else if(result == OperationStatus.KEYEMPTY) {
- Logger.error(this, "Got KEYEMPTY - record deleted? Shouldn't be possible with record locking...!");
- // Put it in anyway
- } else if(result == OperationStatus.NOTFOUND) {
- // Good
- } else
- throw new IllegalStateException("Unknown operation status: "+result);
-
- writeBlock(dummy, data, t, routingkeyDBE);
-
- t.commit();
- t = null;
-
- if(logMINOR) {
- Logger.minor(this, "Data: "+data.length+" bytes, hash "+Fields.hashCode(data)+" putting "+HexUtil.bytesToHex(hash)+" : "+key);
- }
-
- } catch(Throwable ex) { // FIXME: ugly
- Logger.error(this, "Caught "+ex, ex);
- System.err.println("Caught: "+ex);
- ex.printStackTrace();
- if(t!=null){
- try{t.abort();}catch(DatabaseException ex2){};
- }
- checkSecondaryDatabaseError(ex);
- if(ex instanceof IOException) throw (IOException) ex;
- else throw new IOException(ex.getMessage());
- }
- }
-
- private void writeBlock(byte[] header, byte[] data, Transaction t, DatabaseEntry routingkeyDBE) throws DatabaseException, IOException {
-
- long blockNum;
-
- while(true) {
- if((blockNum = grabFreeBlock()) >= 0) {
- if(logMINOR)
- Logger.minor(this, "Overwriting free block: "+blockNum);
- if(writeNewBlock(blockNum, header, data, t, routingkeyDBE))
- return;
- } else if(chkBlocksInStore<maxChkBlocks) {
- // Expand the store file
- synchronized(chkBlocksInStoreLock) {
- blockNum = chkBlocksInStore;
- chkBlocksInStore++;
- }
- if(logMINOR)
- Logger.minor(this, "Expanding store and writing block "+blockNum);
- // Just in case
- freeBlocks.remove(blockNum);
- if(writeNewBlock(blockNum, header, data, t, routingkeyDBE))
- return;
- }else{
- if(logMINOR)
- Logger.minor(this, "Overwriting LRU block");
- overwriteLRUBlock(header, data, t, routingkeyDBE);
- return;
- }
-
- }
-
+ * Store a block.
+ */
+ private void innerPut(byte[] hash, DSAPublicKey key) throws IOException {
+ 	// Stores the padded bytes of a public key under the routing key `hash`.
+ 	// Returns without writing if the store is closed or the padded key is not
+ 	// dataBlockSize bytes long. If an entry already exists and fetchPubKey() still
+ 	// returns it, nothing is written; otherwise the whole put is retried.
+ 	// Every failure is logged and surfaces to the caller as an IOException.
+ 	synchronized(this) {
+ 		if(closed)
+ 			return;
+ 	}
+
+ 	byte[] routingkey = hash;
+ 	byte[] data = key.asPaddedBytes();
+
+ 	// A mismatching hash is only reported; the put still goes ahead.
+ 	if(!(Arrays.equals(hash, key.asBytesHash()))) {
+ 		Logger.error(this, "Invalid hash!: " + HexUtil.bytesToHex(hash) + " : " + HexUtil.bytesToHex(key.asBytesHash()));
+ 	}
+
+ 	if(data.length!=dataBlockSize) {
+ 		Logger.error(this, "This data is "+data.length+" bytes. Should be "+dataBlockSize);
+ 		return;
+ 	}
+
+ 	Transaction t = null;
+
+ 	try{
+ 		t = environment.beginTransaction(null,null);
+ 		DatabaseEntry routingkeyDBE = new DatabaseEntry(routingkey);
+ 		DatabaseEntry blockDBE = new DatabaseEntry();
+
+ 		// Check whether it already exists
+
+ 		if(logMINOR) Logger.minor(this, "Putting key: "+HexUtil.bytesToHex(hash)+" : "+key+" - checking whether it exists already...");
+ 		OperationStatus result = chkDB.get(t, routingkeyDBE, blockDBE, LockMode.RMW);
+
+ 		if(result == OperationStatus.SUCCESS || result == OperationStatus.KEYEXIST) {
+ 			// Key already exists!
+ 			// But is it valid?
+ 			if(logMINOR)
+ 				Logger.minor(this, "Putting "+HexUtil.bytesToHex(hash)+" : already exists - aborting transaction");
+ 			t.abort();
+ 			// This transaction is finished. Forget it, so that the catch block below does
+ 			// not abort it a second time if fetchPubKey() or the retry throws.
+ 			t = null;
+ 			if(logMINOR)
+ 				Logger.minor(this, "Fetching (replacing) key");
+ 			if(fetchPubKey(hash, key, false) != null) {
+ 				if(logMINOR) Logger.minor(this, "Fetch/replace succeeded");
+ 				return; // replaced key
+ 			}
+ 			if(logMINOR) Logger.minor(this, "Fetch failed after key already exists");
+ 			// If we are here, it was corrupt, and it got deleted before it could be replaced.
+ 			innerPut(hash, key);
+ 			return;
+ 		} else if(result == OperationStatus.KEYEMPTY) {
+ 			Logger.error(this, "Got KEYEMPTY - record deleted? Shouldn't be possible with record locking...!");
+ 			// Put it in anyway
+ 		} else if(result == OperationStatus.NOTFOUND) {
+ 			// Good
+ 		} else
+ 			throw new IllegalStateException("Unknown operation status: "+result);
+
+ 		writeBlock(dummy, data, t, routingkeyDBE);
+
+ 		t.commit();
+ 		// Committed: nothing left for the catch block to abort.
+ 		t = null;
+
+ 		if(logMINOR) {
+ 			Logger.minor(this, "Data: "+data.length+" bytes, hash "+Fields.hashCode(data)+" putting "+HexUtil.bytesToHex(hash)+" : "+key);
+ 		}
+
+ 	} catch(Throwable ex) { // FIXME: ugly
+ 		Logger.error(this, "Caught "+ex, ex);
+ 		System.err.println("Caught: "+ex);
+ 		ex.printStackTrace();
+ 		if(t!=null){
+ 			try {
+ 				t.abort();
+ 			} catch(DatabaseException ex2) {
+ 				// Best effort only: the original failure is what the caller gets,
+ 				// but leave a trace of the failed abort instead of dropping it.
+ 				Logger.error(this, "Caught "+ex2+" while aborting transaction", ex2);
+ 			}
+ 		}
+ 		checkSecondaryDatabaseError(ex);
+ 		if(ex instanceof IOException) throw (IOException) ex;
+ 		// Same exception type and message as before, but keep the cause attached
+ 		// so the original stack trace is not lost.
+ 		IOException wrapped = new IOException(ex.getMessage());
+ 		wrapped.initCause(ex);
+ 		throw wrapped;
+ 	}
}
+
+ /**
+  * Write one block (header plus data) into the store, picking the slot as follows:
+  * first a slot from grabFreeBlock(); failing that, a new slot at the end of the store
+  * while chkBlocksInStore is below maxChkBlocks; failing that, overwriteLRUBlock().
+  * If writeNewBlock() returns false for the chosen slot, the selection starts over.
+  * @param header Header bytes, passed through to the block writer.
+  * @param data Data bytes, passed through to the block writer.
+  * @param t Transaction passed through to the block writer.
+  * @param routingkeyDBE Database key passed through to the block writer.
+  * @throws DatabaseException If the block writer throws it.
+  * @throws IOException If the block writer throws it.
+  */
+ private void writeBlock(byte[] header, byte[] data, Transaction t, DatabaseEntry routingkeyDBE) throws DatabaseException, IOException {
+ 	for(;;) {
+ 		long slot = grabFreeBlock();
+ 		if(slot >= 0) {
+ 			if(logMINOR)
+ 				Logger.minor(this, "Overwriting free block: "+slot);
+ 		} else if(chkBlocksInStore<maxChkBlocks) {
+ 			// Expand the store file: claim the next block number under the counter's lock.
+ 			synchronized(chkBlocksInStoreLock) {
+ 				slot = chkBlocksInStore;
+ 				chkBlocksInStore++;
+ 			}
+ 			if(logMINOR)
+ 				Logger.minor(this, "Expanding store and writing block "+slot);
+ 			// Just in case the new slot is also recorded as free.
+ 			freeBlocks.remove(slot);
+ 		} else {
+ 			if(logMINOR)
+ 				Logger.minor(this, "Overwriting LRU block");
+ 			overwriteLRUBlock(header, data, t, routingkeyDBE);
+ 			return;
+ 		}
+
+ 		// Shared by the free-slot and expansion paths; a false result means try again.
+ 		if(writeNewBlock(slot, header, data, t, routingkeyDBE))
+ 			return;
+ 	}
+ }
+ /**
+ * Take a reusable block number from freeBlocks.
+ * Entries at or beyond maxChkBlocks are removed and discarded.
+ * @return The first free block number below maxChkBlocks, or -1 if the free list runs out.
+ */
private long grabFreeBlock() {
- while(!freeBlocks.isEmpty()) {
- long blockNum = freeBlocks.removeFirst();
- if(blockNum < maxChkBlocks) return blockNum;
- }
+ while(!freeBlocks.isEmpty()) {
+ long candidate = freeBlocks.removeFirst();
+ if(candidate >= maxChkBlocks) continue; // outside the current limit: drop it and keep looking
+ return candidate;
+ }
return -1;
}
private class StoreBlock {
- private long recentlyUsed;
- private long offset;
-
- public StoreBlock(final BerkeleyDBFreenetStore bdbfs, long offset) {
- this(offset, bdbfs.getNewRecentlyUsed());
- }
-
- public StoreBlock(long offset,long recentlyUsed) {
- this.offset = offset;
- this.recentlyUsed = recentlyUsed;
- }
-
-
- public long getRecentlyUsed() {
- return recentlyUsed;
- }
-
- public void setRecentlyUsedToZero() {
- recentlyUsed = 0;
- }
-
- public void updateRecentlyUsed() {
- recentlyUsed = getNewRecentlyUsed();
- }
-
- public long getOffset() {
- return offset;
- }
- }
-
- /**
- * Convert StoreBlock's to the format used by the database
- */
- private class StoreBlockTupleBinding extends TupleBinding {
+ private long recentlyUsed;
+ private long offset;
+
+ public StoreBlock(final BerkeleyDBFreenetStore bdbfs, long offset) {
+ this(offset, bdbfs.getNewRecentlyUsed());
+ }
+
+ public StoreBlock(long offset,long recentlyUsed) {
+ this.offset = offset;
+ this.recentlyUsed = recentlyUsed;
+ }
+
+
+ public long getRecentlyUsed() {
+ return recentlyUsed;
+ }
+
+ public void setRecentlyUsedToZero() {
+ recentlyUsed = 0;
+ }
+
+ public void updateRecentlyUsed() {
+ recentlyUsed = getNewRecentlyUsed();
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+ }
+
+ /**
+ * Convert StoreBlock's to the format used by the database
+ */
+ private class StoreBlockTupleBinding extends TupleBinding {
- public void objectToEntry(Object object, TupleOutput to) {
- StoreBlock myData = (StoreBlock)object;
+ public void objectToEntry(Object object, TupleOutput to) {
+ StoreBlock myData = (StoreBlock)object;
- to.writeLong(myData.getOffset());
- to.writeLong(myData.getRecentlyUsed());
- }
+ to.writeLong(myData.getOffset());
+ to.writeLong(myData.getRecentlyUsed());
+ }
- public Object entryToObject(TupleInput ti) {
- if(Logger.shouldLog(Logger.DEBUG, this))
- Logger.debug(this, "Available: "+ti.available());
- long offset = ti.readLong();
- long lastAccessed = ti.readLong();
-
- StoreBlock storeBlock = new StoreBlock(offset,lastAccessed);
- return storeBlock;
- }
- }
-
- /**
- * Used to create the secondary database sorted on accesstime
- */
- private class AccessTimeKeyCreator implements SecondaryKeyCreator {
- private TupleBinding theBinding;
-
- public AccessTimeKeyCreator(TupleBinding theBinding1) {
- theBinding = theBinding1;
- }
-
- public boolean createSecondaryKey(SecondaryDatabase secDb,
- DatabaseEntry keyEntry,
- DatabaseEntry dataEntry,
- DatabaseEntry resultEntry) {
+ public Object entryToObject(TupleInput ti) {
+ if(Logger.shouldLog(Logger.DEBUG, this))
+ Logger.debug(this, "Available: "+ti.available());
+ long offset = ti.readLong();
+ long lastAccessed = ti.readLong();
+
+ StoreBlock storeBlock = new StoreBlock(offset,lastAccessed);
+ return storeBlock;
+ }
+ }
+
+ /**
+ * Used to create the secondary database sorted on accesstime
+ */
+ private class AccessTimeKeyCreator implements SecondaryKeyCreator {
+ private TupleBinding theBinding;
+
+ public AccessTimeKeyCreator(TupleBinding theBinding1) {
+ theBinding = theBinding1;
+ }
+
+ public boolean createSecondaryKey(SecondaryDatabase secDb,
+ DatabaseEntry keyEntry,
+ DatabaseEntry dataEntry,
+ DatabaseEntry resultEntry) {
- StoreBlock storeblock = (StoreBlock) theBinding.entryToObject(dataEntry);
- LongBinding.longToEntry(storeblock.getRecentlyUsed(), resultEntry);
- return true;
- }
- }
+ StoreBlock storeblock = (StoreBlock) theBinding.entryToObject(dataEntry);
+ LongBinding.longToEntry(storeblock.getRecentlyUsed(), resultEntry);
+ return true;
+ }
+ }
- private class BlockNumberKeyCreator implements SecondaryKeyCreator {
- private TupleBinding theBinding;
-
- public BlockNumberKeyCreator(TupleBinding theBinding1) {
- theBinding = theBinding1;
- }
-
- public boolean createSecondaryKey(SecondaryDatabase secDb,
- DatabaseEntry keyEntry,
- DatabaseEntry dataEntry,
- DatabaseEntry resultEntry) {
+ private class BlockNumberKeyCreator implements SecondaryKeyCreator {
+ private TupleBinding theBinding;
+
+ public BlockNumberKeyCreator(TupleBinding theBinding1) {
+ theBinding = theBinding1;
+ }
+
+ public boolean createSecondaryKey(SecondaryDatabase secDb,
+ DatabaseEntry keyEntry,
+ DatabaseEntry dataEntry,
+ DatabaseEntry resultEntry) {
- StoreBlock storeblock = (StoreBlock) theBinding.entryToObject(dataEntry);
- LongBinding.longToEntry(storeblock.offset, resultEntry);
- return true;
- }
-
- }
-
- private class ShutdownHook extends Thread {
- public void run() {
- System.err.println("Closing database due to shutdown.");
- close(true);
- }
- }
-
- private Object closeLock = new Object();
-
- private void close(boolean sleep) {
- try{
+ StoreBlock storeblock = (StoreBlock) theBinding.entryToObject(dataEntry);
+ LongBinding.longToEntry(storeblock.offset, resultEntry);
+ return true;
+ }
+
+ }
+
+ /**
+  * Thread whose run() announces the shutdown on stderr and then
+  * calls close(true) on the enclosing store.
+  */
+ private class ShutdownHook extends Thread {
+ 	public void run() {
+ 		System.err.println("Closing database due to shutdown.");
+ 		BerkeleyDBFreenetStore.this.close(true);
+ 	}
+ }
+
+ private Object closeLock = new Object();
+
+ private void close(boolean sleep) {
+ try{
// FIXME: we should be sure all access to the database has stopped
// before we try to close it. Currently we just guess
- // This is nothing too problematic however since the worst thing that should
- // happen is that we miss the last few store()'s and get an exception.
- logMINOR = Logger.shouldLog(Logger.MINOR, this);
- if(logMINOR) Logger.minor(this, "Closing database "+this);
+ // This is nothing too problematic however since the worst thing that should
+ // happen is that we miss the last few store()'s and get an exception.
+ logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ if(logMINOR) Logger.minor(this, "Closing database "+this);
closed=true;
if(reallyClosed) {
Logger.error(this, "Already closed "+this);
@@ -2276,44 +2276,44 @@
// Return anyway
}
}
- }
-
- private long highestBlockNumberInDatabase() throws DatabaseException {
- Cursor c = null;
- try {
- c = chkDB_blockNum.openCursor(null,null);
- DatabaseEntry keyDBE = new DatabaseEntry();
- DatabaseEntry dataDBE = new DatabaseEntry();
- if(c.getLast(keyDBE,dataDBE,null)==OperationStatus.SUCCESS) {
- StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(dataDBE);
- return storeBlock.offset + 1;
- }
- c.close();
- c = null;
- } finally {
- if(c != null) {
- try {
- c.close();
- } catch (DatabaseException e) {
- Logger.error(this, "Caught "+e, e);
- }
- }
- }
+ }
+
+ private long highestBlockNumberInDatabase() throws DatabaseException {
+ Cursor c = null;
+ try {
+ c = chkDB_blockNum.openCursor(null,null);
+ DatabaseEntry keyDBE = new DatabaseEntry();
+ DatabaseEntry dataDBE = new DatabaseEntry();
+ if(c.getLast(keyDBE,dataDBE,null)==OperationStatus.SUCCESS) {
+ StoreBlock storeBlock = (StoreBlock) storeBlockTupleBinding.entryToObject(dataDBE);
+ return storeBlock.offset + 1;
+ }
+ c.close();
+ c = null;
+ } finally {
+ if(c != null) {
+ try {
+ c.close();
+ } catch (DatabaseException e) {
+ Logger.error(this, "Caught "+e, e);
+ }
+ }
+ }
return 0;
- }
-
+ }
+
/**
 * Derive a block count from the length of the store file.
 * @return chkStore.length() divided (integer division) by headerBlockSize + dataBlockSize.
 * @throws IOException If reading the file length fails.
 */
private long countCHKBlocksFromFile() throws IOException {
	// Bytes taken by one block: its header region plus its data region.
	final int bytesPerBlock = headerBlockSize + dataBlockSize;
	return chkStore.length() / bytesPerBlock;
}
- private long getMaxRecentlyUsed() {
- long maxRecentlyUsed = 0;
-
- Cursor c = null;
- try{
- c = chkDB_accessTime.openCursor(null,null);
+ private long getMaxRecentlyUsed() {
+ long maxRecentlyUsed = 0;
+
+ Cursor c = null;
+ try{
+ c = chkDB_accessTime.openCursor(null,null);
DatabaseEntry keyDBE = new DatabaseEntry();
DatabaseEntry dataDBE = new DatabaseEntry();
if(c.getLast(keyDBE,dataDBE,null)==OperationStatus.SUCCESS) {
@@ -2322,27 +2322,27 @@
}
c.close();
c = null;
- } catch(DatabaseException ex) {
- ex.printStackTrace();
- } finally {
- if(c != null) {
- try {
- c.close();
- } catch (DatabaseException e) {
- Logger.error(this, "Caught "+e, e);
- }
- }
- }
-
- return maxRecentlyUsed;
- }
-
- private long getNewRecentlyUsed() {
- synchronized(lastRecentlyUsedSync) {
- lastRecentlyUsed++;
- return lastRecentlyUsed;
- }
- }
+ } catch(DatabaseException ex) {
+ ex.printStackTrace();
+ } finally {
+ if(c != null) {
+ try {
+ c.close();
+ } catch (DatabaseException e) {
+ Logger.error(this, "Caught "+e, e);
+ }
+ }
+ }
+
+ return maxRecentlyUsed;
+ }
+
+ /**
+  * Advance the lastRecentlyUsed counter by one and return the new value.
+  * The increment and read happen together under lastRecentlyUsedSync.
+  * @return The counter value after the increment.
+  */
+ private long getNewRecentlyUsed() {
+ 	synchronized(lastRecentlyUsedSync) {
+ 		return ++lastRecentlyUsed;
+ 	}
+ }
public void setMaxKeys(long maxStoreKeys, boolean forceBigShrink) throws DatabaseException, IOException {
synchronized(this) {
@@ -2350,10 +2350,10 @@
}
maybeOnlineShrink(false);
}
-
- public long getMaxKeys() {
- return maxChkBlocks;
- }
+
+ /**
+ * @return The current value of maxChkBlocks, the block-count limit that writeBlock
+ * and grabFreeBlock compare block numbers against.
+ */
+ public long getMaxKeys() {
+ return maxChkBlocks;
+ }
public long hits() {
return hits;
More information about the cvs
mailing list