[jboss-cvs] jboss-jms/src/main/org/jboss/messaging/core/distributed/queue ...
Timothy Fox
tim.fox at jboss.com
Mon Jul 17 13:14:47 EDT 2006
User: timfox
Date: 06/07/17 13:14:47
Modified: src/main/org/jboss/messaging/core/distributed/queue
DistributedQueue.java QueuePeer.java
Log:
Many changes including implementation of prefetch, SEDAisation of server, changing of recovery
Revision Changes Path
1.16 +54 -54 jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/DistributedQueue.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DistributedQueue.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/DistributedQueue.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -b -r1.15 -r1.16
--- DistributedQueue.java 27 Jun 2006 19:44:39 -0000 1.15
+++ DistributedQueue.java 17 Jul 2006 17:14:47 -0000 1.16
@@ -31,7 +31,6 @@
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.Routable;
import org.jboss.messaging.core.distributed.Distributed;
import org.jboss.messaging.core.distributed.DistributedException;
@@ -50,9 +49,9 @@
* A distributed queue.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1.15 $</tt>
+ * @version <tt>$Revision: 1.16 $</tt>
*
- * $Id: DistributedQueue.java,v 1.15 2006/06/27 19:44:39 timfox Exp $
+ * $Id: DistributedQueue.java,v 1.16 2006/07/17 17:14:47 timfox Exp $
*/
public class DistributedQueue extends Queue implements Distributed
{
@@ -83,7 +82,7 @@
public DistributedQueue(String name, MessageStore ms, PersistenceManager tl,
RpcDispatcher dispatcher)
{
- super(-1, ms, tl, null, true, 0, 0, 0);
+ super(-1, ms, tl, null, true, 0, 0, 0, null);
viewKeeper = new QueueViewKeeper();
peer = new QueuePeer(this, dispatcher);
}
@@ -109,58 +108,59 @@
return messages;
}
- public boolean deliver(Receiver r)
+ public void deliver()
{
- boolean delivered = super.deliver(r);
+// boolean delivered = super.deliver(r);
+//
+// if (delivered)
+// {
+// return true;
+// }
+//
+// // this peer has not redelivered any message but other peers may have messages to redeliver
+//
+// if (r instanceof RemoteQueue)
+// {
+// // this was a remote forward request, but there are no local messages to forward so return
+// return false;
+// }
+//
+// // no message in the local state, try remotely
+// boolean peerWillForward = false;
+// try
+// {
+// peerWillForward = peer.requestForward();
+// }
+// catch(DistributedException e)
+// {
+// log.error("Redelivery request failed", e);
+// return false;
+// }
+//
+// if (!peerWillForward)
+// {
+// return false;
+// }
+//
+// // at least one peer will forward a message
+// while(!delivered)
+// {
+// // TODO: experimental. Also see QueuePeer.requestForward() and QueueFacade.forward()
+// try
+// {
+// Thread.sleep(200);
+// }
+// catch(InterruptedException e)
+// {
+// // ignore
+// }
+//
+// if (log.isTraceEnabled()) { log.trace("retrying redelivery"); }
+// delivered = super.deliver(r);
+// }
+//
+// return delivered;
- if (delivered)
- {
- return true;
- }
-
- // this peer has not redelivered any message but other peers may have messages to redeliver
-
- if (r instanceof RemoteQueue)
- {
- // this was a remote forward request, but there are no local messages to forward so return
- return false;
- }
-
- // no message in the local state, try remotely
- boolean peerWillForward = false;
- try
- {
- peerWillForward = peer.requestForward();
- }
- catch(DistributedException e)
- {
- log.error("Redelivery request failed", e);
- return false;
- }
-
- if (!peerWillForward)
- {
- return false;
- }
-
- // at least one peer will forward a message
- while(!delivered)
- {
- // TODO: experimental. Also see QueuePeer.requestForward() and QueueFacade.forward()
- try
- {
- Thread.sleep(200);
- }
- catch(InterruptedException e)
- {
- // ignore
- }
-
- if (log.isTraceEnabled()) { log.trace("retrying redelivery"); }
- delivered = super.deliver(r);
- }
-
- return delivered;
}
public void close()
1.7 +19 -17 jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/QueuePeer.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: QueuePeer.java
===================================================================
RCS file: /cvsroot/jboss/jboss-jms/src/main/org/jboss/messaging/core/distributed/queue/QueuePeer.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -b -r1.6 -r1.7
--- QueuePeer.java 28 Feb 2006 16:48:13 -0000 1.6
+++ QueuePeer.java 17 Jul 2006 17:14:47 -0000 1.7
@@ -21,35 +21,35 @@
*/
package org.jboss.messaging.core.distributed.queue;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.logging.Logger;
import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.distributed.DistributedException;
+import org.jboss.messaging.core.distributed.PeerIdentity;
+import org.jboss.messaging.core.distributed.PeerSupport;
+import org.jboss.messaging.core.distributed.RemotePeer;
+import org.jboss.messaging.core.distributed.RemotePeerInfo;
import org.jboss.messaging.core.distributed.pipe.DistributedPipe;
import org.jboss.messaging.core.distributed.pipe.DistributedPipeOutput;
+import org.jboss.messaging.core.distributed.util.RpcServer;
import org.jboss.messaging.core.distributed.util.RpcServerCall;
import org.jboss.messaging.core.distributed.util.ServerResponse;
-import org.jboss.messaging.core.distributed.util.RpcServer;
-import org.jboss.messaging.core.distributed.PeerSupport;
-import org.jboss.messaging.core.distributed.PeerIdentity;
-import org.jboss.messaging.core.distributed.DistributedException;
-import org.jboss.messaging.core.distributed.RemotePeer;
-import org.jboss.messaging.core.distributed.RemotePeerInfo;
-import org.jboss.logging.Logger;
import org.jboss.util.id.GUID;
import org.jgroups.blocks.RpcDispatcher;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Collection;
-import java.util.ArrayList;
-import java.util.Iterator;
-
/**
* The class that mediates the access of a distributed queue instance to the group.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @version <tt>$Revision: 1.6 $</tt>
+ * @version <tt>$Revision: 1.7 $</tt>
*
- * $Id: QueuePeer.java,v 1.6 2006/02/28 16:48:13 timfox Exp $
+ * $Id: QueuePeer.java,v 1.7 2006/07/17 17:14:47 timfox Exp $
*/
public class QueuePeer extends PeerSupport implements QueueFacade
{
@@ -100,7 +100,9 @@
if (log.isTraceEnabled()) { log.trace(this + " got forward request from " + targetID); }
RemoteQueue target = ((DistributedQueue.QueueViewKeeper)viewKeeper).getRemoteQueue(targetID);
- return queue.deliver(target);
+ queue.deliver();
+
+ return true;
}
// Public --------------------------------------------------------
More information about the jboss-cvs-commits
mailing list