[freenet-cvs] r20886 - branches/saltedhashstore/freenet/src/freenet/store

j16sdiz at freenetproject.org j16sdiz at freenetproject.org
Tue Jul 1 07:36:21 UTC 2008


Author: j16sdiz
Date: 2008-07-01 07:36:20 +0000 (Tue, 01 Jul 2008)
New Revision: 20886

Modified:
   branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
refactor to BatchProcessor interface and fix key count

Modified: branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-07-01 04:07:26 UTC (rev 20885)
+++ branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-07-01 07:36:20 UTC (rev 20886)
@@ -11,7 +11,6 @@
 import java.nio.channels.FileChannel;
 import java.security.MessageDigest;
 import java.text.DecimalFormat;
-import java.util.AbstractList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -252,8 +251,10 @@
 
 					// Overwrite old offset
 					Entry entry = new Entry(routingKey, header, data);
-					writeEntry(entry, oldOffset); // overwrite, don't increase keyCount
+					writeEntry(entry, oldOffset);
 					writes.incrementAndGet();
+					if (oldEntry.getGeneration() != generation)
+						keyCount.incrementAndGet();
 					return;
 				}
 
@@ -268,7 +269,7 @@
 						if (updateBloom)
 							bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
 						writeEntry(entry, offset[i]);
-						long written = writes.incrementAndGet();
+						writes.incrementAndGet();
 						keyCount.incrementAndGet();
 						
 						return;
@@ -280,8 +281,11 @@
 					Logger.debug(this, "collision, write to i=0, offset=" + offset[0]);
 				if (updateBloom)
 					bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
+				oldEntry = readEntry(offset[0], null);
 				writeEntry(entry, offset[0]);
-				long written = writes.incrementAndGet();
+				writes.incrementAndGet();
+				if (oldEntry.getGeneration() != generation)
+					keyCount.incrementAndGet();
 			} finally {
 				unlockPlainKey(routingKey, false);
 			}
@@ -568,6 +572,14 @@
 				digestedRoutingKey = SaltedHashFreenetStore.this.getDigestedRoutingKey(this.plainRoutingKey);
 			return digestedRoutingKey;
 		}
+
+		public byte getGeneration() {
+			return generation;
+		}
+
+		public void setGeneration(byte generation) {
+			this.generation = generation;
+		}
 	}
 
 	/**
@@ -820,7 +832,7 @@
 	/**
 	 * Write config file
 	 */
