[jboss-cvs] JBoss Messaging SVN: r7795 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 1 01:19:49 EDT 2009
Author: gaohoward
Date: 2009-09-01 01:19:48 -0400 (Tue, 01 Sep 2009)
New Revision: 7795
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
Log:
JBMESSAGING-1728
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-09-01 05:19:48 UTC (rev 7795)
@@ -162,8 +162,6 @@
// Each channel has its own copy of the reference
ref = ref.copy();
-
- monitor.registerMessage(ref);
try
{
@@ -223,9 +221,6 @@
// Each channel has its own copy of the reference
ref = ref.copy();
-
- //guarding against any ordering group messages.
- monitor.registerMessage(ref);
try
{
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2009-09-01 05:19:48 UTC (rev 7795)
@@ -82,7 +82,7 @@
}
try
{
- holder = new ReferenceHolder(ref);
+ holder = new ReferenceHolder(mid);
}
catch (JMSException e)
{
@@ -201,18 +201,15 @@
class ReferenceHolder
{
+ private Long mid;
- private Long seq;
-
- private MessageReference ref;
-
private long refCount;
private long pendingSentCount;
- public ReferenceHolder(MessageReference r) throws JMSException
+ public ReferenceHolder(Long id) throws JMSException
{
- ref = r;
+ mid = id;
refCount = 1;
pendingSentCount = 0;
}
@@ -283,13 +280,12 @@
public boolean matchMessage(MessageReference newRef)
{
Long mid1 = newRef.getMessage().getMessageID();
- Long mid2 = ref.getMessage().getMessageID();
- return mid1.equals(mid2);
+ return mid1.equals(mid);
}
- public MessageReference getMessageRef()
+ public Long getMessageID()
{
- return ref;
+ return mid;
}
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2009-09-01 05:19:48 UTC (rev 7795)
@@ -85,9 +85,11 @@
}
/**
- * If ref is not in our registry, just return true.
+ * If ref is not in our registry, just return OK.
* If in our registry, check if the ref is the first of the group.
- * return true if it at the first place. return false otherwise.
+ * return OK if it is at the first place.
+ * otherwise return NOT_OK_BEING_SENT if the ref is being sent or
+ * NOT_OK_NOT_FIRST if the ref is not at the first place.
*/
public int isAvailable(MessageReference ref)
{
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2009-09-01 05:19:48 UTC (rev 7795)
@@ -267,9 +267,6 @@
MessageReference ref = addFromRefInfo(info, refMap);
- //https://jira.jboss.org/jira/browse/JBMESSAGING-1664
- monitor.registerMessage(ref);
-
if (recoverable && ref.getMessage().isReliable())
{
loadedReliable = true;
@@ -386,6 +383,9 @@
}
else
{
+ //guarding against any ordering group messages.
+ monitor.registerMessage(ref);
+
super.addReferenceInMemory(ref);
if (messageRefs.size() == fullSize)
@@ -511,9 +511,6 @@
ReferenceInfo info = (ReferenceInfo)iter.next();
MessageReference added = addFromRefInfo(info, refMap);
- //note, we registered the ref 'after' it has been added to the list
- //it is safe as long as the caller of this method is synchronized on lock.
- monitor.registerMessage(added);
}
}
@@ -534,6 +531,7 @@
//Schedule the delivery if necessary, or just add to the in memory queue
if (!checkAndSchedule(ref))
{
+ monitor.registerMessage(ref);
messageRefs.addLast(ref, ref.getMessage().getPriority());
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java 2009-08-31 07:03:58 UTC (rev 7794)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java 2009-09-01 05:19:48 UTC (rev 7795)
@@ -625,6 +625,174 @@
}
}
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1728
+ //test ordering while depaging.
+ public void testOrderingGroupOnPaging3() throws Exception
+ {
+ //can't run invm mode.
+ if (!this.isRemote()) return;
+
+ ServerManagement.deployQueue("pagingQ", null, 5, 2, 2);
+
+ Queue queue = (Queue)ic.lookup("/queue/pagingQ");
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer sender = (JBossMessageProducer)session.createProducer(queue);
+
+ sender.enableOrderingGroup("my-group");
+
+ final int numMessages = 20;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage m = session.createTextMessage("Hello" + i);
+ sender.send(m, DeliveryMode.PERSISTENT, 5, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ TextMessage rm = null;
+ for (int i = 0; i < 2; i++)
+ {
+ rm = (TextMessage)consumer.receive(5000);
+ assertNotNull(rm);
+ assertEquals("Hello" + i, rm.getText());
+ }
+
+ //receive 3
+ rm = (TextMessage)consumer.receive(5000);
+ assertEquals("Hello2", rm.getText());
+
+ //add a new one
+ sender.send(session.createTextMessage("NewMessage"));
+
+ //receive all left.
+ for (int i = 3; i < numMessages; i++)
+ {
+ rm = (TextMessage)consumer.receive(5000);
+ assertNotNull(rm);
+ assertEquals("Hello" + i, rm.getText());
+ }
+
+ //last one
+ rm = (TextMessage)consumer.receive(5000);
+ assertEquals("NewMessage", rm.getText());
+
+ rm = (TextMessage)consumer.receive(5000);
+ assertNull(rm);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("pagingQ");
+ }
+ catch(Exception ignore)
+ {
+
+ }
+ }
+ }
+
+ //https://jira.jboss.org/jira/browse/JBMESSAGING-1728
+ //test ordering while depaging (messages sent with various priorities)
+ public void testOrderingGroupOnPaging4() throws Exception
+ {
+ //can't run invm mode.
+ if (!this.isRemote()) return;
+
+ ServerManagement.deployQueue("pagingQ", null, 5, 2, 2);
+
+ Queue queue = (Queue)ic.lookup("/queue/pagingQ");
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer sender = (JBossMessageProducer)session.createProducer(queue);
+
+ sender.enableOrderingGroup("my-group");
+
+ final int numMessages = 20;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage m = session.createTextMessage("Hello" + i);
+ sender.send(m, DeliveryMode.PERSISTENT, i%10, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ TextMessage rm = null;
+ for (int i = 0; i < 2; i++)
+ {
+ rm = (TextMessage)consumer.receive(5000);
+ assertNotNull(rm);
+ assertEquals("Hello" + i, rm.getText());
+ }
+
+ //receive 3
+ rm = (TextMessage)consumer.receive(5000);
+ assertEquals("Hello2", rm.getText());
+
+ //add a new one
+ sender.send(session.createTextMessage("NewMessage"));
+
+ //receive all left.
+ for (int i = 3; i < numMessages; i++)
+ {
+ rm = (TextMessage)consumer.receive(5000);
+ assertNotNull(rm);
+ assertEquals("Hello" + i, rm.getText());
+ }
+
+ //last one
+ rm = (TextMessage)consumer.receive(5000);
+ assertEquals("NewMessage", rm.getText());
+
+ rm = (TextMessage)consumer.receive(5000);
+ assertNull(rm);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("pagingQ");
+ }
+ catch(Exception ignore)
+ {
+
+ }
+ }
+ }
+
//https://jira.jboss.org/jira/browse/JBMESSAGING-1726
//send one ordering group message and receive it, then resend it out
//using another group name, the group name should not be changed.
More information about the jboss-cvs-commits
mailing list