[jboss-cvs] jboss-jms/src/main/org/jboss/messaging/core/distributed/queue ...

Timothy Fox tim.fox at jboss.com
Mon Jul 17 13:14:47 EDT 2006


  User: timfox  
  Date: 06/07/17 13:14:47

  Modified:    src/main/org/jboss/messaging/core/distributed/queue  
                        DistributedQueue.java QueuePeer.java
  Log:
  Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
  
  Revision  Changes    Path
  1.16      +54 -54    jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/DistributedQueue.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: DistributedQueue.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/DistributedQueue.java,v
  retrieving revision 1.15
  retrieving revision 1.16
  diff -u -b -r1.15 -r1.16
  --- DistributedQueue.java	27 Jun 2006 19:44:39 -0000	1.15
  +++ DistributedQueue.java	17 Jul 2006 17:14:47 -0000	1.16
  @@ -31,7 +31,6 @@
   import org.jboss.messaging.core.Filter;
   import org.jboss.messaging.core.Message;
   import org.jboss.messaging.core.MessageReference;
  -import org.jboss.messaging.core.Receiver;
   import org.jboss.messaging.core.Routable;
   import org.jboss.messaging.core.distributed.Distributed;
   import org.jboss.messaging.core.distributed.DistributedException;
  @@ -50,9 +49,9 @@
    * A distributed queue.
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  - * @version <tt>$Revision: 1.15 $</tt>
  + * @version <tt>$Revision: 1.16 $</tt>
    *
  - * $Id: DistributedQueue.java,v 1.15 2006/06/27 19:44:39 timfox Exp $
  + * $Id: DistributedQueue.java,v 1.16 2006/07/17 17:14:47 timfox Exp $
    */
   public class DistributedQueue extends Queue implements Distributed
    {
  @@ -83,7 +82,7 @@
      public DistributedQueue(String name, MessageStore ms, PersistenceManager tl,
                              RpcDispatcher dispatcher)
      {
  -      super(-1, ms, tl, null, true, 0, 0, 0);
  +      super(-1, ms, tl, null, true, 0, 0, 0, null);
         viewKeeper = new QueueViewKeeper();
         peer = new QueuePeer(this, dispatcher);
      }
  @@ -109,58 +108,59 @@
         return messages;
      }
   
  -   public boolean deliver(Receiver r)
  +   public void deliver()
      {
  -      boolean delivered = super.deliver(r);
  +//      boolean delivered = super.deliver(r);
  +//
  +//      if (delivered)
  +//      {
  +//         return true;
  +//      }
  +//
  +//      // this peer has not redelivered any message but other peers may have messages to redeliver
  +//
  +//      if (r instanceof RemoteQueue)
  +//      {
  +//         // this was a remote forward request, but there are no local messages to forward so return
  +//         return false;
  +//      }
  +//
  +//      // no message in the local state, try remotely
  +//      boolean peerWillForward = false;
  +//      try
  +//      {
  +//         peerWillForward = peer.requestForward();
  +//      }
  +//      catch(DistributedException e)
  +//      {
  +//         log.error("Redelivery request failed", e);
  +//         return false;
  +//      }
  +//
  +//      if (!peerWillForward)
  +//      {
  +//         return false;
  +//      }
  +//
  +//      // at least one peer will forward a message
  +//      while(!delivered)
  +//      {
  +//         // TODO: experimental. Also see QueuePeer.requestForward() and QueueFacade.forward()
  +//         try
  +//         {
  +//            Thread.sleep(200);
  +//         }
  +//         catch(InterruptedException e)
  +//         {
  +//            // ignore
  +//         }
  +//
  +//         if (log.isTraceEnabled()) { log.trace("retrying redelivery"); }
  +//         delivered = super.deliver(r);
  +//      }
  +//
  +//      return delivered;
   
  -      if (delivered)
  -      {
  -         return true;
  -      }
  -
  -      // this peer has not redelivered any message but other peers may have messages to redeliver
  -
  -      if (r instanceof RemoteQueue)
  -      {
  -         // this was a remote forward request, but there are no local messages to forward so return
  -         return false;
  -      }
  -
  -      // no message in the local state, try remotely
  -      boolean peerWillForward = false;
  -      try
  -      {
  -         peerWillForward = peer.requestForward();
  -      }
  -      catch(DistributedException e)
  -      {
  -         log.error("Redelivery request failed", e);
  -         return false;
  -      }
  -
  -      if (!peerWillForward)
  -      {
  -         return false;
  -      }
  -
  -      // at least one peer will forward a message
  -      while(!delivered)
  -      {
  -         // TODO: experimental. Also see QueuePeer.requestForward() and QueueFacade.forward()
  -         try
  -         {
  -            Thread.sleep(200);
  -         }
  -         catch(InterruptedException e)
  -         {
  -            // ignore
  -         }
  -
  -         if (log.isTraceEnabled()) { log.trace("retrying redelivery"); }
  -         delivered = super.deliver(r);
  -      }
  -
  -      return delivered;
      }
   
      public void close()
  
  
  
  1.7       +19 -17    jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/QueuePeer.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: QueuePeer.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/QueuePeer.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -b -r1.6 -r1.7
  --- QueuePeer.java	28 Feb 2006 16:48:13 -0000	1.6
  +++ QueuePeer.java	17 Jul 2006 17:14:47 -0000	1.7
  @@ -21,35 +21,35 @@
     */
   package org.jboss.messaging.core.distributed.queue;
   
  +import java.io.Serializable;
  +import java.util.ArrayList;
  +import java.util.Collection;
  +import java.util.Collections;
  +import java.util.Iterator;
  +import java.util.List;
  +
  +import org.jboss.logging.Logger;
   import org.jboss.messaging.core.Filter;
  +import org.jboss.messaging.core.distributed.DistributedException;
  +import org.jboss.messaging.core.distributed.PeerIdentity;
  +import org.jboss.messaging.core.distributed.PeerSupport;
  +import org.jboss.messaging.core.distributed.RemotePeer;
  +import org.jboss.messaging.core.distributed.RemotePeerInfo;
   import org.jboss.messaging.core.distributed.pipe.DistributedPipe;
   import org.jboss.messaging.core.distributed.pipe.DistributedPipeOutput;
  +import org.jboss.messaging.core.distributed.util.RpcServer;
   import org.jboss.messaging.core.distributed.util.RpcServerCall;
   import org.jboss.messaging.core.distributed.util.ServerResponse;
  -import org.jboss.messaging.core.distributed.util.RpcServer;
  -import org.jboss.messaging.core.distributed.PeerSupport;
  -import org.jboss.messaging.core.distributed.PeerIdentity;
  -import org.jboss.messaging.core.distributed.DistributedException;
  -import org.jboss.messaging.core.distributed.RemotePeer;
  -import org.jboss.messaging.core.distributed.RemotePeerInfo;
  -import org.jboss.logging.Logger;
   import org.jboss.util.id.GUID;
   import org.jgroups.blocks.RpcDispatcher;
   
  -import java.io.Serializable;
  -import java.util.Collections;
  -import java.util.List;
  -import java.util.Collection;
  -import java.util.ArrayList;
  -import java.util.Iterator;
  -
   /**
    * The class that mediates the access of a distributed queue instance to the group.
    *
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  - * @version <tt>$Revision: 1.6 $</tt>
  + * @version <tt>$Revision: 1.7 $</tt>
    *
  - * $Id: QueuePeer.java,v 1.6 2006/02/28 16:48:13 timfox Exp $
  + * $Id: QueuePeer.java,v 1.7 2006/07/17 17:14:47 timfox Exp $
    */
   public class QueuePeer extends PeerSupport implements QueueFacade
    {
  @@ -100,7 +100,9 @@
         if (log.isTraceEnabled()) { log.trace(this + " got forward request from " + targetID); }
   
         RemoteQueue target = ((DistributedQueue.QueueViewKeeper)viewKeeper).getRemoteQueue(targetID);
  -      return queue.deliver(target);
  +      queue.deliver();
  +      
  +      return true;
      }
   
      // Public --------------------------------------------------------
  
  
  



More information about the jboss-cvs-commits mailing list