[freenet-dev] [freenet-cvs] r16725 - trunk/freenet/src/freenet/node

Matthew Toseland toad at amphibian.dyndns.org
Wed Dec 19 18:35:26 UTC 2007


It's a nice simplification... but it'll use a lot of threads.

How about asynchronous callbacks?

On Wednesday 19 December 2007 17:27, you wrote:
> Author: robert
> Date: 2007-12-19 17:27:36 +0000 (Wed, 19 Dec 2007)
> New Revision: 16725
> 
> Modified:
>    trunk/freenet/src/freenet/node/CHKInsertSender.java
> Log:
> Wait for transfer ack's from background transfers independently (might catch more)
> 
> 
> Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/CHKInsertSender.java	2007-12-19 16:44:52 
UTC (rev 16724)
> +++ trunk/freenet/src/freenet/node/CHKInsertSender.java	2007-12-19 17:27:36 
UTC (rev 16725)
> @@ -56,8 +56,10 @@
>  			try {
>  				bt.send(node.executor);
>  				this.completedTransfer(bt.failedDueToOverload());
> +				this.receivedNotice(waitForReceivedNotification(this));
>  			} catch (Throwable t) {
>  				this.completedTransfer(false);
> +				this.receivedNotice(false);
>  				Logger.error(this, "Caught "+t, t);
>  			}
>  		}
> @@ -617,12 +619,12 @@
>  		return sentRequest;
>  	}
>  		
> -		public void waitForBackgroundTransferCompletions() {
> +		private void waitForBackgroundTransferCompletions() {
>  			try {
>  		    freenet.support.Logger.OSThread.logPID(this);
>  			if(logMINOR) Logger.minor(this, "Starting "+this);
>  			
> -			// We are presently at a terminal stage.
> +			// We must presently be at such a stage that no more background 
transfers will be added.
>  			
>  			BackgroundTransfer[] transfers;
>  			synchronized(backgroundTransfers) {
> @@ -631,10 +633,26 @@
>  			}
>  			
>  			// Wait for the outgoing transfers to complete.
> -			if(!waitForCompletedTransfers(transfers)) {
> +			if(!waitForBackgroundTransfers(transfers)) {
>  				setTransferTimedOut();
>  				return;
>  			}
> +			} finally {
> +				synchronized(CHKInsertSender.this) {
> +					allTransfersCompleted = true;
> +					CHKInsertSender.this.notifyAll();
> +				}
> +			}
> +		}
> +	
> +	/**
> +	 * Blocks and waits for a response from the given node as to the final transfer status in the chain. This will be longer/after
> +	 * the local block transfer is complete, as it is necessary to include the round-trip time in the allTransfersComplete()
> +	 * function.
> +	 * Returns true if received a successful notification of the downstream reception, false in every other case
> +	 * (e.g. timeout, cancel, receiveFailed, etc).
> +	 */
> +	private boolean waitForReceivedNotification(BackgroundTransfer awc) {
>  				
>  			long transfersCompletedTime = System.currentTimeMillis();
>  			
> @@ -643,7 +661,7 @@
>  			while(true) {
>  				
>  				synchronized(backgroundTransfers) {
> -					if(receiveFailed) return;
> +					if(receiveFailed) return false;
>  				}
>  				// First calculate the timeout
>  				int timeout;
> @@ -652,104 +670,92 @@
>  				if(timeout <= 0) {
>  						Logger.error(this, "Timed out waiting for transfers to complete 
on "+uid);
>  						setTransferTimedOut();
> -					return;
> +					return false;
>  				}
>  				
> -				MessageFilter mf = null;
> -				
> -				//Build a message filter to capture acknowledgement messages from the 
nodes we are interested in.
> -				for(int i=0;i<transfers.length;i++) {
> -					BackgroundTransfer awc = transfers[i];
>  					// If disconnected, ignore.
>  					if(!awc.pn.isRoutable()) {
>  						Logger.normal(this, "Disconnected: "+awc.pn+" 
in "+CHKInsertSender.this);
> -						continue;
> +						return false;
>  					}
>  					// If transfer failed, probably won't be acknowledged.
>  					if(!awc.transferSucceeded) {
> -						continue;
> +						if (logMINOR) 
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> 
false");
> +						return false;
>  					}
> -					// Wait for completion.
> -					if(!awc.receivedCompletionNotice) {
> -						MessageFilter m =
> +					// See if redundant.
> +					if(awc.receivedCompletionNotice) {
> +						return awc.completionSucceeded;
> +					}
> +				
> +				MessageFilter mf =
>  							MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
> -						if(mf == null)
> -							mf = m;
> -						else
> -							mf = m.or(mf);
> +
>  						if(logMINOR) Logger.minor(this, "Waiting for "+awc.pn.getPeer());
> -					}
> -				}
>  				
> -				if (mf==null) {
> -					if (logMINOR) Logger.minor(this, "Done waiting, no more completion 
listeners");
> -					return;
> -				} else {
>  					Message m;
>  					try {
>  						m = node.usm.waitFor(mf, CHKInsertSender.this);
>  					} catch (DisconnectedException e) {
> -						// Which one? I have no idea.
> -						// Go around the loop again to find out.
> -						continue;
> +						Logger.normal(this, "Disconnected (on waitFor): "+awc.pn+" 
in "+this);
> +						return false;
>  					}
>  					if(m == null) {
> -						Logger.error(this, "Timed out waiting for a final ack from any 
nodes.");
> -						//Would looping again help? We could either:
> -						// (1) loop and notice that there is no more time left (handling the 
timeout), or
> -						// (2) notice that the nodes we are waiting on are down and possibly 
exit gracefully (not implemented).
> -						continue;
> +						Logger.error(this, "Timed out waiting for a final ack 
from: "+awc.pn);
> +						return false;
>  					} else {
> -						// Process message
>  						PeerNode pn = (PeerNode) m.getSource();
>  						// pn cannot be null, because the filters will prevent garbage 
collection of the nodes
> -						boolean processed = false;
> -						for(int i=0;i<transfers.length;i++) {
> -							PeerNode p = transfers[i].pn;
> -							if(p == pn) {
> +
> +							if(awc.pn.equals(pn)) {
>  								boolean anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> -								transfers[i].receivedNotice(!anyTimedOut);
>  								if(anyTimedOut) {
>  									setTransferTimedOut();
>  								}
> -								processed = true;
> -								break;
> +								return !anyTimedOut;
> +							} else {
> +								Logger.error(this, "received completion notice for wrong 
node: "+awc);
> +								continue;
>  							}
> -						}
> -						if(!processed) {
> -							Logger.error(this, "Did not process message: "+m+" on "+this);
> -						}
>  					}
>  				}
> -			}
> -			} finally {
> -				synchronized(CHKInsertSender.this) {
> -					allTransfersCompleted = true;
> -					CHKInsertSender.this.notifyAll();
> -				}
> -			}
>  		}
>  
> -		/** Block until all transfers have finished. @return True if there is any 
point in waiting for acknowledgements. */
> -		private boolean waitForCompletedTransfers(BackgroundTransfer[] transfers) 
{
> +		/**
> +		 * Block until all transfers have reached a final-terminal state 
(success/failure). On success this means that a
> +		 * successful 'received-notification' has been received.
> +		 * @return True if all background transfers were successful.
> +		 */
> +		private boolean waitForBackgroundTransfers(BackgroundTransfer[] 
transfers) {
>  			// MAYBE all done
>  			while(true) {
> +				//If we want to be sure to exit as-soon-as the transfers are done, then 
we must hold the lock while we check.
> +				synchronized(backgroundTransfers) {
> +					if(receiveFailed) return false;
> +
>  				boolean noneRouteable = true;
>  				boolean completedTransfers = true;
> +				boolean completedNotifications = true;
>  				for(int i=0;i<transfers.length;i++) {
>  					if(!transfers[i].pn.isRoutable()) continue;
>  					noneRouteable = false;
>  					if(!transfers[i].completedTransfer) {
> +						//must wait
>  						completedTransfers = false;
>  						break;
>  					}
> +					if (!transfers[i].receivedCompletionNotice) {
> +						//must wait
> +						completedNotifications = false;
> +						break;
> +					}
> +					if (!transfers[i].completionSucceeded)
> +						return false;
>  				}
> -				if(completedTransfers) return true;
>  				if(noneRouteable) return false;
> +				if(completedTransfers && completedNotifications) return true;
>  
> -				synchronized(backgroundTransfers) {
> -					if(receiveFailed) return false;
> -					if(logMINOR) Logger.minor(this, "Waiting for completion");
> +					if(logMINOR) Logger.minor(this, "Waiting for 
(completion="+!completedTransfers+", 
notification="+completedNotifications+")");
>  					try {
>  						backgroundTransfers.wait(100*1000);
>  					} catch (InterruptedException e) {
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
Url : http://emu.freenetproject.org/pipermail/devl/attachments/20071219/50b32c93/attachment.pgp 


More information about the Devl mailing list