-	private void writeConfigFile() throws IOException {
+	private void writeConfigFile() {
 		configLock.writeLock().lock();
 		try {
 			File tempConfig = new File(configFile.getPath() + ".tmp");
@@ -838,6 +850,8 @@
 			raf.close();
 
 			FileUtil.renameTo(tempConfig, configFile);
+		} catch (IOException ioe) {
+			Logger.error(this, "error writing config file for " + name, ioe);
 		} finally {
 			configLock.writeLock().unlock();
 		}
@@ -849,6 +863,11 @@
 	private static Lock cleanerGlobalLock = new ReentrantLock(); // global across all datastore
 	private Cleaner cleanerThread;
 
+	private interface BatchProcessor {
+		// return <code>null</code> to free the entry
+		Entry processs(Entry entry);
+	}
+	
 	private class Cleaner extends Thread {
 		/**
 		 * How often the clean should run
@@ -898,12 +917,7 @@
 					} catch (Exception e) { // may throw IOException (even if it is not defined)
 						Logger.error(this, "Can't force bloom filter", e);
 					}
-					try {
-						writeConfigFile();
-					} catch (IOException e) {
-						Logger.error(this, "Can't write config file", e);
-					}
-
+					writeConfigFile();
 					cleanerLock.notifyAll();
 
 					try {
@@ -930,7 +944,7 @@
 
 			initOldEntriesFile();
 
-			List<Entry> oldEntryList = new LinkedList<Entry>();
+			final List<Entry> oldEntryList = new LinkedList<Entry>();
 
 			// start from end of store, make store shrinking quicker 
 			long startOffset = (_prevStoreSize / RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
@@ -938,8 +952,16 @@
 				if (shutdown)
 					return;
 
-				batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, oldEntryList, true);
+				batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+					public Entry processs(Entry entry) {
+						if (entry.getStoreSize() == storeSize) // new size
+							return entry;
 
+						oldEntryList.add(entry);
+						return null;
+					}
+				});
+
 				if (storeSize < _prevStoreSize)
 					setStoreFileSize(Math.max(storeSize, curOffset));
 
@@ -984,32 +1006,33 @@
 			
 			Logger.normal(this, "Start rebuilding bloom filter for " + callback);
 			
-			bloomFilter.fork();
-			List<Entry> buildList = new AbstractList<Entry>() {
-				@Override
-				public void add(int index, Entry entry) {
-					bloomFilter.updateFilter(entry.getDigestedRoutingKey());
-				}
 
-				@Override
-				public Entry get(int index) {
-					return null;
-				}
+			configLock.writeLock().lock();
+			try {
+				generation++;
+				bloomFilter.fork();
+				keyCount.set(0);
+			} finally {
+				configLock.writeLock().unlock();
+			}
 
-				@Override
-				public int size() {
-					return 0;
-				}
-
-			};
-
 			for (long curOffset = 0; curOffset < storeSize; curOffset += RESIZE_MEMORY_ENTRIES) {
 				if (shutdown) {
 					bloomFilter.discard();
 					return;
 				}
-				batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, buildList, false);
+				batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+					public Entry processs(Entry entry) {
+						if (entry.getGeneration() != generation) {
+							bloomFilter.updateFilter(entry.getDigestedRoutingKey());
+							keyCount.incrementAndGet();
+						}
+						return entry;
+					}
+				});
+				
 				Logger.normal(this, "Rebuilding bloom filter for " + callback + ": " + curOffset + "/" + storeSize);
+				writeConfigFile();
 			}
 
 			bloomFilter.merge();
@@ -1024,23 +1047,19 @@
 		}
 
 		/**
-		 * Read a list of items from store. In resizing mode, only old items are read and the
-		 * original offsets are freed.
+		 * Read a list of items from store.
 		 * 
 		 * @param offset
 		 *            start offset, must be multiple of {@link FILE_SPLIT}
 		 * @param length
 		 *            number of items to read, must be multiple of {@link FILE_SPLIT}. If this
 		 *            excess store size, read as much as possible.
-		 * @param items
-		 *            a list of items
-		 * @param resizing
-		 *            If <code>true</code>, only get old items and free the offset. Otherwise, get
-		 *            all items.
+		 * @param processor
+		 *            batch processor
 		 * @return <code>true</code> if operation complete successfully; <code>false</code>
 		 *         otherwise (e.g. can't acquire locks, node shutting down)
 		 */
-		private boolean batchReadEntries(long offset, int length, List<Entry> items, boolean resizing) {
+		private boolean batchReadEntries(long offset, int length, BatchProcessor processor) {
 			assert offset % FILE_SPLIT == 0;
 			assert length % FILE_SPLIT == 0;
 
@@ -1060,6 +1079,7 @@
 
 				ByteBuffer buf = ByteBuffer.allocate((int) bufLen);
 				for (int i = 0; i < FILE_SPLIT; i++) { // for each split file
+					boolean dirty = false;
 					buf.clear();
 					try {
 						while (buf.hasRemaining()) {
@@ -1074,27 +1094,31 @@
 					}
 					buf.flip();
 
-					for (int j = 0; buf.remaining() >= entryTotalLength; j++) {
-						if (shutdown)
-							return false;
+					try { 
+						for (int j = 0; buf.limit() >= j * entryTotalLength; j++) {
+							if (shutdown)
+								return false;
 
-						ByteBuffer enBuf = buf.slice();
-						buf.position(buf.position() + (int) entryTotalLength);
+							buf.position((int) (j * entryTotalLength));
+							if (buf.remaining() < entryTotalLength) // EOF
+								break;
 
-						enBuf.limit((int) entryTotalLength);
+							ByteBuffer enBuf = buf.slice();
+							enBuf.limit((int) entryTotalLength);
 
-						Entry entry = new Entry(enBuf);
-						entry.curOffset = offset + j * FILE_SPLIT + i;
+							Entry entry = new Entry(enBuf);
+							entry.curOffset = offset + j * FILE_SPLIT + i;
 
-						try {
 							if (entry.isFree())
 								continue; // not occupied
-							if (resizing && entry.storeSize != storeSize)
-								continue; // resizing mode, not new item
 
-							items.add(entry);
-
-							if (resizing) { // free the offset
+							Entry newEntry = processor.processs(entry);
+							if (newEntry != null) {
+								// write back
+								buf.position((int) (j * entryTotalLength));
+								buf.put(newEntry.toByteBuffer());
+								dirty = true;
+							} else { // free the offset
 								try {
 									freeOffset(entry.curOffset);
 									keyCount.decrementAndGet();
@@ -1103,11 +1127,20 @@
 										Logger.error(this, "error freeing entry " + entry.curOffset, ioe);
 								}
 							}
-						} finally {
-							// unlock current entry
-							unlockEntry(entry.curOffset);
-							locked[(int) (entry.curOffset - offset)] = false;
 						}
+					} finally {
+						// write back.
+						if (dirty) {
+							buf.flip();
+
+							try {
+								while (buf.hasRemaining()) {
+									storeFC[i].write(buf, startFileOffset + buf.position());
+								}
+							} catch (IOException ioe) {
+								Logger.error(this, "unexpected IOException", ioe);
+							}
+						}
 					}
 				}
 
@@ -1401,11 +1434,7 @@
 			try {
 				flushAndClose();
 				flags &= ~FLAG_DIRTY; // clean shutdown
-				try {
-					writeConfigFile();
-				} catch (IOException e) {
-					Logger.error(this, "error writing store config", e);
-				}
+				writeConfigFile();
 			} finally {
 				configLock.writeLock().unlock();
 			}




More information about the cvs mailing list