[jboss-cvs] JBoss Messaging SVN: r5215 - in branches/Branch_JBMESSAGING_1416: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 30 02:55:52 EDT 2008
Author: gaohoward
Date: 2008-10-30 02:55:52 -0400 (Thu, 30 Oct 2008)
New Revision: 5215
Modified:
branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
Log:
JBMESSAGING-1416
Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-10-30 06:55:52 UTC (rev 5215)
@@ -224,8 +224,9 @@
// Each channel has its own copy of the reference
ref = ref.copy();
- dlog("registering -- ", ref);
monitor.registerMessage(ref, tx);
+
+ dlog("---registering msg: ", ref);
try
{
@@ -592,6 +593,7 @@
*
* @see org.jboss.messaging.core.contract.Channel#deliver()
*/
+ /*debug use, delete them!!!*/
private void dlog(String lgmsg)
{
log.error("(*)-" + lgmsg);
@@ -600,6 +602,7 @@
{
dlog(lgmsg + this.getRefText(r));
}
+ /*debug*/
protected void deliverInternal()
{
if (trace) { log.trace(this + " was prompted delivery"); }
@@ -622,7 +625,6 @@
{
ref = nextReference(iter);
- dlog("got message - ", ref);
if (ref != null)
{
@@ -631,14 +633,11 @@
int status = monitor.isAvailable(ref);
if (status != OrderingGroupMonitor.OK)
{
- dlog("leave this message alone and iterating to next message");
//iterating time
if (iter == null)
{
- dlog("iter still null when inspecting this message ", ref);
iter = messageRefs.iterator();
//We just tried the first one, so we don't want to try it again
- dlog("call next on iter to skip over the current one", ref);
iter.next();
}
}
@@ -651,15 +650,12 @@
log.trace(this + " pushing " + ref);
}
- dlog("everything looks fine, start to deliver the message ", ref);
Delivery del = distributor.handle(this, ref, null);
- dlog("got the result del " + del);
setReceiversReady(del != null);
if (del == null)
{
- dlog("del is null, so we release the sending count for ", ref);
// No receiver, broken receiver or full receiver so we stop delivering
if (trace)
@@ -669,7 +665,6 @@
" so no receiver got the message. Stopping delivery.");
}
- dlog("as this message is not delivered, we do nothing for it, this round of delivery stopped. ", ref);
break;
}
else if (!del.isSelectorAccepted())
@@ -678,12 +673,10 @@
// an iterator (if we haven't already created it) to iterate through the refs
// in the channel. No delivery was really performed
- dlog("delivery dropped by the selector, so we also drop our message too.");
monitor.dropSend(ref);//leaving a 'hole'
if (iter == null)
{
- dlog("so this is the first msg dropped, iterating ...");
iter = messageRefs.iterator();
// We just tried the first one, so we don't want to try it again
@@ -697,7 +690,6 @@
log.trace(this + ": " + del + " returned for message " + ref);
}
- dlog("message accepted so remove from memory: ", ref);
monitor.markSending(ref);
// Receiver accepted the reference
@@ -710,7 +702,6 @@
log.trace(this + " removing first ref in memory");
}
- dlog("still first in mem, removing: ", ref);
removeFirstInMemory();
}
else
@@ -720,7 +711,6 @@
log.trace(this + " removed current message from iterator");
}
- dlog("already iterated, removing: ", ref);
iter.remove();
}
}
@@ -731,13 +721,12 @@
}
else
{
- dlog("We have no more message to deliver for this time, stop this round and return.");
// No more refs in channel or only ones that don't match any selectors
if (trace) { log.trace(this + " no more refs to deliver "); }
break;
}
}
- dlog("deliverInteral() actually exit.");
+ log.error("======delever end==========");
}
catch (Throwable t)
{
@@ -843,23 +832,19 @@
}
}
- dlog("message acknowledged for ", d.getReference());
- monitor.messageCompleted(d.getReference());
-
- if (monitor.hasMessageInQueue())
+ synchronized (lock)
{
- dlog("we do still have messages in queue, trigger the delivery again");
- synchronized (lock)
+ dlog("---acknowledging ", d.getReference());
+ if (monitor.messageCompleted(d.getReference()))
{
- dlog("begin trigger delivering...");
+ log.error("-----more og msg, trigger...");
deliverInternal();
- dlog("triggered delivering done...");
}
+ else
+ {
+ log.error("---no more og msg, no trigger!");
+ }
}
- else
- {
- dlog("we don't have any messages in order queue, so don't trigger more delivery.");
- }
}
protected InMemoryCallback getCallback(Transaction tx)
@@ -940,10 +925,8 @@
// We need to extend it to work with refs from the db
//We have an iterator - this means we are iterating through the queue to find a ref that matches
- log.error("---nextRef call, loop for next");
if (iter.hasNext())
{
- log.error("---nextRef call, found one");
ref = (MessageReference)iter.next();
//if (monitor.challengeSend(ref) == OrderingGroupMonitor.OK) break;
}
Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2008-10-30 06:55:52 UTC (rev 5215)
@@ -141,16 +141,19 @@
{
boolean result = false;
ReferenceHolder holder = sortedList.getFirst();
+ log.error("---haspending, holder: " + holder);
if (holder != null)
{
if (holder.isPending())
{
//true if the sortedList has more to offer.
+ log.error("---see our size: " + sortedList.size());
result = sortedList.size() > 1;
}
else
{
//means we still have the first un-sent.
+ log.error("--- first is not sent yet.");
result = true;
}
}
@@ -207,6 +210,11 @@
}
}
+ public String getGroupName()
+ {
+ return groupName;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2008-10-30 06:55:52 UTC (rev 5215)
@@ -120,13 +120,15 @@
* it is called when a message is acked, commited or rollback
* once the message is completed, the next one in a ordering
* group becomes deliverable.
+ * Returns true if there are more messages available after this one.
*/
- public void messageCompleted(MessageReference ref)
+ public boolean messageCompleted(MessageReference ref)
{
String grpName = extractGroupName(ref);
if (grpName == null)
{
- return;
+ //not an ordering group message
+ return false;
}
synchronized (orderingGroups)
{
@@ -135,13 +137,14 @@
{
group.unregister(ref);
}
+ return this.hasMessageInQueue();
}
}
/**
* Check if there is any pending messages in any group.
*/
- public boolean hasMessageInQueue()
+ private boolean hasMessageInQueue()
{
boolean result = false;
synchronized (orderingGroups)
@@ -150,6 +153,7 @@
while (iter.hasNext())
{
OrderingGroup group = iter.next();
+ log.error("--checking group: " + group.getGroupName());
if (group.hasPendingMessage())
{
result = true;
Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java 2008-10-29 22:51:42 UTC (rev 5214)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupConnectionConsumerTest.java 2008-10-30 06:55:52 UTC (rev 5215)
@@ -96,14 +96,12 @@
consumerConn.start();
Session sessCons1 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session sessCons2 = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
OrderingGroupMessageListener listener = new OrderingGroupMessageListener(this);
sessCons1.setMessageListener(listener);
- sessCons2.setMessageListener(listener);
- ServerSessionPool pool = new OrderingServerSessionPool(sessCons1, sessCons2);
+ ServerSessionPool pool = new OrderingServerSessionPool(sessCons1);
JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 1);
@@ -133,8 +131,15 @@
TextMessage txm = mList.get(i);
assertEquals(txm.getText(), "testing" + i);
}
-
+ //allow the consumer thread to shut down gracefully
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch (InterruptedException e)
+ {
+ }
cc.close();
consumerConn.close();
@@ -147,11 +152,83 @@
if (consumerConn != null) consumerConn.close();
if (producerConn != null) producerConn.close();
- removeAllMessages(queue1.getQueueName(), true, 0);
+ }
+ }
+
+
+ /*
+ * Make sure the ordering group messages are received in order
+ * thru a ConnectionConsumer in transaction mode.
+ */
+ /*
+ public void testTransactedReceive() throws Exception
+ {
+ if (ServerManagement.isRemote()) return;
+
+ Connection consumerConn = null;
+
+ Connection producerConn = null;
+
+ try
+ {
+ consumerConn = cf.createConnection();
+
+ consumerConn.start();
+
+ Session sessCons1 = consumerConn.createSession(true, Session.SESSION_TRANSACTED);
+
+ TxOrderingGroupMessageListener listener1 = new TxOrderingGroupMessageListener(this, sessCons1);
+
+ sessCons1.setMessageListener(listener1);
+
+ ServerSessionPool pool = new MockServerSessionPool(sessCons1);
+
+ JBossConnectionConsumer cc = (JBossConnectionConsumer)consumerConn.createConnectionConsumer(queue1, null, pool, 5);
+
+ producerConn = cf.createConnection();
+
+ Session sessProd = producerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ JBossMessageProducer prod = (JBossMessageProducer)sessProd.createProducer(queue1);
+ prod.enableOrderingGroup(null);
+
+ forceGC();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage m = sessProd.createTextMessage("testing" + i);
+ prod.send(m, Message.DEFAULT_DELIVERY_MODE, i%10, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ //waiting enough time to allow delivery complete.
+ msgLatch.attempt(10000);
+
+ //check the order
+ assertEquals(NUM_MESSAGES, mList.size());
+
+ for (int i = 0; i < NUM_MESSAGES; ++i)
+ {
+ TextMessage txm = mList.get(i);
+ assertEquals(txm.getText(), "testing" + i);
+ }
+
+ checkEmpty(queue1);
+
+ cc.close();
+
+ consumerConn.close();
+ consumerConn = null;
+ producerConn.close();
+ producerConn = null;
}
+ finally
+ {
+ if (consumerConn != null) consumerConn.close();
+ if (producerConn != null) producerConn.close();
+
+ }
}
-
+*/
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -176,6 +253,7 @@
{
try
{
+ System.err.println("===== ======= ======== ========== message received: " + ((TextMessage)message).getText());
owner.addReceived((TextMessage)message);
}
catch (Exception e)
@@ -185,27 +263,58 @@
}
}
+ class TxOrderingGroupMessageListener implements MessageListener
+ {
+
+ OrderingGroupConnectionConsumerTest owner;
+ long counter = 0;
+ Session sessRef;
+
+ TxOrderingGroupMessageListener(OrderingGroupConnectionConsumerTest theTest, Session sess)
+ {
+ owner = theTest;
+ sessRef = sess;
+ }
+
+ public synchronized void onMessage(Message message)
+ {
+ try
+ {
+ System.err.println("===== ======= ======== ========== coutner: " + counter);
+ //roll back once for every 5 messages
+ if (counter%5 == 0) {
+ System.err.println("===== ======= ======== ========== rolling back : " + ((TextMessage)message).getText());
+ sessRef.rollback();
+ }
+ else
+ {
+ System.err.println("===== ======= ======== ========== received : " + ((TextMessage)message).getText());
+ owner.addReceived((TextMessage)message);
+ System.err.println("===== ======= ======== ========== commiting : " + ((TextMessage)message).getText());
+ sessRef.commit();
+ }
+ counter++;
+ System.err.println("===coutner: " + counter);
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ }
+ }
+ }
+
class OrderingServerSessionPool implements ServerSessionPool
{
- private ServerSession serverSession1;
- private ServerSession serverSession2;
- private long flag;
+ private ServerSession serverSession;
- OrderingServerSessionPool(Session sess1, Session sess2)
+ OrderingServerSessionPool(Session sess)
{
- serverSession1 = new MockServerSession(sess1);
- serverSession2 = new MockServerSession(sess2);
- flag = 0L;
+ serverSession = new MockServerSession(sess);
}
public synchronized ServerSession getServerSession() throws JMSException
{
- flag++;
- if (flag%2 == 0)
- {
- return serverSession1;
- }
- return serverSession2;
+ return serverSession;
}
}
More information about the jboss-cvs-commits
mailing list