[jboss-cvs] JBoss Messaging SVN: r5333 - in branches/Branch_1416_merge/src/main/org/jboss: jms/client/container and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 11 05:44:17 EST 2008
Author: gaohoward
Date: 2008-11-11 05:44:16 -0500 (Tue, 11 Nov 2008)
New Revision: 5333
Modified:
branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java
branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java
branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java
branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
Log:
JBMESSAGING-1416 clean up code
Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-11-11 10:44:16 UTC (rev 5333)
@@ -40,6 +40,7 @@
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -212,6 +213,41 @@
return (Queue)getDestination();
}
+ // JBM Ordering Group Extension Implementation ---------------------------------
+
+
+ /**
+ * Enables ordering-group sending on this producer.
+ *
+ * After this call the producer sends its messages as members of the ordering
+ * group named by ogrpName. If ogrpName is null, the group name
+ * will be automatically generated. By default a newly created producer
+ * has this feature disabled.
+ *
+ * Calling this method overrides the group name set by a previous call, if any.
+ * Callers that supply a name themselves are responsible for keeping it unique
+ * across the JMS server domain. Auto-generated names are always guaranteed to be
+ * unique.
+ *
+ * @param ogrpName the name of the group, or null for an auto-generated name.
+ * @throws JMSException if thrown by the underlying producer delegate.
+ */
+ public void enableOrderingGroup(String ogrpName) throws JMSException
+ {
+ delegate.enableOrderingGroup(ogrpName);
+ }
+
+ /**
+ * Disables the ordering group capability of the producer.
+ * After this method is called, the producer no longer sends out
+ * ordering group messages.
+ */
+ public void disableOrderingGroup() throws JMSException
+ {
+ delegate.disableOrderingGroup();
+ }
+
+
// Public --------------------------------------------------------
public ProducerDelegate getDelegate()
Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java 2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/container/ProducerAspect.java 2008-11-11 10:44:16 UTC (rev 5333)
@@ -39,7 +39,6 @@
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.ProducerState;
import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossTemporaryQueue;
@@ -62,6 +61,7 @@
* Remember! PER_INSTANCE aspects are very expensive so we avoid them.
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -263,7 +263,15 @@
{
m.setJMSMessageID(messageToSend.getJMSMessageID());
}
-
+
+ // Processing the ordering group message here.
+ if (producerState.isOrderingGroupEnabled())
+ {
+ String grpName = producerState.getOrderingGroupName();
+ messageToSend.setJBMOrderingGroupName(grpName);
+ messageToSend.setJBMOrderingGroupSeq(sessionState.getGroupSequenceNum(grpName));
+ }
+
// we now invoke the send(Message) method on the session, which will eventually be fielded
// by connection endpoint
((SessionDelegate)sessionState.getDelegate()).send(messageToSend, false);
@@ -385,6 +393,30 @@
return null;
}
+ public Object handleEnableOrderingGroup(Invocation invocation) throws Throwable
+ { // Stores the requested ordering group name on the producer state and switches ordering on; always returns null.
+ Object[] args = ((MethodInvocation)invocation).getArguments();
+ String orderingGroupName = (String)args[0];
+
+ ProducerState state = getProducerState(invocation);
+ if (orderingGroupName == null) // no name supplied: build one from a fixed prefix plus a random UUID
+ {
+ orderingGroupName = "JBM-ORD-GRP:" + UUID.randomUUID().toString();
+ }
+ // NOTE(review): a previously set group name is overwritten below without removing its sequence counter from the session - confirm this is intended.
+ state.setOrderingGroupName(orderingGroupName);
+ state.setOrderingGroupEnabled(true);
+
+ return null;
+ }
+
+ public Object handleDisableOrderingGroup(Invocation invocation) throws Throwable
+ {
+ ProducerState pState = getProducerState(invocation);
+ pState.disableOrderingGroup();
+ return null;
+ }
+
// Class YYY overrides --------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/delegate/ClientProducerDelegate.java 2008-11-11 10:44:16 UTC (rev 5333)
@@ -33,6 +33,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
*
* @version <tt>$Revision$</tt>
*
@@ -218,19 +219,30 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
- // Public ---------------------------------------------------------------------------------------
+ // JBM Ordering Group Extension ---------------------------------
+ public void enableOrderingGroup(String ogrpName) throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+ public void disableOrderingGroup() throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
public String toString()
{
return "ProducerDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
}
+ // Public ---------------------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
-
+
// Inner Classes --------------------------------------------------------------------------------
}
Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java 2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/ProducerState.java 2008-11-11 10:44:16 UTC (rev 5333)
@@ -36,6 +36,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodoorv</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
*
* @version <tt>$Revision$</tt>
*
@@ -61,6 +62,8 @@
private SessionState parent;
private ProducerDelegate delegate;
+ private boolean isOrderingGroupEnabled = false;
+ private String orderingGroupName = null;
// Constructors ---------------------------------------------------------------------------------
public ProducerState(SessionState parent, ProducerDelegate delegate, Destination dest)
@@ -166,6 +169,36 @@
this.deliveryMode = deliveryMode;
}
+ public void setOrderingGroupEnabled(boolean isOrderingGroupEnabled)
+ {
+ this.isOrderingGroupEnabled = isOrderingGroupEnabled;
+ }
+
+ public boolean isOrderingGroupEnabled()
+ {
+ return isOrderingGroupEnabled;
+ }
+
+ public void setOrderingGroupName(String orderingGroupName)
+ {
+ this.orderingGroupName = orderingGroupName;
+ }
+
+ public String getOrderingGroupName()
+ {
+ return orderingGroupName;
+ }
+
+ // No-op unless ordering is enabled; otherwise clears the flag and has the parent session remove this group's entry.
+ public void disableOrderingGroup()
+ {
+ if (isOrderingGroupEnabled)
+ {
+ setOrderingGroupEnabled(false);
+ parent.removeOrderingGroup(orderingGroupName);
+ }
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java 2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/jms/client/state/SessionState.java 2008-11-11 10:44:16 UTC (rev 5333)
@@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.jms.MessageListener;
@@ -54,14 +53,13 @@
import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.Version;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-
/**
* State corresponding to a session. This state is acessible inside aspects/interceptors.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -108,6 +106,13 @@
//The distinguished message listener - for ASF
private MessageListener sessionListener;
+ /* Holds the sequence counters for ordering groups, keyed by group name.
+ * Each ordering group has a sequence counter used to generate sequence numbers for
+ * its messages. The sequence number of a message indicates the order in which the message
+ * is to be delivered.
+ */
+ private HashMap<String, OrderingGroupSeq> orderingGrpSeqMap = new HashMap<String, OrderingGroupSeq>();
+
//This is somewhat strange - but some of the MQ and TCK tests expect an XA session to behaviour as AUTO_ACKNOWLEDGE when not enlisted in
//a transaction
//This is the opposite behaviour as what is required when the XA session handles MDB delivery or when using the message bridge.
@@ -491,6 +496,25 @@
return "SessionState[" + sessionID + "]";
}
+ /**
+ * Returns the next sequence number for the given ordering group, creating the group's counter on first use.
+ */
+ public long getGroupSequenceNum(String grpName)
+ {
+ OrderingGroupSeq seq = orderingGrpSeqMap.get(grpName);
+ if (seq == null) { // no counter yet for this group: create and remember one
+ seq = new OrderingGroupSeq();
+ orderingGrpSeqMap.put(grpName, seq);
+ }
+ return seq.getNext();
+ }
+
+ // Removes the group's sequence counter from the map; a later getGroupSequenceNum() for the same name starts a fresh counter.
+ public void removeOrderingGroup(String orderingGroupName)
+ {
+ orderingGrpSeqMap.remove(orderingGroupName);
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -499,5 +523,22 @@
// Inner classes --------------------------------------------------------------------------------
+ // Sequence generator for one ordering group: getNext() yields 1, 2, 3, ... (plain long counter, no synchronization in this class).
+ public static class OrderingGroupSeq
+ {
+ private long counter; // last value returned by getNext(); 0 before the first call
+
+ public OrderingGroupSeq()
+ {
+ counter = 0;
+ }
+ // Increments before returning, so the first value handed out is 1.
+ public long getNext()
+ {
+ counter++;
+ return counter;
+ }
+ }
+
}
Modified: branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-11-11 09:24:58 UTC (rev 5332)
+++ branches/Branch_1416_merge/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-11-11 10:44:16 UTC (rev 5333)
@@ -28,6 +28,10 @@
import java.util.ListIterator;
import java.util.Set;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.MessagingTimeoutFactory;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Channel;
@@ -55,6 +59,7 @@
*
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
* @version <tt>$Revision$</tt> $Id: ChannelSupport.java,v 1.65
* 2006/06/27 19:44:39 timfox Exp $
*/
@@ -100,6 +105,8 @@
protected int maxSize;
protected SynchronizedInt messagesAdded;
+
+ protected OrderingGroupMonitor monitor = new OrderingGroupMonitor();
// Constructors ---------------------------------------------------------------------------------
@@ -154,6 +161,8 @@
// Each channel has its own copy of the reference
ref = ref.copy();
+
+ monitor.registerMessage(ref, null);
try
{
@@ -213,6 +222,9 @@
// Each channel has its own copy of the reference
ref = ref.copy();
+
+ //guarding against any ordering group messages.
+ monitor.registerMessage(ref, tx);
try
{
@@ -566,6 +578,7 @@
synchronized (lock)
{
+ monitor.unmarkSending(ref);
messageRefs.addFirst(ref, ref.getMessage().getPriority());
}
@@ -593,7 +606,6 @@
if (!getReceiversReady())
{
if (trace) { log.trace(this + " receivers not ready so not delivering"); }
-
return;
}
@@ -603,66 +615,107 @@
if (ref != null)
{
- // Attempt to push the ref to a receiver
-
- if (trace) { log.trace(this + " pushing " + ref); }
-
- Delivery del = distributor.handle(this, ref, null);
-
- setReceiversReady(del != null);
-
- if (del == null)
+ int status = monitor.isAvailable(ref);
+ if (status != OrderingGroupMonitor.OK)
{
- // No receiver, broken receiver or full receiver so we stop delivering
- if (trace) { log.trace(this + " got no delivery for " + ref + " so no receiver got the message. Stopping delivery."); }
-
- break;
- }
- else if (!del.isSelectorAccepted())
- {
- // No receiver accepted the message because no selectors matched, so we create
- // an iterator (if we haven't already created it) to iterate through the refs
- // in the channel. No delivery was really performed
-
+ if (trace)
+ {
+ log.trace("Hold sending off ordering group message " + ref);
+ }
+ //iterating time
if (iter == null)
{
iter = messageRefs.iterator();
-
//We just tried the first one, so we don't want to try it again
iter.next();
}
}
else
{
- if (trace) { log.trace(this + ": " + del + " returned for message " + ref); }
+ // Attempt to push the ref to a receiver
- // Receiver accepted the reference
+ if (trace)
+ {
+ log.trace(this + " pushing " + ref);
+ }
- synchronized (lock)
+ Delivery del = distributor.handle(this, ref, null);
+
+ setReceiversReady(del != null);
+
+ if (del == null)
{
+
+ // No receiver, broken receiver or full receiver so we stop delivering
+ if (trace)
+ {
+ log.trace(this + " got no delivery for " +
+ ref +
+ " so no receiver got the message. Stopping delivery.");
+ }
+
+ break;
+ }
+ else if (!del.isSelectorAccepted())
+ {
+ // No receiver accepted the message because no selectors matched, so we create
+ // an iterator (if we haven't already created it) to iterate through the refs
+ // in the channel. No delivery was really performed
+
+ if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+ {
+ log.warn("Warning! Using message selectors with ordering group can cause unpredicatable results!");
+ }
+
if (iter == null)
{
- if (trace) { log.trace(this + " removing first ref in memory"); }
+ iter = messageRefs.iterator();
- removeFirstInMemory();
+ // We just tried the first one, so we don't want to try it again
+ iter.next();
}
- else
+ }
+ else
+ {
+ if (trace)
{
- if (trace) { log.trace(this + " removed current message from iterator"); }
+ log.trace(this + ": " + del + " returned for message " + ref);
+ }
- iter.remove();
+ monitor.markSending(ref);
+
+ // Receiver accepted the reference
+ synchronized (lock)
+ {
+ if (iter == null)
+ {
+ if (trace)
+ {
+ log.trace(this + " removing first ref in memory");
+ }
+
+ removeFirstInMemory();
+ }
+ else
+ {
+ if (trace)
+ {
+ log.trace(this + " removed current message from iterator");
+ }
+
+ iter.remove();
+ }
}
+
+ deliveringCount.increment();
}
-
- deliveringCount.increment();
}
}
else
{
// No more refs in channel or only ones that don't match any selectors
if (trace) { log.trace(this + " no more refs to deliver "); }
-
- break;
+ break;
}
}
}
@@ -769,6 +822,22 @@
pm.addTransaction(tx);
}
}
+
+ MessageReference ref = d.getReference();
+ if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+ {
+ if (trace)
+ {
+ log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
+ }
+ synchronized (lock)
+ {
+ if (monitor.messageCompleted(ref))
+ {
+ deliverInternal();
+ }
+ }
+ }
}
protected InMemoryCallback getCallback(Transaction tx)
@@ -811,9 +880,32 @@
// Private --------------------------------------------------------------------------------------
+ // Debug-only helper (to be removed): returns the body of a TextMessage reference wrapped in parentheses, or the <null> placeholder for anything else.
+ public static String getRefText(MessageReference ref)
+ {
+ String result = "<null>";
+ if (ref == null) return result;
+ Object rmsg = ref.getMessage();
+ if (rmsg instanceof JBossMessage)
+ {
+ JBossMessage msg = (JBossMessage)ref.getMessage();
+ if (msg instanceof TextMessage)
+ {
+ try
+ {
+ result = "(" + ((TextMessage)msg).getText() + ")";
+ }
+ catch (JMSException e)
+ { // exception is swallowed; result keeps the <null> placeholder
+ }
+ }
+ }
+ return result;
+ }
+
private MessageReference nextReference(ListIterator iter) throws Throwable
{
- MessageReference ref;
+ MessageReference ref = null;
if (iter == null)
{
@@ -829,6 +921,7 @@
if (iter.hasNext())
{
ref = (MessageReference)iter.next();
+ //if (monitor.challengeSend(ref) == OrderingGroupMonitor.OK) break;
}
else
{
@@ -954,7 +1047,12 @@
public void afterRollback(boolean onePhase) throws Exception
{
- //NOOP
+ //tx rolled back, we need to inform the monitor
+ for(Iterator i = refsToAdd.iterator(); i.hasNext(); )
+ {
+ MessageReference ref = (MessageReference)i.next();
+ monitor.unmarkSending(ref);
+ }
}
public String toString()
More information about the jboss-cvs-commits
mailing list