[freenet-dev] [freenet-cvs] r16733 - trunk/freenet/src/freenet/node

Matthew Toseland toad at amphibian.dyndns.org
Thu Dec 20 00:27:11 UTC 2007


On Thursday 20 December 2007 00:25, Matthew Toseland wrote:
> You need to call .setMatchesDroppedConnection() 
> and .setMatchesRestartedConnections() on the filter.

Sorry, no you don't, it's on by default.
> 
> On Wednesday 19 December 2007 19:31, robert at freenetproject.org wrote:
> > Author: robert
> > Date: 2007-12-19 19:31:05 +0000 (Wed, 19 Dec 2007)
> > New Revision: 16733
> > 
> > Modified:
> >    trunk/freenet/src/freenet/node/CHKInsertSender.java
> > Log:
> > use message callback rather than hanging onto thread
> > 
> > 
> > Modified: trunk/freenet/src/freenet/node/CHKInsertSender.java
> > ===================================================================
> > --- trunk/freenet/src/freenet/node/CHKInsertSender.java	2007-12-19 
19:10:21 
> UTC (rev 16732)
> > +++ trunk/freenet/src/freenet/node/CHKInsertSender.java	2007-12-19 
19:31:05 
> UTC (rev 16733)
> > @@ -6,6 +6,7 @@
> >  import java.util.HashSet;
> >  import java.util.Vector;
> >  
> > +import freenet.io.comm.AsyncMessageFilterCallback;
> >  import freenet.io.comm.ByteCounter;
> >  import freenet.io.comm.DMT;
> >  import freenet.io.comm.DisconnectedException;
> > @@ -24,7 +25,7 @@
> >  
> >  public final class CHKInsertSender implements Runnable, AnyInsertSender, 
> ByteCounter {
> >  	
> > -	private class BackgroundTransfer implements Runnable {
> > +	private class BackgroundTransfer implements Runnable, 
> AsyncMessageFilterCallback {
> >  		/** Node we are waiting for response from */
> >  		final PeerNode pn;
> >  		/** We may be sending data to that node */
> > @@ -42,6 +43,8 @@
> >  		/** Did it succeed? */
> >  		boolean transferSucceeded;
> >  		
> > +		long transferCompletedTime;
> > +		
> >  		BackgroundTransfer(PeerNode pn, PartiallyReceivedBlock prb) {
> >  			this.pn = pn;
> >  			bt = new BlockTransmitter(node.usm, pn, uid, prb, node.outputThrottle, 
> CHKInsertSender.this);
> > @@ -56,7 +59,13 @@
> >  			try {
> >  				bt.send(node.executor);
> >  				this.completedTransfer(bt.failedDueToOverload());
> > -				this.receivedNotice(waitForReceivedNotification(this));
> > +				if (pn.isRoutable() && transferSucceeded) {
> > +					//synch-version: 
> this.receivedNotice(waitForReceivedNotification(this));
> > +					//Add ourselves as a listener for the longterm completion message of 
> this transfer, then gracefully exit.
> > +					node.usm.addAsyncFilter(getNotificationMessageFilter(), this);
> > +				} else {
> > +					this.receivedNotice(false);
> > +				}
> >  			} catch (Throwable t) {
> >  				this.completedTransfer(false);
> >  				this.receivedNotice(false);
> > @@ -64,10 +73,11 @@
> >  			}
> >  		}
> >  		
> > -		void completedTransfer(boolean success) {
> > +		private void completedTransfer(boolean success) {
> >  			synchronized(this) {
> >  				transferSucceeded = success;
> >  				completedTransfer = true;
> > +				transferCompletedTime = System.currentTimeMillis();
> >  				notifyAll();
> >  			}
> >  			synchronized(backgroundTransfers) {
> > @@ -78,11 +88,15 @@
> >  			}
> >  		}
> >  		
> > -		void receivedNotice(boolean success) {
> > +		private void receivedNotice(boolean success) {
> >  			synchronized(this) {
> > +				if (receivedCompletionNotice) {
> > +					Logger.error(this, "receivedNotice("+success+"), already had 
> receivedNotice("+completionSucceeded+")");
> > +				} else {
> >  				completionSucceeded = success;
> >  				receivedCompletionNotice = true;
> >  				notifyAll();
> > +				}
> >  			}
> >  			synchronized(backgroundTransfers) {
> >  				backgroundTransfers.notifyAll();
> > @@ -92,6 +106,39 @@
> >  			}			
> >  		}
> >  		
> > +		public void onMatched(Message m) {
> > +			PeerNode pn = (PeerNode) m.getSource();
> > +			// pn cannot be null, because the filters will prevent garbage 
> collection of the nodes
> > +			
> > +			if(this.pn.equals(pn)) {
> > +				boolean anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> > +				if(anyTimedOut) {
> > +					CHKInsertSender.this.setTransferTimedOut();
> > +				}
> > +				receivedNotice(!anyTimedOut);
> > +			} else {
> > +				Logger.error(this, "received completion notice for wrong 
> node: "+pn+" != "+this.pn);
> > +			}			
> > +		}
> > +		
> > +		public boolean shouldTimeout() {
> > +			//AFIACS, this will still let the filter timeout, but not call 
> onMatched() twice.
> > +			return receivedCompletionNotice;
> > +		}
> > +		
> > +		private MessageFilter getNotificationMessageFilter() {
> > +			return MessageFilter.create().setField(DMT.UID, 
> 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(pn).setTimeout(TRANSFER_COMPLETION_ACK_TIMEOUT);
> > +		}
> > +		
> > +		boolean isTimedOut() {
> > +			return 
> 
System.currentTimeMillis()>(transferCompletedTime+TRANSFER_COMPLETION_ACK_TIMEOUT);
> > +		}
> > +		
> > +		public void maybeTimedOut() {
> > +			if (isTimedOut()) {
> > +				receivedNotice(false);
> > +			}
> > +		}
> >  	}
> >  	
> >  	CHKInsertSender(NodeCHK myKey, long uid, byte[] headers, short htl, 
> > @@ -651,83 +698,7 @@
> >  				}
> >  			}
> >  		}
> > -	
> > -	/**
> > -	 * Blocks and waits for a response from the given node asto the final 
> transfer status in the chain. This will be longer/after
> > -	 * the local block transfer is complete, as it is neccesary to include 
the 
> rount-trip-time in the allTransfersComplete()
> > -	 * function.
> > -	 * Returns true if received a successful notification of the downstream 
> reception, false in every other case
> > -	 * (e.g. timeout, cancel, receiveFailed, etc).
> > -	 */
> > -	private boolean waitForReceivedNotification(BackgroundTransfer awc) {
> > -				
> > -			long transfersCompletedTime = System.currentTimeMillis();
> > -			
> > -			// Wait for acknowledgements from each node, or timeouts.
> > -			
> > -			while(true) {
> > -				
> > -				synchronized(backgroundTransfers) {
> > -					if(receiveFailed) return false;
> > -				}
> > -				// First calculate the timeout
> > -				int timeout;
> > -				long now = System.currentTimeMillis();
> > -				timeout = (int)Math.min(Integer.MAX_VALUE, (transfersCompletedTime + 
> TRANSFER_COMPLETION_ACK_TIMEOUT) - now);
> > -				if(timeout <= 0) {
> > -						Logger.error(this, "Timed out waiting for transfers to complete 
> on "+uid);
> > -						setTransferTimedOut();
> > -					return false;
> > -				}
> > -				
> > -					// If disconnected, ignore.
> > -					if(!awc.pn.isRoutable()) {
> > -						Logger.normal(this, "Disconnected: "+awc.pn+" 
> in "+CHKInsertSender.this);
> > -						return false;
> > -					}
> > -					// If transfer failed, probably won't be acknowledged.
> > -					if(!awc.transferSucceeded) {
> > -						if (logMINOR) 
> Logger.minor(this, "waitForReceivedNotification: !transferSucceeded -> 
> false");
> > -						return false;
> > -					}
> > -					// See if redundant.
> > -					if(awc.receivedCompletionNotice) {
> > -						return awc.completionSucceeded;
> > -					}
> > -				
> > -				MessageFilter mf =
> > -							MessageFilter.create().setField(DMT.UID, 
> 
uid).setType(DMT.FNPInsertTransfersCompleted).setSource(awc.pn).setTimeout(timeout);
> >  
> > -						if(logMINOR) Logger.minor(this, "Waiting for "+awc.pn.getPeer());
> > -				
> > -					Message m;
> > -					try {
> > -						m = node.usm.waitFor(mf, CHKInsertSender.this);
> > -					} catch (DisconnectedException e) {
> > -						Logger.normal(this, "Disconnected (on waitFor): "+awc.pn+" 
> in "+this);
> > -						return false;
> > -					}
> > -					if(m == null) {
> > -						Logger.error(this, "Timed out waiting for a final ack 
> from: "+awc.pn);
> > -						return false;
> > -					} else {
> > -						PeerNode pn = (PeerNode) m.getSource();
> > -						// pn cannot be null, because the filters will prevent garbage 
> collection of the nodes
> > -
> > -							if(awc.pn.equals(pn)) {
> > -								boolean anyTimedOut = m.getBoolean(DMT.ANY_TIMED_OUT);
> > -								if(anyTimedOut) {
> > -									setTransferTimedOut();
> > -								}
> > -								return !anyTimedOut;
> > -							} else {
> > -								Logger.error(this, "received completion notice for wrong 
> node: "+awc);
> > -								continue;
> > -							}
> > -					}
> > -				}
> > -		}
> > -
> >  		/**
> >  		 * Block until all transfers have reached a final-terminal state 
> (success/failure). On success this means that a
> >  		 * successful 'received-notification' has been received.
> > @@ -751,6 +722,7 @@
> >  						completedTransfers = false;
> >  						break;
> >  					}
> > +					transfers[i].maybeTimedOut();
> >  					if (!transfers[i].receivedCompletionNotice) {
> >  						//must wait
> >  						completedNotifications = false;
> > 
> > _______________________________________________
> > cvs mailing list
> > cvs at freenetproject.org
> > http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> > 
> > 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 189 bytes
Desc: not available
Url : http://emu.freenetproject.org/pipermail/devl/attachments/20071220/bb7e3a8e/attachment.pgp 


More information about the Devl mailing list