[jboss-cvs] JBoss Messaging SVN: r1661 - in branches/Branch_1_0_1_SP: src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/stress
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 30 03:01:41 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-11-30 03:01:35 -0500 (Thu, 30 Nov 2006)
New Revision: 1661
Modified:
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
Log:
fix for http://jira.jboss.org/jira/browse/JBMESSAGING-660 and many TRACE logging improvements
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -217,8 +217,21 @@
*/
public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
{
- if (trace) { log.trace(this + " receiving " + msgs.size() + " message(s) from the remoting layer"); }
-
+ if (trace)
+ {
+ StringBuffer sb = new StringBuffer(this + " receiving [");
+ for(int i = 0; i < msgs.size(); i++)
+ {
+ sb.append(((MessageProxy)msgs.get(i)).getMessage().getMessageID());
+ if (i < msgs.size() - 1)
+ {
+ sb.append(",");
+ }
+ }
+ sb.append("] from the remoting layer");
+ log.trace(sb.toString());
+ }
+
synchronized (mainLock)
{
if (closed)
@@ -233,7 +246,7 @@
buffer.addAll(msgs);
- if (trace) { log.trace(this + " added messages to the buffer"); }
+ if (trace) { log.trace(this + " added message(s) to the buffer"); }
boolean full = buffer.size() >= bufferSize;
@@ -341,9 +354,9 @@
{
sessionExecutor.execute(new Closer(result));
- if (trace) { log.trace("blocking wait for Closer execution"); }
+ if (trace) { log.trace(this + " blocking wait for Closer execution"); }
result.getResult();
- if (trace) { log.trace("got Closer result"); }
+ if (trace) { log.trace(this + " got Closer result"); }
}
catch (InterruptedException e)
{
@@ -426,7 +439,7 @@
}
}
- if (trace) { log.trace("received " + m + " after being blocked on buffer"); }
+ if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
// If message is expired we still call pre and post deliver. This makes sure the
// message is acknowledged so it gets removed from the queue/subscription.
@@ -436,14 +449,14 @@
if (!m.getMessage().isExpired())
{
- if (trace) { log.trace("message " + m + " is not expired, pushing it to the caller"); }
+ if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
break;
}
if (trace)
{
- log.trace("message expired, discarding");
+ log.trace(this + ": message expired, discarding");
}
// the message expired, so discard the message, adjust timeout and reenter the buffer
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -753,7 +753,7 @@
if (trace) { log.trace("sent " + m); }
}
- if (trace) { log.trace("done the sends"); }
+ if (trace) { log.trace(tx + ": done the sends"); }
// Then ack the acks
@@ -775,7 +775,7 @@
if (trace) { log.trace("acked " + ack.getMessageID()); }
}
- if (trace) { log.trace("done the acks"); }
+ if (trace) { log.trace(tx + ": done the acks"); }
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -772,7 +772,20 @@
try
{
- if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
+ if (trace)
+ {
+ StringBuffer sb = new StringBuffer(ServerConsumerEndpoint.this + " handing [");
+ for(int i = 0; i < list.size(); i++)
+ {
+ sb.append(((MessageProxy)list.get(i)).getMessage().getMessageID());
+ if (i < list.size() - 1)
+ {
+ sb.append(",");
+ }
+ }
+ sb.append("] over to the remoting layer");
+ log.trace(sb.toString());
+ }
ClientDelivery del = new ClientDelivery(list, id);
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -721,11 +721,12 @@
protected void acknowledgeInternal(AckInfo ackInfo) throws Throwable
{
- //If the message was delivered via a connection consumer then the message needs to be acked
- //via the original consumer that was used to feed the connection consumer - which
- //won't be one of the consumers of this session
- //Therefore we always look in the global map of consumers held in the server peer
- ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
+ // If the message was delivered via a connection consumer then the message needs to be acked
+ // via the original consumer that was used to feed the connection consumer - which won't be
+ // one of the consumers of this session. Therefore we always look in the global map of
+ // consumers held in the server peer.
+ ServerConsumerEndpoint consumer =
+ connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
if (consumer == null)
{
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -649,146 +649,147 @@
ListIterator iter = null;
MessageReference ref = null;
-
- while (true)
- {
- synchronized (refLock)
- {
+
+ synchronized (refLock)
+ {
+ while (true)
+ {
if (iter == null)
{
- ref = (MessageReference) messageRefs.peekFirst();
+ ref = (MessageReference)messageRefs.peekFirst();
}
else
{
if (iter.hasNext())
- {
+ {
ref = (MessageReference)iter.next();
- }
+ }
else
{
ref = null;
}
}
- }
- if (ref != null)
- {
- // Check if message is expired (we also do this on the client
- // side)
- // If so ack it from the channel
- if (ref.isExpired())
+ if (ref != null)
{
- if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+ if (trace) { log.trace(this + " pushing " + ref); }
- // remove and acknowledge it
- if (iter == null)
+ // Check if message is expired (we also do this on the client side). If so ack it
+ // from the channel.
+
+ if (ref.isExpired())
{
- removeFirstInMemory();
+ if (trace) { log.trace("Message reference: " + ref + " has expired"); }
+
+ // remove and acknowledge it
+ if (iter == null)
+ {
+ removeFirstInMemory();
+ }
+ else
+ {
+ iter.remove();
+ }
+
+ Delivery delivery = new SimpleDelivery(this, ref, true);
+
+ acknowledgeInternal(delivery);
}
else
{
- iter.remove();
- }
+ // Reference is not expired
- Delivery delivery = new SimpleDelivery(this, ref, true);
+ // Attempt to push the ref to a receiver
+ Delivery del = push(ref);
- acknowledgeInternal(delivery);
- }
- else
- {
- // Reference is not expired
+ if (del == null)
+ {
+ // No receiver, broken receiver or full receiver so we stop delivering; also
+ // we need to decrement the delivery count, as no real delivery has been
+ // actually performed
- // Attempt to push the ref to a receiver
- Delivery del = push(ref);
+ if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
- if (del == null)
- {
- // No receiver, broken receiver or full receiver so we stop delivering; also
- // we need to decrement the delivery count, as no real delivery has been
- // actually performed
+ receiversReady = false;
- if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
-
- receiversReady = false;
-
- return;
- }
- else if (!del.isSelectorAccepted())
- {
- // No receiver accepted the message because no selectors matched, so we create
- // an iterator (if we haven't already created it) to iterate through the refs
- // in the channel.
-
- // TODO Note that this is only a partial solution since if there are messages
- // paged to storage it won't try those - i.e. it will only iterate through
- // those refs in memory. Dealing with refs in storage is somewhat tricky since
- // we can't just load them and iterate through them since we might run out of
- // memory, so we will need to load individual refs from storage given the
- // selector expressions. Secondly we should also introduce some in memory
- // indexes here to prevent having to iterate through all the refs every time.
- // Having said all that, having consumers on a queue that don't match many
- // messages is an antipattern and should be avoided by the user.
- if (iter == null)
+ return;
+ }
+ else if (!del.isSelectorAccepted())
{
- iter = messageRefs.iterator();
- }
- }
- else
- {
- if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-
- // Receiver accepted the reference
+ // No receiver accepted the message because no selectors matched, so we create
+ // an iterator (if we haven't already created it) to iterate through the refs
+ // in the channel.
- // We must synchronize here to cope with another race condition where message
- // is cancelled/acked in flight while the following few actions are being
- // performed. e.g. delivery could be cancelled acked after being removed from
- // state but before delivery being added (observed).
- synchronized (del)
+ // TODO Note that this is only a partial solution since if there are messages
+ // paged to storage it won't try those - i.e. it will only iterate through
+ // those refs in memory. Dealing with refs in storage is somewhat tricky since
+ // we can't just load them and iterate through them since we might run out of
+ // memory, so we will need to load individual refs from storage given the
+ // selector expressions. Secondly we should also introduce some in memory
+ // indexes here to prevent having to iterate through all the refs every time.
+ // Having said all that, having consumers on a queue that don't match many
+ // messages is an antipattern and should be avoided by the user.
+ if (iter == null)
+ {
+ iter = messageRefs.iterator();
+ }
+ }
+ else
{
- if (trace) { log.trace(this + " incrementing delivery count for " + del); }
+ if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
- // FIXME - It's actually possible the delivery could be
- // cancelled before it reaches
- // here, in which case we wouldn't get a delivery but we
- // still need to increment the
- // delivery count
- // All the problems related to these race conditions and
- // fiddly edge cases will disappear
- // once we do
- // http://jira.jboss.com/jira/browse/JBMESSAGING-355
- // This will make life a lot easier
+ // Receiver accepted the reference
- if (!del.isCancelled())
+ // We must synchronize here to cope with another race condition where message
+ // is cancelled/acked in flight while the following few actions are being
+ // performed. e.g. delivery could be cancelled acked after being removed from
+ // state but before delivery being added (observed).
+ synchronized (del)
{
- if (iter == null)
- {
- removeFirstInMemory();
- }
- else
- {
- iter.remove();
- }
+ // FIXME - It's actually possible the delivery could be
+ // cancelled before it reaches
+ // here, in which case we wouldn't get a delivery but we
+ // still need to increment the
+ // delivery count
+ // All the problems related to these race conditions and
+ // fiddly edge cases will disappear
+ // once we do
+ // http://jira.jboss.com/jira/browse/JBMESSAGING-355
+ // This will make life a lot easier
- // delivered
- if (!del.isDone())
+ if (!del.isCancelled())
{
- // Add the delivery to state
- synchronized (deliveryLock)
+ if (iter == null)
{
- deliveries.add(del);
+ removeFirstInMemory();
}
+ else
+ {
+ iter.remove();
+ if (trace) { log.trace(this + " removed current message from iterator"); }
+ }
+
+ // delivered
+ if (!del.isDone())
+ {
+ synchronized (deliveryLock)
+ {
+ deliveries.add(del);
+ if (trace) { log.trace(this + " starting to track " + del); }
+ }
+ }
}
}
}
}
}
+ else
+ {
+ // No more refs in channel
+ if (trace) { log.trace(this + " no more refs to deliver "); }
+ break;
+ }
}
- else
- {
- // No more refs in channel
- if (trace) { log.trace(this + " no more refs to deliver "); }
- break;
- }
}
}
catch (Throwable t)
@@ -845,7 +846,7 @@
if (ref.isReliable() && recoverable)
{
// Reliable message in a recoverable state - also add to db
- if (trace) { log.trace(this + "adding " + ref + " to database non-transactionally"); }
+ if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
pm.addReference(channelID, ref, null);
}
@@ -862,20 +863,13 @@
}
else
{
- if (trace) { log.trace(this + "adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
-
checkMemory();
if (ref.isReliable() && !acceptReliableMessages)
{
- // this transaction has no chance to succeed, since a reliable
- // message cannot be
- // safely stored by a non-recoverable state, so doom the
- // transaction
- if (trace)
- {
- log.trace(this + " cannot handle reliable messages, dooming the transaction");
- }
+ // This transaction has no chance to succeed, since a reliable message cannot be
+ // safely stored by a non-recoverable state, so doom the transaction.
+ if (trace) { log.trace(this + " cannot handle reliable messages, dooming the transaction"); }
tx.setRollbackOnly();
}
else
@@ -883,18 +877,12 @@
// add to post commit callback
ref.setOrdering(messageOrdering.increment());
this.getCallback(tx).addRef(ref);
- if (trace)
- {
- log.trace(this + " added transactionally " + ref
- + " in memory");
- }
+ if (trace) { log.trace(this + " added " + ref + " to memory transactional callback, in transaction: " + tx); }
}
if (ref.isReliable() && recoverable)
{
// Reliable message in a recoverable state - also add to db
- if (trace) { log.trace(this + "adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
-
pm.addReference(channelID, ref, tx);
}
}
@@ -929,10 +917,7 @@
protected void cancelInternal(Delivery del) throws Exception
{
- if (trace)
- {
- log.trace(this + " cancelling " + del + " in memory");
- }
+ if (trace) { log.trace(this + " cancelling " + del + " in memory"); }
boolean removed;
@@ -969,13 +954,13 @@
}
}
- //We may need to update the delivery count in the database
+ // We may need to update the delivery count in the database
if (ref.isReliable())
{
pm.updateDeliveryCount(this.channelID, ref);
}
- if (trace) { log.trace(this + " added " + ref + " back into state"); }
+ if (trace) { log.trace(this + " added " + ref + " back into memory, ready for redelivery"); }
}
}
@@ -983,7 +968,7 @@
{
synchronized (refLock)
{
- MessageReference result = (MessageReference) messageRefs.removeFirst();
+ MessageReference result = (MessageReference)messageRefs.removeFirst();
if (refsInStorage > 0)
{
@@ -999,7 +984,8 @@
paging = false;
}
- return (MessageReference) result;
+ if (trace) { log.trace(this + " removing first message in memory, which is " + result); }
+ return (MessageReference)result;
}
}
@@ -1091,11 +1077,7 @@
{
messageRefs.addLast(ref, ref.getPriority());
- if (trace)
- {
- log.trace(this + " added " + ref
- + " non-transactionally in memory");
- }
+ if (trace) { log.trace(this + " added " + ref + " in memory"); }
if (messageRefs.size() == fullSize)
{
@@ -1591,7 +1573,7 @@
{
MessageReference ref = (MessageReference) iter.next();
- if (trace) { log.trace(this + ": adding " + ref + " to non-recoverable state"); }
+ if (trace) { log.trace(this + " adding " + ref + " to memory"); }
try
{
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -1621,11 +1621,11 @@
{
if (tx != null)
{
- //In a tx so we just add the ref in the tx in memory for now
+ // We are in a transaction so we just add the ref in the tx in memory for now
TransactionCallback callback = getCallback(tx);
-
callback.addReferenceToAdd(channelID, ref);
+ if (trace) { log.trace(this + " added " + ref + " to database transactional callback, in transaction: " + tx); }
}
else
{
@@ -2670,6 +2670,8 @@
//TODO - A slight optimisation - it's possible we have refs referring to the same message
//so we will end up acquiring the lock more than once which is unnecessary
//If find unique set of messages can avoid this
+ if (trace) { log.trace("handling before commit 1PC, tx: " + tx); }
+
List allRefs = new ArrayList(refsToAdd.size() + refsToRemove.size());
Iterator iter = refsToAdd.iterator();
while (iter.hasNext())
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/JMSTest.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -194,6 +194,45 @@
conn.close();
}
+ public void test_Persistent_Transactional_Send() throws Exception
+ {
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ Queue queue = (Queue)ic.lookup("/queue/JMSTestQueue");
+
+ Connection conn = cf.createConnection();
+
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageProducer prod = session.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage m = session.createTextMessage("message one");
+ prod.send(m);
+ m = session.createTextMessage("message two");
+ prod.send(m);
+
+ session.commit();
+
+ conn.close();
+
+ conn = cf.createConnection();
+
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue);
+
+ conn.start();
+
+ TextMessage rm = (TextMessage)cons.receive();
+ assertEquals("message one", rm.getText());
+ rm = (TextMessage)cons.receive();
+ assertEquals("message two", rm.getText());
+
+ conn.close();
+ }
+
+
public void test_NonPersistent_Transactional_Acknowledgment() throws Exception
{
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
@@ -227,7 +266,6 @@
conn.close();
}
-
public void test_Asynchronous_to_Client() throws Exception
{
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java 2006-11-30 02:09:13 UTC (rev 1660)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/stress/ConcurrentCloseStressTest.java 2006-11-30 08:01:35 UTC (rev 1661)
@@ -41,9 +41,8 @@
/**
* This test was added to test regression on http://jira.jboss.com/jira/browse/JBMESSAGING-660
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision:$</tt>
- * <p/>
- * $Id:$
+ *
+ * $Id:$
*/
public class ConcurrentCloseStressTest extends MessagingTestCase
{
@@ -101,8 +100,8 @@
for (int i = 0; i < 20; i++)
{
- producerThread[i] = new ProducerThread(connectionProducer, queue);
- readerThread[i] = new ReaderThread(connectionReader, queue);
+ producerThread[i] = new ProducerThread(i, connectionProducer, queue);
+ readerThread[i] = new ReaderThread(i, connectionReader, queue);
threads[i] = producerThread[i];
threads[i+20] = readerThread[i];
}
@@ -120,6 +119,7 @@
boolean hasFailure=false;
+
for (int i = 0; i < 40; i++)
{
if (threads[i].exceptions.size() > 0)
@@ -143,6 +143,7 @@
static class TestThread extends Thread
{
ArrayList exceptions = new ArrayList();
+ protected int index;
}
@@ -153,8 +154,9 @@
Queue queue;
- public ReaderThread(Connection conn, Queue queue) throws Exception
+ public ReaderThread(int index, Connection conn, Queue queue) throws Exception
{
+ this.index = index;
this.conn = conn;
this.queue = queue;
}
@@ -166,7 +168,7 @@
int commitCounter = 0;
try
{
- Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer((Destination)queue);
while (true)
@@ -177,24 +179,25 @@
{
break;
}
- log.info("read message " + message.getText());
+ log.debug("read message " + message.getText());
// alternating commits and rollbacks
- if ( (commitCounter++) % 2 == 0 )
+ if ( (commitCounter++) % 2 == 0)
{
- log.info("commit");
+ log.debug("commit");
session.commit();
}
else
{
- log.info("rollback");
+ log.debug("rollback");
session.rollback();
}
- if (messageCounter%7 == 0 )
+ if (messageCounter%7 == 0)
{
session.close();
- session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);
+
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer((Destination)queue);
}
}
@@ -207,6 +210,8 @@
{
e.printStackTrace();
exceptions.add(e);
+// log.debug("ReaderThread " + index + " died");
+// System.exit(1);
}
}
@@ -221,8 +226,9 @@
Queue queue;
- public ProducerThread(Connection conn, Queue queue) throws Exception
+ public ProducerThread(int index, Connection conn, Queue queue) throws Exception
{
+ this.index = index;
this.conn = conn;
this.queue = queue;
}
@@ -234,21 +240,21 @@
{
try
{
- Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = sess.createProducer((Destination)queue);
for (int j = 0; j < 20; j++)
{
producer.send(sess.createTextMessage("Message " + i + ", " + j));
- log.info("Sending message " + j + " on i=" + i);
- if (j%2==0)
+ log.debug("Message " + j + " on i=" + i + " sent");
+ if (j % 2 == 0)
{
- log.info("commit");
+ log.debug("commit");
sess.commit();
}
else
{
- log.info("rollback");
+ log.debug("rollback");
sess.rollback();
}
@@ -262,6 +268,8 @@
{
e.printStackTrace();
exceptions.add(e);
+// log.debug("ProducerThread " + index + " died");
+// System.exit(1);
}
}
}
More information about the jboss-cvs-commits
mailing list