JBoss hornetq SVN: r9843 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-04 04:04:34 -0400 (Thu, 04 Nov 2010)
New Revision: 9843
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
Log:
test fix
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2010-11-04 08:02:21 UTC (rev 9842)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/SessionCloseOnGCTest.java 2010-11-04 08:04:34 UTC (rev 9843)
@@ -282,8 +282,8 @@
UnitTestCase.checkWeakReferences(wses);
Assert.assertEquals(0, sf.numSessions());
- Assert.assertEquals(0, sf.numConnections());
- Assert.assertEquals(0, server.getRemotingService().getConnections().size());
+ Assert.assertEquals(1, sf.numConnections());
+ Assert.assertEquals(1, server.getRemotingService().getConnections().size());
}
public void testCloseSeveralSessionOnGC() throws Exception
@@ -307,8 +307,8 @@
UnitTestCase.checkWeakReferences(ref1, ref2, ref3);
Assert.assertEquals(0, sf.numSessions());
- Assert.assertEquals(0, sf.numConnections());
- Assert.assertEquals(0, server.getRemotingService().getConnections().size());
+ Assert.assertEquals(1, sf.numConnections());
+ Assert.assertEquals(1, server.getRemotingService().getConnections().size());
}
}
14 years, 1 month
JBoss hornetq SVN: r9842 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-04 04:02:21 -0400 (Thu, 04 Nov 2010)
New Revision: 9842
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java
Log:
test fix
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2010-11-04 01:25:11 UTC (rev 9841)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2010-11-04 08:02:21 UTC (rev 9842)
@@ -194,7 +194,15 @@
rc1.fail(new HornetQException(HornetQException.NOT_CONNECTED, "blah"));
- Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ try
+ {
+ Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fail("should throw exception");
+ }
+ catch (JMSException e)
+ {
+ //pass
+ }
conn1.close();
}
14 years, 1 month
JBoss hornetq SVN: r9841 - in branches/Branch_New_Paging/src/main/org/hornetq/core: paging/cursor/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-03 21:25:11 -0400 (Wed, 03 Nov 2010)
New Revision: 9841
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
backup
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-03 21:17:32 UTC (rev 9840)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-04 01:25:11 UTC (rev 9841)
@@ -13,8 +13,7 @@
package org.hornetq.core.paging.cursor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.LinkedListIterator;
@@ -89,7 +88,7 @@
void processReload() throws Exception;
- /**
+ /**
* To be used on redeliveries
* @param position
*/
@@ -101,7 +100,12 @@
* @param minPage
* @return
*/
- boolean isComplete(long minPage);
+ boolean isComplete(long page);
+ /** wait all the scheduled runnables to finish their current execution */
void flushExecutors();
+
+ void setQueue(Queue queue);
+
+ Queue getQueue();
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-03 21:17:32 UTC (rev 9840)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-04 01:25:11 UTC (rev 9841)
@@ -54,6 +54,11 @@
this.a = a;
this.b = b;
}
+
+ public boolean isPaged()
+ {
+ return true;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-03 21:17:32 UTC (rev 9840)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-04 01:25:11 UTC (rev 9841)
@@ -38,6 +38,7 @@
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperationAbstract;
@@ -77,6 +78,8 @@
private final long cursorId;
+ private Queue queue;
+
private final boolean persistent;
private final Filter filter;
@@ -121,6 +124,16 @@
// Public --------------------------------------------------------
+ public Queue getQueue()
+ {
+ return queue;
+ }
+
+ public void setQueue(Queue queue)
+ {
+ this.queue = queue;
+ }
+
public void disableAutoCleanup()
{
autoCleanup = false;
@@ -267,8 +280,6 @@
{
return new CursorIterator();
}
-
- int validCount = 0;
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java 2010-11-03 21:17:32 UTC (rev 9840)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java 2010-11-04 01:25:11 UTC (rev 9841)
@@ -26,6 +26,9 @@
*/
public interface MessageReference
{
+
+ boolean isPaged();
+
ServerMessage getMessage();
MessageReference copy(Queue queue);
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-03 21:17:32 UTC (rev 9840)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2010-11-04 01:25:11 UTC (rev 9841)
@@ -228,5 +228,10 @@
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
+
+ public boolean isPaged()
+ {
+ return false;
+ }
}
}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2010-11-03 21:17:32 UTC (rev 9840)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2010-11-04 01:25:11 UTC (rev 9841)
@@ -144,6 +144,11 @@
{
queue.referenceHandled();
}
+
+ public boolean isPaged()
+ {
+ return false;
+ }
// Public --------------------------------------------------------
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-03 21:17:32 UTC (rev 9840)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-04 01:25:11 UTC (rev 9841)
@@ -217,6 +217,11 @@
{
expiryAddress = null;
}
+
+ if (pageSubscription != null)
+ {
+ pageSubscription.setQueue(this);
+ }
this.executor = executor;
14 years, 1 month
JBoss hornetq SVN: r9840 - in branches/Branch_New_Paging: tests/src/org/hornetq/tests/integration/paging and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-03 17:17:32 -0400 (Wed, 03 Nov 2010)
New Revision: 9840
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
repeat on paging
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-03 16:55:38 UTC (rev 9839)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-03 21:17:32 UTC (rev 9840)
@@ -215,6 +215,7 @@
PagedReferenceImpl nextPos = moveNext(position);
if (nextPos != null)
{
+ lastOperation = position;
position = nextPos.getPosition();
}
return nextPos;
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-03 16:55:38 UTC (rev 9839)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-03 21:17:32 UTC (rev 9840)
@@ -902,9 +902,9 @@
PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
- Iterator<PagedReferenceImpl> iter = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iter = cursor.iterator();
- Iterator<PagedReferenceImpl> iter2 = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iter2 = cursor.iterator();
assertTrue(iter.hasNext());
@@ -926,6 +926,26 @@
assertEquals(2, tstProperty(msg2.getMessage()));
+ iter2.repeat();
+
+ msg2 = iter2.next();
+
+ assertEquals(2, tstProperty(msg2.getMessage()));
+
+ iter2.repeat();
+
+ assertEquals(2, tstProperty(msg2.getMessage()));
+
+ msg1 = iter.next();
+
+ assertEquals(2, tstProperty(msg1.getMessage()));
+
+ iter.repeat();
+
+ msg1 = iter.next();
+
+ assertEquals(2, tstProperty(msg1.getMessage()));
+
assertTrue(iter2.hasNext());
14 years, 1 month
JBoss hornetq SVN: r9839 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-03 12:55:38 -0400 (Wed, 03 Nov 2010)
New Revision: 9839
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
Log:
tweak
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-03 15:55:22 UTC (rev 9838)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-03 16:55:38 UTC (rev 9839)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.cursor;
-import org.hornetq.api.core.Pair;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
14 years, 1 month
JBoss hornetq SVN: r9838 - in branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster: restart and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-03 11:55:22 -0400 (Wed, 03 Nov 2010)
New Revision: 9838
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
Log:
test fix
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-11-03 15:32:04 UTC (rev 9837)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-11-03 15:55:22 UTC (rev 9838)
@@ -1163,6 +1163,36 @@
sfs[node] = sf;
}
+ protected void setupSessionFactory(final int node, final boolean netty, int reconnectAttempts) throws Exception
+ {
+ if (sfs[node] != null)
+ {
+ throw new IllegalArgumentException("Already a server at " + node);
+ }
+
+ Map<String, Object> params = generateParams(node, netty);
+
+ TransportConfiguration serverTotc;
+
+ if (netty)
+ {
+ serverTotc = new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, params);
+ }
+ else
+ {
+ serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
+ }
+
+ locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+
+ locators[node].setBlockOnNonDurableSend(true);
+ locators[node].setBlockOnDurableSend(true);
+ locators[node].setReconnectAttempts(reconnectAttempts);
+ ClientSessionFactory sf = locators[node].createSessionFactory();
+
+ sfs[node] = sf;
+ }
+
protected void setupSessionFactory(final int node, final int backupNode, final boolean netty, final boolean blocking) throws Exception
{
if (sfs[node] != null)
@@ -1184,14 +1214,14 @@
}
- ServerLocator locator = HornetQClient.createServerLocatorWithHA(serverTotc);
- locator.setRetryInterval(100);
- locator.setRetryIntervalMultiplier(1d);
- locator.setReconnectAttempts(-1);
- locator.setBlockOnNonDurableSend(blocking);
- locator.setBlockOnDurableSend(blocking);
+ locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+ locators[node].setRetryInterval(100);
+ locators[node].setRetryIntervalMultiplier(1d);
+ locators[node].setReconnectAttempts(-1);
+ locators[node].setBlockOnNonDurableSend(blocking);
+ locators[node].setBlockOnDurableSend(blocking);
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locators[node].createSessionFactory();
sfs[node] = sf;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2010-11-03 15:32:04 UTC (rev 9837)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2010-11-03 15:55:22 UTC (rev 9838)
@@ -41,7 +41,7 @@
try
{
- setupSessionFactory(0, isNetty());
+ setupSessionFactory(0, isNetty(), -1);
setupSessionFactory(1, isNetty());
// create some dummy queues to ensure that the test queue has a high numbered binding
@@ -77,8 +77,6 @@
startServers(0);
- addConsumer(1, 0, "queue10", null);
-
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
@@ -89,7 +87,7 @@
sendInRange(1, "queues.testaddress", 10, 20, false, null);
- verifyReceiveAllInRange(10, 20, 1);
+ verifyReceiveAllInRange(10, 20, 0);
System.out.println("*****************************************************************************");
}
finally
@@ -121,7 +119,7 @@
try
{
- setupSessionFactory(0, isNetty());
+ setupSessionFactory(0, isNetty(), -1);
setupSessionFactory(1, isNetty());
// create some dummy queues to ensure that the test queue has a high numbered binding
14 years, 1 month
JBoss hornetq SVN: r9837 - in branches/2_2_0_HA_Improvements: tests/src/org/hornetq/tests/integration/cluster/util and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-03 11:32:04 -0400 (Wed, 03 Nov 2010)
New Revision: 9837
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
get backup config before we call node down
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-03 13:02:01 UTC (rev 9836)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-03 15:32:04 UTC (rev 9837)
@@ -26,10 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.*;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
@@ -88,6 +85,8 @@
private TransportConfiguration connectorConfig;
+ private TransportConfiguration backupConfig;
+
private ConnectorFactory connectorFactory;
private Map<String, Object> transportParams;
@@ -514,8 +513,6 @@
// We will try to failover if there is a backup connector factory, but we don't do this if the server
// has been shutdown cleanly unless failoverOnServerShutdown is true
- TransportConfiguration backupConfig = serverLocator.getBackup(connectorConfig);
-
boolean attemptFailover = (backupConfig != null) && !serverShutdown;
boolean attemptReconnect;
@@ -592,6 +589,8 @@
// Now try failing over to backup
this.connectorConfig = backupConfig;
+
+ backupConfig = null;
connectorFactory = instantiateConnectorFactory(connectorConfig.getFactoryClassName());
@@ -1205,8 +1204,10 @@
// cause reconnect loop
public void run()
{
- if (msg.getNodeID() != null)
+ SimpleString nodeID = msg.getNodeID();
+ if (nodeID != null)
{
+ backupConfig = serverLocator.getBackup(connectorConfig);
serverLocator.notifyNodeDown(msg.getNodeID().toString());
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-11-03 13:02:01 UTC (rev 9836)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-11-03 15:32:04 UTC (rev 9837)
@@ -88,12 +88,12 @@
{
session.addFailureListener(new MyListener());
}
- Set<RemotingConnection> connections = server.getRemotingService().getConnections();
+ /*Set<RemotingConnection> connections = server.getRemotingService().getConnections();
for (RemotingConnection remotingConnection : connections)
{
remotingConnection.destroy();
server.getRemotingService().removeConnection(remotingConnection.getID());
- }
+ }*/
ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
clusterManager.clear();
14 years, 1 month
JBoss hornetq SVN: r9836 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-11-03 09:02:01 -0400 (Wed, 03 Nov 2010)
New Revision: 9836
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
call listeners before reconnect and test fixes
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-03 13:02:01 UTC (rev 9836)
@@ -165,7 +165,7 @@
final int reconnectAttempts,
final ExecutorService threadPool,
final ScheduledExecutorService scheduledThreadPool,
- final List<Interceptor> interceptors) throws HornetQException
+ final List<Interceptor> interceptors)
{
e.fillInStackTrace();
@@ -613,15 +613,19 @@
connection = null;
}
- callFailureListeners(me, true, connection != null);
if (connection == null)
{
sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ callFailureListeners(me, true, false);
}
}
// This needs to be outside the failover lock to prevent deadlock
+ if(connection != null)
+ {
+ callFailureListeners(me, true, true);
+ }
if (sessionsToClose != null)
{
// If connection is null it means we didn't succeed in failing over or reconnecting
@@ -1191,18 +1195,24 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
+ if (msg.getNodeID() != null)
+ {
+ System.out.println("received disconnect from node " + msg.getNodeID());
+ }
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
// cause reconnect loop
public void run()
{
- conn.fail(new HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
- "The connection was disconnected because of server shutdown"));
if (msg.getNodeID() != null)
{
serverLocator.notifyNodeDown(msg.getNodeID().toString());
}
+
+ conn.fail(new HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
+ "The connection was disconnected because of server shutdown"));
+
}
});
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-11-03 13:02:01 UTC (rev 9836)
@@ -1297,7 +1297,19 @@
connectors = new ArrayList<Connector>();
for (TransportConfiguration initialConnector : initialConnectors)
{
- connectors.add(new Connector(initialConnector));
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
+ initialConnector,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+ connectors.add(new Connector(initialConnector, factory));
}
}
@@ -1336,18 +1348,14 @@
private boolean interrupted = false;
private Exception e;
- public Connector(TransportConfiguration initialConnector)
+ public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
{
this.initialConnector = initialConnector;
+ this.factory = factory;
}
public ClientSessionFactory call() throws HornetQException
{
- factory = getFactory();
- if(factory == null)
- {
- return null;
- }
try
{
factory.connect(reconnectAttempts, failoverOnInitialConnection);
@@ -1359,11 +1367,11 @@
this.e = e;
throw e;
}
- if(factory != null)
+ /*if(factory != null)
{
factory.close();
factory = null;
- }
+ }*/
return null;
}
isConnected = true;
@@ -1382,7 +1390,7 @@
return isConnected;
}
- public synchronized void disconnect()
+ public void disconnect()
{
interrupted = true;
@@ -1393,39 +1401,6 @@
factory = null;
}
}
-
- private synchronized ClientSessionFactoryInternal getFactory() throws HornetQException
- {
- if(interrupted)
- {
- return null;
- }
- if (factory == null)
- {
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- }
- return factory;
- }
}
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-03 13:02:01 UTC (rev 9836)
@@ -238,7 +238,7 @@
{
return;
}
-
+
if (serverLocator != null)
{
serverLocator.removeClusterTopologyListener(this);
@@ -259,10 +259,10 @@
if (serverLocator != null)
{
+ //serverLocator.removeClusterTopologyListener(this);
serverLocator.close();
}
-
if (managementService != null)
{
TypedProperties props = new TypedProperties();
@@ -331,14 +331,13 @@
//Remove the flow record for that node
- MessageFlowRecord record = records.remove(nodeID);
+ MessageFlowRecord record = records.get(nodeID);
if (record != null)
{
try
{
record.reset();
- //record.close();
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-03 13:02:01 UTC (rev 9836)
@@ -59,7 +59,7 @@
private static final Logger log = Logger.getLogger(TopologyClusterTestBase.class);
- private static final long WAIT_TIMEOUT = 30000;
+ private static final long WAIT_TIMEOUT = 5000;
abstract protected ServerLocator createHAServerLocator();
@@ -239,8 +239,6 @@
stopServers(2, 3, 1, 4);
- waitForClusterConnections(0, 0);
-
assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
checkContains(new int[] { 0 }, nodeIDs, nodes);
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-03 13:02:01 UTC (rev 9836)
@@ -93,7 +93,9 @@
{
for (ClientSessionFactoryImpl factory : ClientSessionFactoryImpl.factories)
{
+ // System.out.println(threadDump("oops"));
//factory.e.printStackTrace();
+ // System.exit(0);
}
}
super.tearDown(); //To change body of overridden methods use File | Settings | File Templates.
14 years, 1 month
JBoss hornetq SVN: r9835 - in trunk: src/main/org/hornetq/api/jms/management and 16 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2010-11-03 04:37:11 -0400 (Wed, 03 Nov 2010)
New Revision: 9835
Added:
trunk/src/main/org/hornetq/core/transaction/TransactionDetail.java
trunk/src/main/org/hornetq/core/transaction/impl/CoreTransactionDetail.java
trunk/src/main/org/hornetq/jms/transaction/
trunk/src/main/org/hornetq/jms/transaction/JMSTransactionDetail.java
Modified:
trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-405
Add listPreparedTransactionDetailsAsJSON() and listPreparedTransactionDetailsAsHTML() to HornetQServerControl and JMSServerControl. With these methods you can view the transaction ID, message contents, and some other information about prepared transactions via JMX.
Modified: trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -411,6 +411,20 @@
String[] listPreparedTransactions() throws Exception;
/**
+ * List all the prepared transaction, sorted by date,
+ * oldest first, with details, in JSON format.
+ */
+ @Operation(desc = "List all the prepared transaction, sorted by date, oldest first, with details, in JSON format")
+ String listPreparedTransactionDetailsAsJSON() throws Exception;
+
+ /**
+ * List all the prepared transaction, sorted by date,
+ * oldest first, with details, in HTML format
+ */
+ @Operation(desc = "List all the prepared transaction, sorted by date, oldest first, with details, in HTML format")
+ String listPreparedTransactionDetailsAsHTML() throws Exception;
+
+ /**
* List transactions which have been heuristically committed.
*/
String[] listHeuristicCommittedTransactions() throws Exception;
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -234,5 +234,19 @@
@Operation(desc = "List the sessions for the given connectionID", impact = MBeanOperationInfo.INFO)
String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+ /**
+ * List all the prepared transactions, sorted by date,
+ * oldest first, with details, in JSON format.
+ */
+ @Operation(desc = "List all the prepared transaction, sorted by date, oldest first, with details, in JSON format", impact = MBeanOperationInfo.INFO)
+ String listPreparedTransactionDetailsAsJSON() throws Exception;
+
+ /**
+ * List all the prepared transactions, sorted by date,
+ * oldest first, with details, in HTML format.
+ */
+ @Operation(desc = "List all the prepared transaction, sorted by date, oldest first, with details, in HTML format", impact = MBeanOperationInfo.INFO)
+ String listPreparedTransactionDetailsAsHTML() throws Exception;
+
}
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -20,6 +20,7 @@
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -64,6 +65,8 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionDetail;
+import org.hornetq.core.transaction.impl.CoreTransactionDetail;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.SecurityFormatter;
@@ -902,6 +905,134 @@
}
}
+   /**
+    * Lists every prepared transaction, oldest first, as a JSON array string.
+    * Each array element is the result of {@link TransactionDetail#toJSON()} for one
+    * transaction, built through a {@link CoreTransactionDetail}.
+    *
+    * @return the JSON array as a string, or an empty string when there is no prepared transaction
+    * @throws Exception propagated from building the per-transaction JSON
+    */
+   public String listPreparedTransactionDetailsAsJSON() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
+         if (xids == null || xids.isEmpty())
+         {
+            return "";
+         }
+
+         List<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Entry<Xid, Long>>(xids.entrySet());
+         Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
+         {
+            public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2)
+            {
+               // sort by creation time, oldest first; compare the longs directly because
+               // casting their difference to int gives the wrong sign for large gaps
+               return entry1.getValue().compareTo(entry2.getValue());
+            }
+         });
+
+         JSONArray txDetailListJson = new JSONArray();
+         for (Entry<Xid, Long> entry : xidsSortedByCreationTime)
+         {
+            Xid xid = entry.getKey();
+            TransactionDetail detail = new CoreTransactionDetail(xid,
+                                                                 resourceManager.getTransaction(xid),
+                                                                 entry.getValue());
+
+            txDetailListJson.put(detail.toJSON());
+         }
+         return txDetailListJson.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
+   /**
+    * Lists every prepared transaction, oldest first, rendered as an HTML fragment.
+    * One table is emitted per transaction, holding the XID fields and a nested table
+    * with the operation type, message type and properties of each related message.
+    *
+    * @return the HTML fragment; a "No entry." fragment when there is no prepared transaction
+    * @throws Exception propagated from building or reading the per-transaction JSON
+    */
+   public String listPreparedTransactionDetailsAsHTML() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
+         if (xids == null || xids.isEmpty())
+         {
+            return "<h3>*** Prepared Transaction Details ***</h3><p>No entry.</p>";
+         }
+
+         List<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Entry<Xid, Long>>(xids.entrySet());
+         Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
+         {
+            public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2)
+            {
+               // sort by creation time, oldest first; compare the longs directly because
+               // casting their difference to int gives the wrong sign for large gaps
+               return entry1.getValue().compareTo(entry2.getValue());
+            }
+         });
+
+         StringBuilder html = new StringBuilder();
+         html.append("<h3>*** Prepared Transaction Details ***</h3>");
+
+         // NOTE(review): values are inserted into the markup without HTML escaping
+         for (Entry<Xid, Long> entry : xidsSortedByCreationTime)
+         {
+            Xid xid = entry.getKey();
+            TransactionDetail detail = new CoreTransactionDetail(xid,
+                                                                 resourceManager.getTransaction(xid),
+                                                                 entry.getValue());
+
+            JSONObject txJson = detail.toJSON();
+
+            html.append("<table border=\"1\">");
+            html.append("<tr><th>creation_time</th>");
+            html.append("<td>" + txJson.get(TransactionDetail.KEY_CREATION_TIME) + "</td>");
+            html.append("<th>xid_as_base_64</th>");
+            html.append("<td colspan=\"3\">" + txJson.get(TransactionDetail.KEY_XID_AS_BASE64) + "</td></tr>");
+            html.append("<tr><th>xid_format_id</th>");
+            html.append("<td>" + txJson.get(TransactionDetail.KEY_XID_FORMAT_ID) + "</td>");
+            html.append("<th>xid_global_txid</th>");
+            html.append("<td>" + txJson.get(TransactionDetail.KEY_XID_GLOBAL_TXID) + "</td>");
+            html.append("<th>xid_branch_qual</th>");
+            html.append("<td>" + txJson.get(TransactionDetail.KEY_XID_BRANCH_QUAL) + "</td></tr>");
+
+            html.append("<tr><th colspan=\"6\">Message List</th></tr>");
+            html.append("<tr><td colspan=\"6\">");
+            html.append("<table border=\"1\" cellspacing=\"0\" cellpadding=\"0\">");
+
+            JSONArray msgs = txJson.getJSONArray(TransactionDetail.KEY_TX_RELATED_MESSAGES);
+            for (int i = 0; i < msgs.length(); i++)
+            {
+               JSONObject msgJson = msgs.getJSONObject(i);
+               JSONObject props = msgJson.getJSONObject(TransactionDetail.KEY_MSG_PROPERTIES);
+               StringBuilder propstr = new StringBuilder();
+               @SuppressWarnings("unchecked")
+               Iterator<String> propkeys = props.keys();
+               while (propkeys.hasNext())
+               {
+                  String key = propkeys.next();
+                  propstr.append(key);
+                  propstr.append("=");
+                  propstr.append(props.get(key));
+                  propstr.append(", ");
+               }
+
+               // each message starts its own row; the row is closed after message_type
+               html.append("<tr><th>operation_type</th>");
+               html.append("<td>" + msgJson.get(TransactionDetail.KEY_MSG_OP_TYPE) + "</td>");
+               html.append("<th>message_type</th>");
+               html.append("<td>" + msgJson.get(TransactionDetail.KEY_MSG_TYPE) + "</td></tr>");
+               html.append("<tr><th>properties</th>");
+               html.append("<td colspan=\"3\">" + propstr.toString() + "</td></tr>");
+            }
+            html.append("</table></td></tr>");
+            html.append("</table><br/>");
+         }
+
+         return html.toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
public String[] listHeuristicCommittedTransactions()
{
checkStarted();
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -13,6 +13,7 @@
package org.hornetq.core.paging.impl;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -22,6 +23,7 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.utils.DataConstants;
@@ -196,6 +198,11 @@
{
pgToUpdate.update(depages, storageManager, pagingManager);
}
+
+ /** This operation exposes no message references, so it always returns null. */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
});
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -2068,6 +2068,10 @@
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
+ /** This operation exposes no message references, so it always returns null. */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
}
private static class ScheduledDeliveryEncoding extends QueueEncoding
@@ -2191,6 +2195,11 @@
{
}
+ /** This operation exposes no message references, so it always returns null. */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
+
}
private static final class AddMessageRecord
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -25,6 +25,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -298,6 +299,11 @@
{
}
+ /** This operation exposes no message references, so it always returns null. */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
*/
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -1212,6 +1212,11 @@
}
}
+ /** This operation exposes no message references, so it always returns null. */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
+
private void pageMessages(final Transaction tx) throws Exception
{
if (!pagingData.isEmpty())
@@ -1341,6 +1346,11 @@
message.decrementRefCount();
}
}
+
+ /** Returns the refs list held by this operation (the list itself, not a copy). */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return refs;
+ }
}
public Bindings createBindings()
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -1636,6 +1636,10 @@
public void beforeRollback(final Transaction tx) throws Exception
{
}
+
+ /** Returns the refsToAck list held by this operation (the list itself, not a copy). */
+ public List<MessageReference> getRelatedMessageReferences() {
+ return refsToAck;
+ }
}
private class DelayedAddRedistributor implements Runnable
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -13,6 +13,8 @@
package org.hornetq.core.transaction;
+import java.util.List;
+
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
@@ -59,6 +61,8 @@
void removeOperation(TransactionOperation sync);
+ /** Returns all the operations held by this transaction. */
+ public List<TransactionOperation> getAllOperations();
+
boolean hasTimedOut(long currentTime, int defaultTimeout);
/** We don't want to look on operations at every send, so we keep the paging attribute and will only look at
Added: trunk/src/main/org/hornetq/core/transaction/TransactionDetail.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionDetail.java (rev 0)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionDetail.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.transaction;
+
+import java.text.DateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.impl.XidImpl;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * Renders one transaction (XID, creation time and the messages related to its
+ * operations) as JSON. Subclasses decide how the type, payload and properties of
+ * each message are decoded.
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public abstract class TransactionDetail
+{
+   public static final String KEY_CREATION_TIME = "creation_time";
+
+   public static final String KEY_XID_AS_BASE64 = "xid_as_base64";
+
+   public static final String KEY_XID_FORMAT_ID = "xid_format_id";
+
+   public static final String KEY_XID_GLOBAL_TXID = "xid_global_txid";
+
+   public static final String KEY_XID_BRANCH_QUAL = "xid_branch_qual";
+
+   public static final String KEY_TX_RELATED_MESSAGES = "tx_related_messages";
+
+   public static final String KEY_MSG_OP_TYPE = "message_operation_type";
+
+   public static final String KEY_MSG_BODY_BUFFER = "message_body";
+
+   public static final String KEY_MSG_TYPE = "message_type";
+
+   public static final String KEY_MSG_PROPERTIES = "message_properties";
+
+   public static final String KEY_MSG_PAYLOAD = "message_payload";
+
+   private final Xid xid;
+
+   private final Transaction transaction;
+
+   private final Long creationTime;
+
+   /**
+    * @param xid the XID of the transaction being described
+    * @param tx the transaction whose operations are inspected
+    * @param creation the creation time, passed to {@link Date#Date(long)} when rendering
+    */
+   public TransactionDetail(Xid xid, Transaction tx, Long creation)
+   {
+      this.xid = xid;
+      this.transaction = tx;
+      this.creationTime = creation;
+   }
+
+   /**
+    * Builds the JSON description of the transaction.
+    *
+    * @return an object holding the creation time, the XID fields and, under
+    *         {@link #KEY_TX_RELATED_MESSAGES}, one entry per related message
+    * @throws Exception propagated from the JSON library or from message decoding
+    */
+   public JSONObject toJSON() throws Exception
+   {
+      final DateFormat formatter = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+      final JSONObject result = new JSONObject();
+
+      result.put(KEY_CREATION_TIME, formatter.format(new Date(creationTime)));
+      result.put(KEY_XID_AS_BASE64, XidImpl.toBase64String(xid));
+      result.put(KEY_XID_FORMAT_ID, xid.getFormatId());
+      result.put(KEY_XID_GLOBAL_TXID, new String(xid.getGlobalTransactionId()));
+      result.put(KEY_XID_BRANCH_QUAL, new String(xid.getBranchQualifier()));
+
+      final JSONArray messages = new JSONArray();
+      final List<TransactionOperation> operations = transaction.getAllOperations();
+      result.put(KEY_TX_RELATED_MESSAGES, messages);
+
+      if (operations != null)
+      {
+         for (TransactionOperation operation : operations)
+         {
+            appendMessages(messages, operation);
+         }
+      }
+      return result;
+   }
+
+   /** Adds one JSON entry to {@code messages} for each message reference of the operation. */
+   private void appendMessages(final JSONArray messages, final TransactionOperation operation) throws Exception
+   {
+      final String operationType = describeOperation(operation);
+
+      final List<MessageReference> references = operation.getRelatedMessageReferences();
+      if (references == null)
+      {
+         // the operation exposes no message references
+         return;
+      }
+
+      for (MessageReference reference : references)
+      {
+         final JSONObject entry = new JSONObject();
+         messages.put(entry);
+
+         entry.put(KEY_MSG_OP_TYPE, operationType);
+
+         // decoding works on a copy of the referenced message
+         final ServerMessage copy = reference.getMessage().copy();
+
+         entry.put(KEY_MSG_TYPE, decodeMessageType(copy));
+         entry.put(KEY_MSG_PAYLOAD, decodeMessagePayload(copy));
+         entry.put(KEY_MSG_PROPERTIES, decodeMessageProperties(copy));
+      }
+   }
+
+   /** Maps the operation's class name to its label; null when the class is neither known one. */
+   private static String describeOperation(final TransactionOperation operation)
+   {
+      final String className = operation.getClass().getName();
+      if ("org.hornetq.core.postoffice.impl.PostOfficeImpl$AddOperation".equals(className))
+      {
+         return "(+) send";
+      }
+      if ("org.hornetq.core.server.impl.QueueImpl$RefsOperation".equals(className))
+      {
+         return "(-) receive";
+      }
+      return null;
+   }
+
+   /** Returns the display name of the message's type. */
+   public abstract String decodeMessageType(ServerMessage msg);
+
+   /** Returns a textual rendering of the message's payload. */
+   public abstract String decodeMessagePayload(ServerMessage msg);
+
+   /** Returns the message's properties as a map. */
+   public abstract Map<String,Object> decodeMessageProperties(ServerMessage msg);
+}
Property changes on: trunk/src/main/org/hornetq/core/transaction/TransactionDetail.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -13,6 +13,10 @@
package org.hornetq.core.transaction;
+import java.util.List;
+
+import org.hornetq.core.server.MessageReference;
+
/**
*
* A TransactionOperation
@@ -36,4 +40,6 @@
/** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
void afterRollback(Transaction tx);
+
+ /** Returns the message references related to this operation; implementations may return null when they have none. */
+ List<MessageReference> getRelatedMessageReferences();
}
Added: trunk/src/main/org/hornetq/core/transaction/impl/CoreTransactionDetail.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/CoreTransactionDetail.java (rev 0)
+++ trunk/src/main/org/hornetq/core/transaction/impl/CoreTransactionDetail.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.transaction.impl;
+
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionDetail;
+
+/**
+ * A {@link TransactionDetail} that decodes messages using the core {@link Message}
+ * type constants and the message's own property map.
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class CoreTransactionDetail extends TransactionDetail
+{
+   /** Passes all three arguments through to {@link TransactionDetail}. */
+   public CoreTransactionDetail(Xid xid, Transaction tx, Long creation) throws Exception
+   {
+      super(xid,tx,creation);
+   }
+
+   /** Translates the message's type code into its display name. */
+   @Override
+   public String decodeMessageType(ServerMessage msg)
+   {
+      final String name;
+      switch (msg.getType())
+      {
+         case Message.DEFAULT_TYPE: // 0
+            name = "Default";
+            break;
+         case Message.OBJECT_TYPE: // 2
+            name = "ObjectMessage";
+            break;
+         case Message.TEXT_TYPE: // 3
+            name = "TextMessage";
+            break;
+         case Message.BYTES_TYPE: // 4
+            name = "ByteMessage";
+            break;
+         case Message.MAP_TYPE: // 5
+            name = "MapMessage";
+            break;
+         case Message.STREAM_TYPE: // 6
+            name = "StreamMessage";
+            break;
+         default:
+            name = "(Unknown Type)";
+            break;
+      }
+      return name;
+   }
+
+   /** Returns the {@code toString()} of the body buffer's ByteBuffer view. */
+   @Override
+   public String decodeMessagePayload(ServerMessage msg)
+   {
+      return msg.getBodyBuffer().toByteBuffer().toString();
+   }
+
+   /** Returns the map produced by the message's {@code toMap()}. */
+   @Override
+   public Map<String, Object> decodeMessageProperties(ServerMessage msg)
+   {
+      return msg.toMap();
+   }
+}
Property changes on: trunk/src/main/org/hornetq/core/transaction/impl/CoreTransactionDetail.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -401,6 +401,10 @@
return operations.size();
}
+ /** Returns the operations list of this transaction (the list itself, not a copy). */
+ public List<TransactionOperation> getAllOperations() {
+ return operations;
+ }
+
public void putProperty(final int index, final Object property)
{
if (index >= properties.length)
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -31,7 +31,6 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.api.jms.management.JMSServerControl;
@@ -700,6 +699,38 @@
}
}
+ /**
+ * Delegates to server.listPreparedTransactionDetailsAsJSON() after checkStarted(),
+ * between clearIO() and blockOnIO().
+ */
+ public String listPreparedTransactionDetailsAsJSON() throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ return server.listPreparedTransactionDetailsAsJSON();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ /**
+ * Delegates to server.listPreparedTransactionDetailsAsHTML() after checkStarted(),
+ * between clearIO() and blockOnIO().
+ */
+ public String listPreparedTransactionDetailsAsHTML() throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ return server.listPreparedTransactionDetailsAsHTML();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
@Override
public MBeanInfo getMBeanInfo()
{
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -286,6 +286,10 @@
String[] listSessions(String connectionID) throws Exception;
+ /** Lists the prepared transactions with their details as a JSON string. */
+ String listPreparedTransactionDetailsAsJSON() throws Exception;
+
+ /** Lists the prepared transactions with their details as an HTML string. */
+ String listPreparedTransactionDetailsAsHTML() throws Exception;
+
void setContext(final Context context);
HornetQServer getHornetQServer();
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -14,16 +14,20 @@
package org.hornetq.jms.server.impl;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
@@ -45,6 +49,8 @@
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.TransactionDetail;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQQueue;
@@ -65,8 +71,11 @@
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.management.JMSManagementService;
import org.hornetq.jms.server.management.impl.JMSManagementServiceImpl;
+import org.hornetq.jms.transaction.JMSTransactionDetail;
import org.hornetq.spi.core.naming.BindingRegistry;
import org.hornetq.utils.TimeAndCounterIDGenerator;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
/**
* A Deployer used to create and add to JNDI queues, topics and connection
@@ -1372,7 +1381,117 @@
checkInitialised();
return server.getHornetQServerControl().listSessions(connectionID);
}
+
+   /**
+    * Lists every prepared transaction, oldest first, as a JSON array string.
+    * Each array element is the result of {@link TransactionDetail#toJSON()} for one
+    * transaction, built through a {@link JMSTransactionDetail}.
+    *
+    * @return the JSON array as a string, or an empty string when there is no prepared transaction
+    * @throws Exception propagated from building the per-transaction JSON
+    */
+   public String listPreparedTransactionDetailsAsJSON() throws Exception
+   {
+      ResourceManager resourceManager = server.getResourceManager();
+      Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
+      if (xids == null || xids.isEmpty())
+      {
+         return "";
+      }
+
+      List<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Entry<Xid, Long>>(xids.entrySet());
+      Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
+      {
+         public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2)
+         {
+            // sort by creation time, oldest first; compare the longs directly because
+            // casting their difference to int gives the wrong sign for large gaps
+            return entry1.getValue().compareTo(entry2.getValue());
+         }
+      });
+
+      JSONArray txDetailListJson = new JSONArray();
+      for (Entry<Xid, Long> entry : xidsSortedByCreationTime)
+      {
+         Xid xid = entry.getKey();
+         TransactionDetail detail = new JMSTransactionDetail(xid,
+                                                             resourceManager.getTransaction(xid),
+                                                             entry.getValue());
+         txDetailListJson.put(detail.toJSON());
+      }
+      return txDetailListJson.toString();
+   }
+
+   /**
+    * Lists every prepared transaction, oldest first, rendered as an HTML fragment.
+    * One table is emitted per transaction, holding the XID fields and a nested table
+    * with the operation type, message type, properties and payload of each related message.
+    *
+    * @return the HTML fragment; a "No entry." fragment when there is no prepared transaction
+    * @throws Exception propagated from building or reading the per-transaction JSON
+    */
+   public String listPreparedTransactionDetailsAsHTML() throws Exception
+   {
+      ResourceManager resourceManager = server.getResourceManager();
+      Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
+      if (xids == null || xids.isEmpty())
+      {
+         return "<h3>*** Prepared Transaction Details ***</h3><p>No entry.</p>";
+      }
+
+      List<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Entry<Xid, Long>>(xids.entrySet());
+      Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
+      {
+         public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2)
+         {
+            // sort by creation time, oldest first; compare the longs directly because
+            // casting their difference to int gives the wrong sign for large gaps
+            return entry1.getValue().compareTo(entry2.getValue());
+         }
+      });
+
+      StringBuilder html = new StringBuilder();
+      html.append("<h3>*** Prepared Transaction Details ***</h3>");
+
+      // NOTE(review): values are inserted into the markup without HTML escaping
+      for (Entry<Xid, Long> entry : xidsSortedByCreationTime)
+      {
+         Xid xid = entry.getKey();
+         TransactionDetail detail = new JMSTransactionDetail(xid,
+                                                             resourceManager.getTransaction(xid),
+                                                             entry.getValue());
+         JSONObject txJson = detail.toJSON();
+
+         html.append("<table border=\"1\">");
+         html.append("<tr><th>creation_time</th>");
+         html.append("<td>" + txJson.get(TransactionDetail.KEY_CREATION_TIME) + "</td>");
+         html.append("<th>xid_as_base_64</th>");
+         html.append("<td colspan=\"3\">" + txJson.get(TransactionDetail.KEY_XID_AS_BASE64) + "</td></tr>");
+         html.append("<tr><th>xid_format_id</th>");
+         html.append("<td>" + txJson.get(TransactionDetail.KEY_XID_FORMAT_ID) + "</td>");
+         html.append("<th>xid_global_txid</th>");
+         html.append("<td>" + txJson.get(TransactionDetail.KEY_XID_GLOBAL_TXID) + "</td>");
+         html.append("<th>xid_branch_qual</th>");
+         html.append("<td>" + txJson.get(TransactionDetail.KEY_XID_BRANCH_QUAL) + "</td></tr>");
+
+         html.append("<tr><th colspan=\"6\">Message List</th></tr>");
+         html.append("<tr><td colspan=\"6\">");
+         html.append("<table border=\"1\" cellspacing=\"0\" cellpadding=\"0\">");
+
+         JSONArray msgs = txJson.getJSONArray(TransactionDetail.KEY_TX_RELATED_MESSAGES);
+         for (int i = 0; i < msgs.length(); i++)
+         {
+            JSONObject msgJson = msgs.getJSONObject(i);
+            JSONObject props = msgJson.getJSONObject(TransactionDetail.KEY_MSG_PROPERTIES);
+            StringBuilder propstr = new StringBuilder();
+            @SuppressWarnings("unchecked")
+            Iterator<String> propkeys = props.keys();
+            while (propkeys.hasNext())
+            {
+               String key = propkeys.next();
+               propstr.append(key);
+               propstr.append("=");
+               propstr.append(props.get(key));
+               propstr.append(", ");
+            }
+
+            // each message starts its own row; the row is closed after message_type
+            html.append("<tr><th>operation_type</th>");
+            html.append("<td>" + msgJson.get(TransactionDetail.KEY_MSG_OP_TYPE) + "</td>");
+            html.append("<th>message_type</th>");
+            html.append("<td>" + msgJson.get(TransactionDetail.KEY_MSG_TYPE) + "</td></tr>");
+            html.append("<tr><th>properties</th>");
+            html.append("<td colspan=\"3\">" + propstr.toString() + "</td></tr>");
+            html.append("<tr><th colspan=\"4\">payload</th></tr>");
+            html.append("<tr><td colspan=\"4\">" + msgJson.get(TransactionDetail.KEY_MSG_PAYLOAD) + "</td></tr>");
+         }
+         html.append("</table></td></tr>");
+         html.append("</table><br/>");
+      }
+
+      return html.toString();
+   }
+
// Public --------------------------------------------------------
// Private -------------------------------------------------------
Added: trunk/src/main/org/hornetq/jms/transaction/JMSTransactionDetail.java
===================================================================
--- trunk/src/main/org/hornetq/jms/transaction/JMSTransactionDetail.java (rev 0)
+++ trunk/src/main/org/hornetq/jms/transaction/JMSTransactionDetail.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.jms.transaction;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionDetail;
+import org.hornetq.jms.client.HornetQBytesMessage;
+import org.hornetq.jms.client.HornetQMapMessage;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.HornetQObjectMessage;
+import org.hornetq.jms.client.HornetQStreamMessage;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.utils.TypedProperties;
+
+/**
+ * A TransactionDetail that decodes messages using the JMS message type
+ * constants (HornetQMessage, HornetQTextMessage, ...) and the JMS view of
+ * the message properties.
+ *
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ *
+ *
+ */
+public class JMSTransactionDetail extends TransactionDetail
+{
+ /** Passes all three arguments through to TransactionDetail. */
+ public JMSTransactionDetail(Xid xid, Transaction tx, Long creation) throws Exception
+ {
+ super(xid,tx,creation);
+ }
+
+ /** Translates the message's type code into its display name. */
+ @Override
+ public String decodeMessageType(ServerMessage msg)
+ {
+ int type = msg.getType();
+ switch (type)
+ {
+ case HornetQMessage.TYPE: // 0
+ return "Default";
+ case HornetQObjectMessage.TYPE: // 2
+ return "ObjectMessage";
+ case HornetQTextMessage.TYPE: // 3
+ return "TextMessage";
+ case HornetQBytesMessage.TYPE: // 4
+ return "ByteMessage";
+ case HornetQMapMessage.TYPE: // 5
+ return "MapMessage";
+ case HornetQStreamMessage.TYPE: // 6
+ return "StreamMessage";
+ default:
+ return "(Unknown Type)";
+ }
+ }
+
+ /**
+ * Renders the body of object, text and map messages as a string.
+ * Every other type, and any failure while decoding, yields "(Not Available)".
+ */
+ @Override
+ public String decodeMessagePayload(ServerMessage msg)
+ {
+ int type = msg.getType();
+ HornetQBuffer bodyBuffer = msg.getBodyBuffer();
+
+ try
+ {
+ switch (type)
+ {
+ case HornetQObjectMessage.TYPE: // 2
+ // body layout read here: an int length followed by that many serialized bytes
+ int len = bodyBuffer.readInt();
+ byte[] data = new byte[len];
+ bodyBuffer.readBytes(data);
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ // NOTE(review): this deserializes an object carried in the message body;
+ // confirm that message payloads can be trusted where this runs
+ ObjectInputStream ois = new org.hornetq.utils.ObjectInputStreamWithClassLoader(bais);
+ Serializable object = (Serializable)ois.readObject();
+ return object.toString();
+
+ case HornetQTextMessage.TYPE: // 3
+ return bodyBuffer.readNullableSimpleString().toString();
+
+ case HornetQMapMessage.TYPE: // 5
+ TypedProperties pmap = new TypedProperties();
+ pmap.decode(msg.getBodyBuffer());
+ return pmap.toString();
+ default:
+ return "(Not Available)";
+ }
+ }
+ catch(Throwable t)
+ {
+ // best effort: any decoding problem is reported as an unavailable payload
+ return "(Not Available)";
+ }
+ }
+
+ /**
+ * Converts the message's core property map with HornetQMessage.coreMaptoJMSMap.
+ * Returns null when the conversion throws.
+ */
+ @Override
+ public Map<String, Object> decodeMessageProperties(ServerMessage msg)
+ {
+ try
+ {
+ return HornetQMessage.coreMaptoJMSMap(msg.toMap());
+ }
+ catch (Throwable t)
+ {
+ return null;
+ }
+ }
+}
Property changes on: trunk/src/main/org/hornetq/jms/transaction/JMSTransactionDetail.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -23,9 +23,15 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import junit.framework.Assert;
@@ -46,6 +52,7 @@
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQConnection;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.persistence.JMSStorageManager;
@@ -450,7 +457,98 @@
}
});
}
+
+ public void testListPreparedTransactionDetails() throws Exception
+ {
+ Xid xid = newXID();
+ JMSServerControl control = createManagementControl();
+ TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getName());
+ String cfJNDIBinding = "/cf";
+ String cfName = "cf";
+
+ control.createConnectionFactory(cfName,
+ tc.getFactoryClassName(),
+ null,
+ tc.getFactoryClassName(),
+ null,
+ cfJNDIBinding);
+ control.createQueue("q","/q");
+
+ ConnectionFactory cf = (ConnectionFactory)context.lookup("/cf");
+ Destination dest = (Destination)context.lookup("/q");
+ HornetQConnection conn = (HornetQConnection)cf.createConnection();
+ XASession ss = conn.createXASession();
+ TextMessage m1 = ss.createTextMessage("m1");
+ TextMessage m2 = ss.createTextMessage("m2");
+ TextMessage m3 = ss.createTextMessage("m3");
+ TextMessage m4 = ss.createTextMessage("m4");
+ MessageProducer mp = ss.createProducer(dest);
+ XAResource xa = ss.getXAResource();
+ xa.start(xid, XAResource.TMNOFLAGS);
+ mp.send(m1);
+ mp.send(m2);
+ mp.send(m3);
+ mp.send(m4);
+ xa.end(xid, XAResource.TMSUCCESS);
+ xa.prepare(xid);
+
+ ss.close();
+
+ String txDetails = control.listPreparedTransactionDetailsAsJSON();
+
+ Assert.assertTrue(txDetails.matches(".*m1.*"));
+ Assert.assertTrue(txDetails.matches(".*m2.*"));
+ Assert.assertTrue(txDetails.matches(".*m3.*"));
+ Assert.assertTrue(txDetails.matches(".*m4.*"));
+ }
+
+ public void testListPreparedTranscationDetailsAsHTML() throws Exception
+ {
+ Xid xid = newXID();
+
+ JMSServerControl control = createManagementControl();
+ TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getName());
+ String cfJNDIBinding = "/cf";
+ String cfName = "cf";
+
+ control.createConnectionFactory(cfName,
+ tc.getFactoryClassName(),
+ null,
+ tc.getFactoryClassName(),
+ null,
+ cfJNDIBinding);
+ control.createQueue("q","/q");
+
+ ConnectionFactory cf = (ConnectionFactory)context.lookup("/cf");
+ Destination dest = (Destination)context.lookup("/q");
+ HornetQConnection conn = (HornetQConnection)cf.createConnection();
+ XASession ss = conn.createXASession();
+ TextMessage m1 = ss.createTextMessage("m1");
+ TextMessage m2 = ss.createTextMessage("m2");
+ TextMessage m3 = ss.createTextMessage("m3");
+ TextMessage m4 = ss.createTextMessage("m4");
+ MessageProducer mp = ss.createProducer(dest);
+ XAResource xa = ss.getXAResource();
+ xa.start(xid, XAResource.TMNOFLAGS);
+ mp.send(m1);
+ mp.send(m2);
+ mp.send(m3);
+ mp.send(m4);
+ xa.end(xid, XAResource.TMSUCCESS);
+ xa.prepare(xid);
+
+ ss.close();
+
+ String html = control.listPreparedTransactionDetailsAsHTML();
+
+ Assert.assertTrue(html.matches(".*m1.*"));
+ Assert.assertTrue(html.matches(".*m2.*"));
+ Assert.assertTrue(html.matches(".*m3.*"));
+ Assert.assertTrue(html.matches(".*m4.*"));
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -275,6 +275,16 @@
return (Boolean)proxy.invokeOperation("createTopic", name, jndiBinding);
}
+ public String listPreparedTransactionDetailsAsJSON() throws Exception
+ {
+ return (String)proxy.invokeOperation("listPreparedTransactionDetailsAsJSON");
+ }
+
+ public String listPreparedTransactionDetailsAsHTML() throws Exception
+ {
+ return (String)proxy.invokeOperation("listPreparedTransactionDetailsAsHTML");
+ }
+
};
}
// Public --------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -16,6 +16,9 @@
import java.util.HashMap;
import java.util.Map;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
@@ -41,7 +44,9 @@
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -671,6 +676,80 @@
session.close();
}
+ public void testListPreparedTransactionDetails() throws Exception
+ {
+ SimpleString atestq = new SimpleString("BasicXaTestq");
+ Xid xid = newXID();
+
+ ClientSessionFactory csf = HornetQClient.createClientSessionFactory(new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName()));
+ ClientSession clientSession = csf.createSession(true, false, false);
+ clientSession.createQueue(atestq, atestq, null, true);
+
+ ClientMessage m1 = createTextMessage(clientSession, "");
+ ClientMessage m2 = createTextMessage(clientSession, "");
+ ClientMessage m3 = createTextMessage(clientSession, "");
+ ClientMessage m4 = createTextMessage(clientSession, "");
+ m1.putStringProperty("m1", "m1");
+ m2.putStringProperty("m2", "m2");
+ m3.putStringProperty("m3", "m3");
+ m4.putStringProperty("m4", "m4");
+ ClientProducer clientProducer = clientSession.createProducer(atestq);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ clientSession.close();
+
+ HornetQServerControl serverControl = createManagementControl();
+ String txDetails = serverControl.listPreparedTransactionDetailsAsJSON();
+
+ Assert.assertTrue(txDetails.matches(".*m1.*"));
+ Assert.assertTrue(txDetails.matches(".*m2.*"));
+ Assert.assertTrue(txDetails.matches(".*m3.*"));
+ Assert.assertTrue(txDetails.matches(".*m4.*"));
+ }
+
+ public void testListPreparedTransactionDetailsAsHTML() throws Exception
+ {
+ SimpleString atestq = new SimpleString("BasicXaTestq");
+ Xid xid = newXID();
+
+ ClientSessionFactory csf = HornetQClient.createClientSessionFactory(new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName()));
+ ClientSession clientSession = csf.createSession(true, false, false);
+ clientSession.createQueue(atestq, atestq, null, true);
+
+ ClientMessage m1 = createTextMessage(clientSession, "");
+ ClientMessage m2 = createTextMessage(clientSession, "");
+ ClientMessage m3 = createTextMessage(clientSession, "");
+ ClientMessage m4 = createTextMessage(clientSession, "");
+ m1.putStringProperty("m1", "m1");
+ m2.putStringProperty("m2", "m2");
+ m3.putStringProperty("m3", "m3");
+ m4.putStringProperty("m4", "m4");
+ ClientProducer clientProducer = clientSession.createProducer(atestq);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ clientSession.close();
+
+ HornetQServerControl serverControl = createManagementControl();
+ String html = serverControl.listPreparedTransactionDetailsAsHTML();
+
+ Assert.assertTrue(html.matches(".*m1.*"));
+ Assert.assertTrue(html.matches(".*m2.*"));
+ Assert.assertTrue(html.matches(".*m3.*"));
+ Assert.assertTrue(html.matches(".*m4.*"));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -383,6 +383,16 @@
return (String[])proxy.invokeOperation("listPreparedTransactions");
}
+ public String listPreparedTransactionDetailsAsJSON() throws Exception
+ {
+ return (String)proxy.invokeOperation("listPreparedTransactionDetailsAsJSON");
+ }
+
+ public String listPreparedTransactionDetailsAsHTML() throws Exception
+ {
+ return (String)proxy.invokeOperation("listPreparedTransactionDetailsAsHTML");
+ }
+
public String[] listHeuristicCommittedTransactions() throws Exception
{
return (String[])proxy.invokeOperation("listHeuristicCommittedTransactions");
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -15,6 +15,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
@@ -545,5 +547,10 @@
{
return Collections.emptySet();
}
+
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-11-03 03:13:23 UTC (rev 9834)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-11-03 08:37:11 UTC (rev 9835)
@@ -15,6 +15,7 @@
import java.io.InputStream;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -348,6 +349,14 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#getAllOperations()
+ */
+ public List<TransactionOperation> getAllOperations()
+ {
+ return null;
+ }
+
}
class FakeMessage implements ServerMessage
14 years, 1 month
JBoss hornetq SVN: r9834 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-11-02 23:13:23 -0400 (Tue, 02 Nov 2010)
New Revision: 9834
Added:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
Modified:
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
renaming a few classes.. some renames.. etc
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2010-11-03 03:13:23 UTC (rev 9834)
@@ -53,7 +53,7 @@
PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
- Pair<PagePosition, PagedMessage> getNext(PageSubscription cursor, PagePosition pos) throws Exception;
+ PagedReferenceImpl getNext(PageSubscription cursor, PagePosition pos) throws Exception;
PagedMessage getMessage(PagePosition pos) throws Exception;
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2010-11-03 03:13:23 UTC (rev 9834)
@@ -39,7 +39,7 @@
boolean isPersistent();
- public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator();
+ public LinkedListIterator<PagedReferenceImpl> iterator();
// To be called when the cursor is closed for good. Most likely when the queue is deleted
void close() throws Exception;
@@ -52,8 +52,14 @@
void enableAutoCleanup();
- void ack(PagePosition position) throws Exception;
+ void ack(PagedReference ref) throws Exception;
+ // for internal (cursor) classes
+ void ack(PagePosition ref) throws Exception;
+
+ void ackTx(Transaction tx, PagedReference position) throws Exception;
+
+ // for internal (cursor) classes
void ackTx(Transaction tx, PagePosition position) throws Exception;
/**
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReference.java 2010-11-03 03:13:23 UTC (rev 9834)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.server.MessageReference;
+
+/**
+ * A PagedReference
+ *
+ * @author clebert
+ *
+ *
+ */
+public interface PagedReference extends MessageReference
+{
+ PagePosition getPosition();
+
+ PagedMessage getPagedMessage();
+}
Added: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.cursor;
+
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * A PagedReference implementation that pairs a PagePosition with its PagedMessage
+ *
+ * @author clebert
+ *
+ *
+ */
+public class PagedReferenceImpl implements PagedReference
+{
+
+ private static final long serialVersionUID = -8640232251318264710L;
+
+ private PagePosition a;
+ private PagedMessage b;
+
+
+ public ServerMessage getMessage()
+ {
+ return b.getMessage();
+ }
+
+ public PagedMessage getPagedMessage()
+ {
+ return b;
+ }
+
+ public PagePosition getPosition()
+ {
+ return a;
+ }
+
+ public PagedReferenceImpl(PagePosition a, PagedMessage b)
+ {
+ this.a = a;
+ this.b = b;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
+ */
+ public MessageReference copy(Queue queue)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getScheduledDeliveryTime()
+ */
+ public long getScheduledDeliveryTime()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#setScheduledDeliveryTime(long)
+ */
+ public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getDeliveryCount()
+ */
+ public int getDeliveryCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#setDeliveryCount(int)
+ */
+ public void setDeliveryCount(int deliveryCount)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#incrementDeliveryCount()
+ */
+ public void incrementDeliveryCount()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#decrementDeliveryCount()
+ */
+ public void decrementDeliveryCount()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getQueue()
+ */
+ public Queue getQueue()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#handled()
+ */
+ public void handled()
+ {
+ // TODO Auto-generated method stub
+
+ }
+}
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
@@ -14,13 +14,11 @@
package org.hornetq.core.paging.cursor.impl;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import org.hornetq.api.core.Pair;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
@@ -29,12 +27,11 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.utils.ConcurrentHashSet;
-import org.hornetq.utils.ConcurrentSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
@@ -124,12 +121,12 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
*/
- public Pair<PagePosition, PagedMessage> getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
+ public PagedReferenceImpl getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
{
while (true)
{
- Pair<PagePosition, PagedMessage> retPos = internalGetNext(cursorPos);
+ PagedReferenceImpl retPos = internalGetNext(cursorPos);
if (retPos == null)
{
@@ -137,15 +134,15 @@
}
else if (retPos != null)
{
- cursorPos = retPos.a;
- if (retPos.b.getTransactionID() != 0)
+ cursorPos = retPos.getPosition();
+ if (retPos.getPagedMessage().getTransactionID() != 0)
{
- PageTransactionInfo tx = pagingManager.getTransaction(retPos.b.getTransactionID());
+ PageTransactionInfo tx = pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
if (tx == null)
{
- log.warn("Couldn't locate page transaction " + retPos.b.getTransactionID() +
+ log.warn("Couldn't locate page transaction " + retPos.getPagedMessage().getTransactionID() +
", ignoring message on position " +
- retPos.a);
+ retPos.getPosition());
cursor.positionIgnored(cursorPos);
}
else
@@ -164,7 +161,7 @@
}
}
- private Pair<PagePosition, PagedMessage> internalGetNext(final PagePosition pos)
+ private PagedReferenceImpl internalGetNext(final PagePosition pos)
{
PagePosition retPos = pos.nextMessage();
@@ -191,7 +188,7 @@
if (serverMessage != null)
{
- return new Pair<PagePosition, PagedMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
+ return new PagedReferenceImpl(retPos, cache.getMessage(retPos.getMessageNr()));
}
else
{
Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2010-11-03 03:13:23 UTC (rev 9834)
@@ -17,29 +17,26 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.api.core.Pair;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
@@ -158,7 +155,7 @@
ack(position);
}
- class CursorIterator implements LinkedListIterator<Pair<PagePosition, PagedMessage>>
+ class CursorIterator implements LinkedListIterator<PagedReferenceImpl>
{
PagePosition position = getLastPosition();
@@ -170,7 +167,7 @@
/** next element taken on hasNext test.
* it has to be delivered on next next operation */
- Pair<PagePosition, PagedMessage> cachedNext;
+ PagedReferenceImpl cachedNext;
public void repeat()
{
@@ -194,12 +191,12 @@
/* (non-Javadoc)
* @see java.util.Iterator#next()
*/
- public Pair<PagePosition, PagedMessage> next()
+ public PagedReferenceImpl next()
{
if (cachedNext != null)
{
- Pair<PagePosition, PagedMessage> retPos = cachedNext;
+ PagedReferenceImpl retPos = cachedNext;
cachedNext = null;
return retPos;
}
@@ -215,10 +212,10 @@
isredelivery = false;
}
- Pair<PagePosition, PagedMessage> nextPos = moveNext(position);
+ PagedReferenceImpl nextPos = moveNext(position);
if (nextPos != null)
{
- position = nextPos.a;
+ position = nextPos.getPosition();
}
return nextPos;
}
@@ -257,15 +254,15 @@
}
}
- private Pair<PagePosition, PagedMessage> getMessage(PagePosition pos) throws Exception
+ private PagedReferenceImpl getMessage(PagePosition pos) throws Exception
{
- return new Pair<PagePosition, PagedMessage>(pos, cursorProvider.getMessage(pos));
+ return new PagedReferenceImpl(pos, cursorProvider.getMessage(pos));
}
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#iterator()
*/
- public LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator()
+ public LinkedListIterator<PagedReferenceImpl> iterator()
{
return new CursorIterator();
}
@@ -275,11 +272,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
*/
- public synchronized Pair<PagePosition, PagedMessage> moveNext(PagePosition position) throws Exception
+ public synchronized PagedReferenceImpl moveNext(PagePosition position) throws Exception
{
boolean match = false;
- Pair<PagePosition, PagedMessage> message = null;
+ PagedReferenceImpl message = null;
PagePosition tmpPosition = position;
@@ -294,25 +291,24 @@
}
else
{
- PageCursorInfo info = getPageInfo(message.a, false);
- if (info != null && info.isRemoved(message.a))
+ PageCursorInfo info = getPageInfo(message.getPosition(), false);
+ if (info != null && info.isRemoved(message.getPosition()))
{
- tmpPosition = message.a;
+ tmpPosition = message.getPosition();
valid = false;
}
}
if (valid)
{
- tmpPosition = message.a;
+ tmpPosition = message.getPosition();
- match = match(message.b.getMessage());
+ match = match(message.getMessage());
if (!match)
{
- processACK(message.a);
+ processACK(message.getPosition());
}
}
-
}
while (message != null && !match);
@@ -337,9 +333,13 @@
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
*/
+ public void ack(final PagedReference position) throws Exception
+ {
+ ack(position.getPosition());
+ }
+
public void ack(final PagePosition position) throws Exception
{
-
// if we are dealing with a persistent cursor
if (persistent)
{
@@ -371,6 +371,12 @@
}
+
+ public void ackTx(final Transaction tx, final PagedReference position) throws Exception
+ {
+ ackTx(tx, position.getPosition());
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
*/
@@ -556,21 +562,21 @@
// looking for holes on the ack list for redelivery
while (true)
{
- Pair<PagePosition, PagedMessage> msgCheck = cursorProvider.getNext(this, tmpPos);
+ PagedReferenceImpl msgCheck = cursorProvider.getNext(this, tmpPos);
positions = getPageInfo(tmpPos);
// end of the hole, we can finish processing here
// It may be also that the next was just a next page, so we just ignore it
- if (msgCheck == null || msgCheck.a.equals(pos))
+ if (msgCheck == null || msgCheck.getPosition().equals(pos))
{
break;
}
else
{
- if (match(msgCheck.b.getMessage()))
+ if (match(msgCheck.getMessage()))
{
- redeliver(msgCheck.a);
+ redeliver(msgCheck.getPosition());
}
else
{
@@ -580,7 +586,7 @@
positions.confirmed.incrementAndGet();
}
}
- tmpPos = msgCheck.a;
+ tmpPos = msgCheck.getPosition();
}
}
}
Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-02 22:44:05 UTC (rev 9833)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java 2010-11-03 03:13:23 UTC (rev 9834)
@@ -21,7 +21,6 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
@@ -30,12 +29,13 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageCache;
-import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
-import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
+import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
+import org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -120,14 +120,14 @@
PageSubscription cursor = lookupPageStore(ADDRESS).getCursorProvier().getSubscription(queue.getID());
- Pair<PagePosition, PagedMessage> msg;
+ PagedReferenceImpl msg;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
int key = 0;
while ((msg = iterator.next()) != null)
{
- assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg.getPosition());
}
assertEquals(NUM_MESSAGES, key);
@@ -205,30 +205,30 @@
queue.getPageSubscription().close();
- Pair<PagePosition, PagedMessage> msg;
+ PagedReferenceImpl msg;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorEven = cursorEven.iterator();
+ LinkedListIterator<PagedReferenceImpl> iteratorEven = cursorEven.iterator();
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iteratorOdd = cursorOdd.iterator();
+ LinkedListIterator<PagedReferenceImpl> iteratorOdd = cursorOdd.iterator();
int key = 0;
while ((msg = iteratorEven.next()) != null)
{
System.out.println("Received" + msg);
- assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
- assertTrue(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+ assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+ assertTrue(msg.getMessage().getBooleanProperty("even").booleanValue());
key += 2;
- cursorEven.ack(msg.a);
+ cursorEven.ack(msg.getPosition());
}
assertEquals(NUM_MESSAGES, key);
key = 1;
while ((msg = iteratorOdd.next()) != null)
{
- assertEquals(key, msg.b.getMessage().getIntProperty("key").intValue());
- assertFalse(msg.b.getMessage().getBooleanProperty("even").booleanValue());
+ assertEquals(key, msg.getMessage().getIntProperty("key").intValue());
+ assertFalse(msg.getMessage().getBooleanProperty("even").booleanValue());
key += 2;
- cursorOdd.ack(msg.a);
+ cursorOdd.ack(msg.getPosition());
}
assertEquals(NUM_MESSAGES + 1, key);
@@ -285,18 +285,18 @@
System.out.println("Cursor: " + cursor);
cursorProvider.printDebug();
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < 1000; i++)
{
System.out.println("Reading Msg : " + i);
- Pair<PagePosition, PagedMessage> msg = iterator.next();
+ PagedReferenceImpl msg = iterator.next();
assertNotNull(msg);
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < firstPageSize)
{
- cursor.ack(msg.a);
+ cursor.ack(msg);
}
}
cursorProvider.printDebug();
@@ -319,11 +319,11 @@
for (int i = firstPageSize; i < NUM_MESSAGES; i++)
{
System.out.println("Received " + i);
- Pair<PagePosition, PagedMessage> msg = iterator.next();
+ PagedReferenceImpl msg = iterator.next();
assertNotNull(msg);
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ cursor.ack(msg);
OperationContextImpl.getContext(null).waitCompletion();
@@ -361,14 +361,14 @@
.getSubscription(queue.getID());
System.out.println("Cursor: " + cursor);
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
- cursor.ack(msg.a);
+ cursor.ack(msg);
}
}
@@ -383,16 +383,16 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
}
server.stop();
@@ -422,15 +422,15 @@
Transaction tx = new TransactionImpl(server.getStorageManager(), 60 * 1000);
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < 100; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
if (i < 10 || i > 20)
{
- cursor.ackTx(tx, msg.a);
+ cursor.ackTx(tx, msg);
}
}
@@ -449,16 +449,16 @@
for (int i = 10; i <= 20; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx, msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ackTx(tx, msg);
}
for (int i = 100; i < NUM_MESSAGES; i++)
{
- Pair<PagePosition, PagedMessage> msg = iterator.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ackTx(tx, msg.a);
+ PagedReferenceImpl msg = iterator.next();
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ackTx(tx, msg);
}
tx.commit();
@@ -490,7 +490,7 @@
System.out.println("Cursor: " + cursor);
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
for (int i = 0; i < NUM_MESSAGES; i++)
{
@@ -506,11 +506,11 @@
Assert.assertTrue(pageStore.page(msg));
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
assertNotNull(readMessage);
- assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
assertNull(iterator.next());
}
@@ -544,11 +544,11 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
assertNotNull(readMessage);
- assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
}
server.stop();
@@ -580,20 +580,20 @@
Assert.assertTrue(pageStore.page(msg));
}
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
assertNotNull(readMessage);
- cursor.ack(readMessage.a);
+ cursor.ack(readMessage);
- assertEquals(i, readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, readMessage.getMessage().getIntProperty("key").intValue());
}
- Pair<PagePosition, PagedMessage> readMessage = iterator.next();
+ PagedReferenceImpl readMessage = iterator.next();
- assertEquals(NUM_MESSAGES * 3, readMessage.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(NUM_MESSAGES * 3, readMessage.getMessage().getIntProperty("key").intValue());
- cursor.ack(readMessage.a);
+ cursor.ack(readMessage);
server.getStorageManager().waitOnOperations();
@@ -647,7 +647,7 @@
.getPageStore(ADDRESS)
.getCursorProvier()
.getSubscription(queue.getID());
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
System.out.println("Cursor: " + cursor);
@@ -676,10 +676,10 @@
// First consume what's already there without any tx as nothing was committed
for (int i = 300; i < 400; i++)
{
- Pair<PagePosition, PagedMessage> pos = iterator.next();
+ PagedReferenceImpl pos = iterator.next();
assertNotNull("Null at position " + i, pos);
- assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos.a);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
}
assertNull(iterator.next());
@@ -693,10 +693,10 @@
// Second:after pgtxCommit was done
for (int i = 200; i < 300; i++)
{
- Pair<PagePosition, PagedMessage> pos = iterator.next();
+ PagedReferenceImpl pos = iterator.next();
assertNotNull(pos);
- assertEquals(i, pos.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(pos.a);
+ assertEquals(i, pos.getMessage().getIntProperty("key").intValue());
+ cursor.ack(pos);
}
assertNull(iterator.next());
@@ -724,15 +724,15 @@
queue.getPageSubscription().close();
- Pair<PagePosition, PagedMessage> msg;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator2 = cursor.iterator();
+ PagedReferenceImpl msg;
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator2 = cursor.iterator();
int key = 0;
while ((msg = iterator.next()) != null)
{
- assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msg.a);
+ assertEquals(key++, msg.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msg);
}
assertEquals(NUM_MESSAGES, key);
@@ -741,7 +741,7 @@
for (int i = 0; i < 10; i++)
{
msg = iterator2.next();
- assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(i, msg.getMessage().getIntProperty("key").intValue());
}
assertSame(cursor2.getProvider(), cursorProvider);
@@ -803,13 +803,13 @@
msg = null;
cache = null;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
- Pair<PagePosition, PagedMessage> msgCursor = null;
+ PagedReferenceImpl msgCursor = null;
while ((msgCursor = iterator.next()) != null)
{
- assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msgCursor.a);
+ assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor);
}
assertEquals(NUM_MESSAGES, key);
@@ -848,12 +848,12 @@
cache = null;
- LinkedListIterator<Pair<PagePosition, PagedMessage>> iterator = cursor.iterator();
+ LinkedListIterator<PagedReferenceImpl> iterator = cursor.iterator();
- Pair<PagePosition, PagedMessage> msgCursor = null;
+ PagedReferenceImpl msgCursor = null;
while ((msgCursor = iterator.next()) != null)
{
- assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
+ assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
}
assertEquals(NUM_MESSAGES, key);
@@ -869,8 +869,8 @@
iterator = cursor.iterator();
while ((msgCursor = iterator.next()) != null)
{
- assertEquals(key++, msgCursor.b.getMessage().getIntProperty("key").intValue());
- cursor.ack(msgCursor.a);
+ assertEquals(key++, msgCursor.getMessage().getIntProperty("key").intValue());
+ cursor.ack(msgCursor);
}
forceGC();
@@ -902,29 +902,29 @@
PageSubscription cursor = cursorProvider.getSubscription(queue.getID());
- Iterator<Pair<PagePosition, PagedMessage>> iter = cursor.iterator();
+ Iterator<PagedReferenceImpl> iter = cursor.iterator();
- Iterator<Pair<PagePosition, PagedMessage>> iter2 = cursor.iterator();
+ Iterator<PagedReferenceImpl> iter2 = cursor.iterator();
assertTrue(iter.hasNext());
- Pair<PagePosition, PagedMessage> msg1 = iter.next();
+ PagedReferenceImpl msg1 = iter.next();
- Pair<PagePosition, PagedMessage> msg2 = iter2.next();
+ PagedReferenceImpl msg2 = iter2.next();
- assertEquals(tstProperty(msg1.b.getMessage()), tstProperty(msg2.b.getMessage()));
+ assertEquals(tstProperty(msg1.getMessage()), tstProperty(msg2.getMessage()));
- System.out.println("property = " + tstProperty(msg1.b.getMessage()));
+ System.out.println("property = " + tstProperty(msg1.getMessage()));
msg1 = iter.next();
- assertEquals(1, tstProperty(msg1.b.getMessage()));
+ assertEquals(1, tstProperty(msg1.getMessage()));
iter.remove();
msg2 = iter2.next();
- assertEquals(2, tstProperty(msg2.b.getMessage()));
+ assertEquals(2, tstProperty(msg2.getMessage()));
assertTrue(iter2.hasNext());
14 years, 1 month