[jboss-cvs] JBoss Messaging SVN: r5027 - in branches/Branch_1_4: src/main/org/jboss/messaging/core/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 25 08:48:01 EDT 2008
Author: ataylor
Date: 2008-09-25 08:48:01 -0400 (Thu, 25 Sep 2008)
New Revision: 5027
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1282 - we always add a tx to the database evenif no persistent messages are sent/received.
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2008-09-25 12:48:01 UTC (rev 5027)
@@ -85,6 +85,8 @@
boolean idExists(String messageID) throws Exception;
+ void addTransaction(Transaction tx);
+
// Interface value classes ----------------------------------------------------------------------
class MessageChannelPair
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-09-25 12:48:01 UTC (rev 5027)
@@ -255,6 +255,10 @@
pm.addReference(channelID, ref, tx);
}
+ else if(recoverable)
+ {
+ pm.addTransaction(tx);
+ }
}
messagesAdded.increment();
@@ -760,6 +764,10 @@
{
pm.removeReference(channelID, d.getReference(), tx);
}
+ else if(recoverable)
+ {
+ pm.addTransaction(tx);
+ }
}
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2008-09-25 12:48:01 UTC (rev 5027)
@@ -1556,6 +1556,11 @@
return false;
}
+ public void addTransaction(Transaction tx)
+ {
+ //this forces the tx to be added
+ getCallback(tx);
+ }
// Public --------------------------------------------------------
@@ -1938,11 +1943,11 @@
try
{
- // Insert the tx record
- if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
- {
+ // Insert the tx record even if there are no refs as we need it for recovery
+ //if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
+ //{
addTXRecord(conn, tx);
- }
+ //}
boolean first = false;
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2008-09-25 12:48:01 UTC (rev 5027)
@@ -115,6 +115,11 @@
// NOOP
}
+ public void addTransaction(Transaction tx)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public long reserveIDBlock(String counterName, int size) throws Exception
{
checkServerID();
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2008-09-25 12:48:01 UTC (rev 5027)
@@ -21,15 +21,7 @@
*/
package org.jboss.test.messaging.jms;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.XAConnection;
-import javax.jms.XASession;
+import javax.jms.*;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -86,7 +78,317 @@
// Public --------------------------------------------------------
-
+ /* A simple acknowledgement in a transaction using NP messages, recovered with restart */
+ public void testSimpleTransactionalAcknowledgementNPRecoveryWithRestart() throws Exception
+ {
+ log.trace("starting testSimpleTransactionalAcknowledgementRecoveryWithRestart");
+
+ Connection conn1 = null;
+
+ XAConnection conn2 = null;
+
+ XAConnection conn3 = null;
+
+ try
+ {
+ //First send a message to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess1.createProducer(queue4);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+
+ prod.send(tm1);
+
+ conn2 = cf.createXAConnection();
+
+ XASession sess2 = conn2.createXASession();
+
+ XAResource res1 = sess2.getXAResource();
+
+ //Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = sess2.createConsumer(queue4);
+
+ conn2.start();
+
+ //Consume the message
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ //prepare the tx
+
+ res1.prepare(xid1);
+
+ conn1.close();
+
+ conn2.close();
+
+ conn1 = null;
+
+ conn2 = null;
+
+ // Now "crash" the server
+
+ ServerManagement.stopServerPeer();
+
+ ServerManagement.startServerPeer();
+
+ deployAndLookupAdministeredObjects();
+
+ //Now recover
+
+ conn3 = cf.createXAConnection();
+
+ XASession sess3 = conn3.createXASession();
+
+ XAResource res3 = sess3.getXAResource();
+
+ Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+
+ Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, xids2.length);
+
+ assertEquals(xid1, xids[0]);
+
+ //Commit the tx
+
+ res3.commit(xids[0], false);
+
+ //The message should be acknowldged
+
+ conn3.close();
+
+ conn1 = cf.createConnection();
+
+ sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue4);
+
+ conn1.start();
+
+ Message m = cons1.receive(1000);
+
+ assertNull(m);
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn2 != null)
+ {
+ try
+ {
+ conn2.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn3 != null)
+ {
+ try
+ {
+ conn3.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+ }
+ }
+
+
+ /* A simple send in a transaction usingNP messages - recovered after restarting the server */
+ public void testSimpleTransactionalSendNPRecoveryWithRestart() throws Exception
+ {
+ log.trace("starting testSimpleTransactionalDeliveryRecoveryWithRestart");
+
+ XAConnection conn1 = null;
+
+ Connection conn2 = null;
+
+ XAConnection conn3 = null;
+
+ try
+ {
+ conn1 = cf.createXAConnection();
+
+ XASession sess1 = conn1.createXASession();
+
+ XAResource res1 = sess1.getXAResource();
+
+ //Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ log.trace("Sending message");
+
+ //Send message in tx
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageProducer prod1 = sess1.createProducer(queue4);
+
+ prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+
+ prod1.send(tm1);
+ prod1.send(tm2);
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ log.trace("Sent message");
+
+ //prepare tx
+
+ res1.prepare(xid1);
+
+ log.trace("prepared tx");
+
+ conn2 = cf.createConnection();
+
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess2.createConsumer(queue4);
+
+ conn2.start();
+
+ //Verify message can't be received
+
+ Message m = cons2.receive(1000);
+
+ assertNull(m);
+
+ conn1.close();
+
+ conn2.close();
+
+ conn1 = null;
+
+ conn2 = null;
+
+ // Now "crash" the server
+
+ ServerManagement.stopServerPeer();
+
+ ServerManagement.startServerPeer();
+
+ deployAndLookupAdministeredObjects();
+
+ conn3 = cf.createXAConnection();
+
+ XASession sess3 = conn3.createXASession();
+
+ XAResource res3 = sess3.getXAResource();
+
+ log.trace("created connection");
+
+ Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+
+ Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, xids2.length);
+
+ assertEquals(xid1, xids[0]);
+
+ log.trace("recovered");
+
+ conn2 = cf.createConnection();
+
+ sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons2 = sess2.createConsumer(queue4);
+
+ conn2.start();
+
+ // Verify message still can't be received
+
+ m = cons2.receive(1000);
+
+ assertNull(m);
+
+ log.trace("still can't see message");
+
+ //Commit the tx
+
+ res3.commit(xids[0], false);
+
+ log.trace("committed");
+
+ //The message should now be available
+
+ TextMessage rm1 = (TextMessage)cons2.receive(1000);
+
+ assertNull(rm1);
+
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn2 != null)
+ {
+ try
+ {
+ conn2.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn3 != null)
+ {
+ try
+ {
+ conn3.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+ }
+ }
/*
* In this test, we have two queues, each with four messages already in them.
*
More information about the jboss-cvs-commits
mailing list