[freenet-cvs] r20886 - branches/saltedhashstore/freenet/src/freenet/store
j16sdiz at freenetproject.org
j16sdiz at freenetproject.org
Tue Jul 1 07:36:21 UTC 2008
Author: j16sdiz
Date: 2008-07-01 07:36:20 +0000 (Tue, 01 Jul 2008)
New Revision: 20886
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
refactor to BatchProcessor interface and fix key count
Modified: branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java 2008-07-01 04:07:26 UTC (rev 20885)
+++ branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java 2008-07-01 07:36:20 UTC (rev 20886)
@@ -11,7 +11,6 @@
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.text.DecimalFormat;
-import java.util.AbstractList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
@@ -252,8 +251,10 @@
// Overwrite old offset
Entry entry = new Entry(routingKey, header, data);
- writeEntry(entry, oldOffset); // overwrite, don't increase keyCount
+ writeEntry(entry, oldOffset);
writes.incrementAndGet();
+ if (oldEntry.getGeneration() != generation)
+ keyCount.incrementAndGet();
return;
}
@@ -268,7 +269,7 @@
if (updateBloom)
bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
writeEntry(entry, offset[i]);
- long written = writes.incrementAndGet();
+ writes.incrementAndGet();
keyCount.incrementAndGet();
return;
@@ -280,8 +281,11 @@
Logger.debug(this, "collision, write to i=0, offset=" + offset[0]);
if (updateBloom)
bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
+ oldEntry = readEntry(offset[0], null);
writeEntry(entry, offset[0]);
- long written = writes.incrementAndGet();
+ writes.incrementAndGet();
+ if (oldEntry.getGeneration() != generation)
+ keyCount.incrementAndGet();
} finally {
unlockPlainKey(routingKey, false);
}
@@ -568,6 +572,14 @@
digestedRoutingKey = SaltedHashFreenetStore.this.getDigestedRoutingKey(this.plainRoutingKey);
return digestedRoutingKey;
}
+
+ public byte getGeneration() {
+ return generation;
+ }
+
+ public void setGeneration(byte generation) {
+ this.generation = generation;
+ }
}
/**
@@ -820,7 +832,7 @@
/**
* Write config file
*/
- private void writeConfigFile() throws IOException {
+ private void writeConfigFile() {
configLock.writeLock().lock();
try {
File tempConfig = new File(configFile.getPath() + ".tmp");
@@ -838,6 +850,8 @@
raf.close();
FileUtil.renameTo(tempConfig, configFile);
+ } catch (IOException ioe) {
+ Logger.error(this, "error writing config file for " + name, ioe);
} finally {
configLock.writeLock().unlock();
}
@@ -849,6 +863,11 @@
private static Lock cleanerGlobalLock = new ReentrantLock(); // global across all datastore
private Cleaner cleanerThread;
+ private interface BatchProcessor {
+ // return <code>null</code> to free the entry
+ Entry processs(Entry entry);
+ }
+
private class Cleaner extends Thread {
/**
* How often the clean should run
@@ -898,12 +917,7 @@
} catch (Exception e) { // may throw IOException (even if it is not defined)
Logger.error(this, "Can't force bloom filter", e);
}
- try {
- writeConfigFile();
- } catch (IOException e) {
- Logger.error(this, "Can't write config file", e);
- }
-
+ writeConfigFile();
cleanerLock.notifyAll();
try {
@@ -930,7 +944,7 @@
initOldEntriesFile();
- List<Entry> oldEntryList = new LinkedList<Entry>();
+ final List<Entry> oldEntryList = new LinkedList<Entry>();
// start from end of store, make store shrinking quicker
long startOffset = (_prevStoreSize / RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
@@ -938,8 +952,16 @@
if (shutdown)
return;
- batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, oldEntryList, true);
+ batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+ public Entry processs(Entry entry) {
+ if (entry.getStoreSize() == storeSize) // new size
+ return entry;
+ oldEntryList.add(entry);
+ return null;
+ }
+ });
+
if (storeSize < _prevStoreSize)
setStoreFileSize(Math.max(storeSize, curOffset));
@@ -984,32 +1006,33 @@
Logger.normal(this, "Start rebuilding bloom filter for " + callback);
- bloomFilter.fork();
- List<Entry> buildList = new AbstractList<Entry>() {
- @Override
- public void add(int index, Entry entry) {
- bloomFilter.updateFilter(entry.getDigestedRoutingKey());
- }
- @Override
- public Entry get(int index) {
- return null;
- }
+ configLock.writeLock().lock();
+ try {
+ generation++;
+ bloomFilter.fork();
+ keyCount.set(0);
+ } finally {
+ configLock.writeLock().unlock();
+ }
- @Override
- public int size() {
- return 0;
- }
-
- };
-
for (long curOffset = 0; curOffset < storeSize; curOffset += RESIZE_MEMORY_ENTRIES) {
if (shutdown) {
bloomFilter.discard();
return;
}
- batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, buildList, false);
+ batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+ public Entry processs(Entry entry) {
+ if (entry.getGeneration() != generation) {
+ bloomFilter.updateFilter(entry.getDigestedRoutingKey());
+ keyCount.incrementAndGet();
+ }
+ return entry;
+ }
+ });
+
Logger.normal(this, "Rebuilding bloom filter for " + callback + ": " + curOffset + "/" + storeSize);
+ writeConfigFile();
}
bloomFilter.merge();
@@ -1024,23 +1047,19 @@
}
/**
- * Read a list of items from store. In resizing mode, only old items are read and the
- * original offsets are freed.
+ * Read a list of items from store.
*
* @param offset
* start offset, must be multiple of {@link FILE_SPLIT}
* @param length
* number of items to read, must be multiple of {@link FILE_SPLIT}. If this
* excess store size, read as much as possible.
- * @param items
- * a list of items
- * @param resizing
- * If <code>true</code>, only get old items and free the offset. Otherwise, get
- * all items.
+ * @param processor
+ * batch processor
* @return <code>true</code> if operation complete successfully; <code>false</code>
* otherwise (e.g. can't acquire locks, node shutting down)
*/
- private boolean batchReadEntries(long offset, int length, List<Entry> items, boolean resizing) {
+ private boolean batchReadEntries(long offset, int length, BatchProcessor processor) {
assert offset % FILE_SPLIT == 0;
assert length % FILE_SPLIT == 0;
@@ -1060,6 +1079,7 @@
ByteBuffer buf = ByteBuffer.allocate((int) bufLen);
for (int i = 0; i < FILE_SPLIT; i++) { // for each split file
+ boolean dirty = false;
buf.clear();
try {
while (buf.hasRemaining()) {
@@ -1074,27 +1094,31 @@
}
buf.flip();
- for (int j = 0; buf.remaining() >= entryTotalLength; j++) {
- if (shutdown)
- return false;
+ try {
+ for (int j = 0; buf.limit() >= j * entryTotalLength; j++) {
+ if (shutdown)
+ return false;
- ByteBuffer enBuf = buf.slice();
- buf.position(buf.position() + (int) entryTotalLength);
+ buf.position((int) (j * entryTotalLength));
+ if (buf.remaining() < entryTotalLength) // EOF
+ break;
- enBuf.limit((int) entryTotalLength);
+ ByteBuffer enBuf = buf.slice();
+ enBuf.limit((int) entryTotalLength);
- Entry entry = new Entry(enBuf);
- entry.curOffset = offset + j * FILE_SPLIT + i;
+ Entry entry = new Entry(enBuf);
+ entry.curOffset = offset + j * FILE_SPLIT + i;
- try {
if (entry.isFree())
continue; // not occupied
- if (resizing && entry.storeSize != storeSize)
- continue; // resizing mode, not new item
- items.add(entry);
-
- if (resizing) { // free the offset
+ Entry newEntry = processor.processs(entry);
+ if (newEntry != null) {
+ // write back
+ buf.position((int) (j * entryTotalLength));
+ buf.put(newEntry.toByteBuffer());
+ dirty = true;
+ } else { // free the offset
try {
freeOffset(entry.curOffset);
keyCount.decrementAndGet();
@@ -1103,11 +1127,20 @@
Logger.error(this, "error freeing entry " + entry.curOffset, ioe);
}
}
- } finally {
- // unlock current entry
- unlockEntry(entry.curOffset);
- locked[(int) (entry.curOffset - offset)] = false;
}
+ } finally {
+ // write back.
+ if (dirty) {
+ buf.flip();
+
+ try {
+ while (buf.hasRemaining()) {
+ storeFC[i].write(buf, startFileOffset + buf.position());
+ }
+ } catch (IOException ioe) {
+ Logger.error(this, "unexpected IOException", ioe);
+ }
+ }
}
}
@@ -1401,11 +1434,7 @@
try {
flushAndClose();
flags &= ~FLAG_DIRTY; // clean shutdown
- try {
- writeConfigFile();
- } catch (IOException e) {
- Logger.error(this, "error writing store config", e);
- }
+ writeConfigFile();
} finally {
configLock.writeLock().unlock();
}
More information about the cvs
mailing list