[freenet-dev] [freenet-cvs] r16733 - trunk/freenet/src/freenet/node

Matthew Toseland toad at amphibian.dyndns.org
Thu Dec 20 00:25:49 UTC 2007


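You need to call .setMatchesDroppedConnection() 
and .setMatchesRestartedConnections() on the filter returned by 
getNotificationMessageFilter(). The old blocking waitFor() path caught 
DisconnectedException; the new async filter only sets the UID, type, source 
and timeout, so nothing tells it about a dropped or restarted connection.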

On Wednesday 19 December 2007 19:31, robert at freenetproject.org wrote:
> Author: robert
> Date: 2007-12-19 19:31:05 +0000 (Wed, 19 Dec 2007)
> New Revision: 16733
> 
> Modified:
>    trunk/freenet/src/freenet/node/CHKInsertSender.java
> Log:
> use message callback rather than hanging onto thread
> 
> 
> Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> ===================================================================
> --- trunk/freenet/src/freenet/node/CHKInsertSender.java	2007-12-19 19:10:21 
UTC (rev 16732)
> +++ trunk/freenet/src/freenet/node/CHKInsertSender.java	2007-12-19 19:31:05 
UTC (rev 16733)
> @@ -6,6 +6,7 @@
>  import java.util.HashSet;
>  import java.util.Vector;
>  
> +import freenet.io.comm.AsyncMessageFilterCallback;
>  import freenet.io.comm.ByteCounter;
>  import freenet.io.comm.DMT;
>  import freenet.io.comm.DisconnectedException;
> @@ -24,7 +25,7 @@
>  
>  public final class CHKInsertSender implements Runnable, AnyInsertSender, 
ByteCounter {
>  	
> -	private class BackgroundTransfer implements Runnable {
> +	private class BackgroundTransfer implements Runnable, 
AsyncMessageFilterCallback {
>  		/** Node we are waiting for response from */
>  		final PeerNode pn;
>  		/** We may be sending data to that node */
> @@ -42,6 +43,8 @@
>  		/** Did it succeed? */
>  		boolean transferSucceeded;
>  		
> +		long transferCompletedTime;
> +		
>  		BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
>  			this.pn = pn;
>  			bt = new BlockTransmitter(node.usm, pn, uid, prb, node.outputThrottle, 
CHKInsertSender.this);
> @@ -56,7 +59,13 @@
>  			try {
>  				bt.send(node.executor);
>  				this.completedTransfer(bt.failedDueToOverload());
> -				this.receivedNotice(waitForReceivedNotification(this));
> +				if (pn.isRoutable() && transferSucceeded) {
> +					//synch-version: 
this.receivedNotice(waitForReceivedNotification(this));
> +					//Add ourselves as a listener for the longterm completion message of 
this transfer, then gracefully exit.
> +					node.usm.addAsyncFilter(getNotificationMessageFilter(), this);
> +				} else {
> +					this.receivedNotice(false);
> +				}
>  			} catch (Throwable t) {
>  				this.completedTransfer(false);
>  				this.receivedNotice(false);
> @@ -64,10 +73,11 @@
>  			}
>  		}
>  		
> -		void completedTransfer(boolean success) {
> +		private void completedTransfer(boolean success) {
>  			synchronized(this) {
>  				transferSucceeded = success;
>  				completedTransfer = true;
> +				transferCompletedTime = System.currentTimeMillis();
>  				notifyAll();
>  			}
>  			synchronized(backgroundTransfers) {
> @@ -78,11 +88,15 @@
>  			}
>  		}
>  		
> -		void receivedNotice(boolean success) {
> +		private void receivedNotice(boolean success) {
>  			synchronized(this) {
> +				if (receivedCompletionNotice) {
> +					Logger.error(this, "receivedNotice("+success+"), already had 
receivedNotice("+completionSucceeded+")");
> +				} else {
>  				completionSucceeded = success;
>  				receivedCompletionNotice = true;
>  				notifyAll();
> +				}
>  			}
>  			synchronized(backgroundTransfers) {
>  				backgroundTransfers.notifyAll();
> @@ -92,6 +106,39 @@
>  			}			
>  		}
>  		
> +		public void onMatched(Message m) {
> +			PeerNode pn = (PeerNode) m.getSource();
> +			// pn cannot be null, because the filters will prevent garbage 
collection of the nodes
> +			
> +			if(this.pn.equals(pn)) {
> +				boolean anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> +				if(anyTimedOut) {
> +					CHKInsertSender.this.setTransferTimedOut();
> +				}
> +				receivedNotice(!anyTimedOut);
> +			} else {
> +				Logger.error(this, "received completion notice for wrong 
node: "+pn+" != "+this.pn);
> +			}			
> +		}
> +		
> +		public boolean shouldTimeout() {
> +			//AFIACS, this will still let the filter timeout, but not call 
onMatched() twice.
> +			return receivedCompletionNotice;
> +		}
> +		
> +		private MessageFilter getNotificationMessageFilter() {
> +			return MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(pn).setTimeout(TRANSFER_COMPLETION_ACK_TIMEOUT);
> +		}
> +		
> +		boolean isTimedOut() {
> +			return 
System.currentTimeMillis()>(transferCompletedTime+TRANSFER_COMPLETION_ACK_TIMEOUT);
> +		}
> +		
> +		public void maybeTimedOut() {
> +			if (isTimedOut()) {
> +				receivedNotice(false);
> +			}
> +		}
>  	}
>  	
>  	CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
> @@ -651,83 +698,7 @@
>  				}
>  			}
>  		}
> -	
> -	/**
> -	 * Blocks and waits for a response from the given node asto the final 
transfer status in the chain. This will be longer/after
> -	 * the local block transfer is complete, as it is neccesary to include the 
rount-trip-time in the allTransfersComplete()
> -	 * function.
> -	 * Returns true if received a successful notification of the downstream 
reception, false in every other case
> -	 * (e.g. timeout, cancel, receiveFailed, etc).
> -	 */
> -	private boolean waitForReceivedNotification(BackgroundTransfer awc) {
> -				
> -			long transfersCompletedTime = System.currentTimeMillis();
> -			
> -			// Wait for acknowledgements from each node, or timeouts.
> -			
> -			while(true) {
> -				
> -				synchronized(backgroundTransfers) {
> -					if(receiveFailed) return false;
> -				}
> -				// First calculate the timeout
> -				int timeout;
> -				long now = System.currentTimeMillis();
> -				timeout = (int)Math.min(Integer.MAX_VALUE, (transfersCompletedTime + 
TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
> -				if(timeout <= 0) {
> -						Logger.error(this, "Timed out waiting for transfers to complete 
on "+uid);
> -						setTransferTimedOut();
> -					return false;
> -				}
> -				
> -					// If disconnected, ignore.
> -					if(!awc.pn.isRoutable()) {
> -						Logger.normal(this, "Disconnected: "+awc.pn+" 
in "+CHKInsertSender.this);
> -						return false;
> -					}
> -					// If transfer failed, probably won't be acknowledged.
> -					if(!awc.transferSucceeded) {
> -						if (logMINOR) 
Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> 
false");
> -						return false;
> -					}
> -					// See if redundant.
> -					if(awc.receivedCompletionNotice) {
> -						return awc.completionSucceeded;
> -					}
> -				
> -				MessageFilter mf =
> -							MessageFilter.create().setField(DMT.UID, 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
>  
> -						if(logMINOR) Logger.minor(this, "Waiting for "+awc.pn.getPeer());
> -				
> -					Message m;
> -					try {
> -						m = node.usm.waitFor(mf, CHKInsertSender.this);
> -					} catch (DisconnectedException e) {
> -						Logger.normal(this, "Disconnected (on waitFor): "+awc.pn+" 
in "+this);
> -						return false;
> -					}
> -					if(m == null) {
> -						Logger.error(this, "Timed out waiting for a final ack 
from: "+awc.pn);
> -						return false;
> -					} else {
> -						PeerNode pn = (PeerNode) m.getSource();
> -						// pn cannot be null, because the filters will prevent garbage 
collection of the nodes
> -
> -							if(awc.pn.equals(pn)) {
> -								boolean anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> -								if(anyTimedOut) {
> -									setTransferTimedOut();
> -								}
> -								return !anyTimedOut;
> -							} else {
> -								Logger.error(this, "received completion notice for wrong 
node: "+awc);
> -								continue;
> -							}
> -					}
> -				}
> -		}
> -
>  		/**
>  		 * Block until all transfers have reached a final-terminal state 
(success/failure). On success this means that a
>  		 * successful 'received-notification' has been received.
> @@ -751,6 +722,7 @@
>  						completedTransfers = false;
>  						break;
>  					}
> +					transfers[i].maybeTimedOut();
>  					if (!transfers[i].receivedCompletionNotice) {
>  						//must wait
>  						completedNotifications = false;
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
Url : http://emu.freenetproject.org/pipermail/devl/attachments/20071220/1158e1e3/attachment.pgp 


More information about the Devl mailing list