JBoss hornetq SVN: r11783 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-11-28 23:09:29 -0500 (Mon, 28 Nov 2011)
New Revision: 11783
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
increasing timeout to avoid false test failure.
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-11-29 04:01:26 UTC (rev 11782)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-11-29 04:09:29 UTC (rev 11783)
@@ -4536,7 +4536,7 @@
for (int i = 0; i < 500; i++)
{
log.info("Received message " + i);
- message = cons.receive(5000);
+ message = cons.receive(10000);
assertNotNull(message);
message.acknowledge();
13 years, 3 months
JBoss hornetq SVN: r11782 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic
Date: 2011-11-28 23:01:26 -0500 (Mon, 28 Nov 2011)
New Revision: 11782
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
JBPAPP-7606 - fixing NPE that happened on the replicator
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-28 19:19:19 UTC (rev 11781)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-11-29 04:01:26 UTC (rev 11782)
@@ -58,6 +58,7 @@
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
/*needed for backward compatibility*/
+ @SuppressWarnings("unused")
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
/*end of compatibility fixes*/
@@ -82,7 +83,7 @@
private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
- private TransportConfiguration[] initialConnectors;
+ private volatile TransportConfiguration[] initialConnectors;
private DiscoveryGroupConfiguration discoveryGroupConfiguration;
@@ -583,12 +584,15 @@
public ClientSessionFactoryInternal connect() throws Exception
{
- // static list of initial connectors
- if (initialConnectors != null && discoveryGroup == null)
+ synchronized (this)
{
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
- addFactory(sf);
- return sf;
+ // static list of initial connectors
+ if (initialConnectors != null && discoveryGroup == null)
+ {
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)staticConnector.connect();
+ addFactory(sf);
+ return sf;
+ }
}
// wait for discovery group to get the list of initial connectors
return (ClientSessionFactoryInternal)createSessionFactory();
@@ -1454,6 +1458,7 @@
"]";
}
+ @SuppressWarnings("unchecked")
private synchronized void updateArraysAndPairs()
{
Collection<TopologyMember> membersCopy = topology.getMembers();
@@ -1472,13 +1477,14 @@
{
List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+
+ TransportConfiguration[] newInitialconnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
newConnectors.size());
int count = 0;
for (DiscoveryEntry entry : newConnectors)
{
- this.initialConnectors[count++] = entry.getConnector();
+ newInitialconnectors[count++] = entry.getConnector();
if (ha && topology.getMember(entry.getNodeID()) == null)
{
@@ -1487,18 +1493,35 @@
topology.updateMember(0, entry.getNodeID(), member);
}
}
+
+ this.initialConnectors = newInitialconnectors;
if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
{
- // FIXME the node is alone in the cluster. We create a connection to the new node
+ // The node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
- try
+
+ Runnable connectRunnable = new Runnable()
{
- connect();
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ };
+ if (startExecutor != null)
+ {
+ startExecutor.execute(connectRunnable);
}
- catch (Exception e)
+ else
{
- e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
+ connectRunnable.run();
}
}
}
13 years, 3 months
JBoss hornetq SVN: r11781 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-28 14:19:19 -0500 (Mon, 28 Nov 2011)
New Revision: 11781
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5747 - synchronizing access to consumers and producers, guarding against concurrent close / rollback from multiple threads in misbehaving applications
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 17:03:24 UTC (rev 11780)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 19:19:19 UTC (rev 11781)
@@ -594,13 +594,11 @@
stop();
}
- synchronized (consumers)
+
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.clear(true);
- }
+ consumer.clear(true);
}
// Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -675,7 +673,7 @@
if (!started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
clientConsumerInternal.start();
}
@@ -697,12 +695,9 @@
if (started)
{
- synchronized (consumers)
+ for (ClientConsumerInternal clientConsumerInternal : cloneConsumers())
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
- {
- clientConsumerInternal.stop(waitForOnMessage);
- }
+ clientConsumerInternal.stop(waitForOnMessage);
}
channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
@@ -1102,7 +1097,7 @@
// Now start the session if it was already started
if (started)
{
- for (ClientConsumerInternal consumer : consumers.values())
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
consumer.clearAtFailover();
consumer.start();
@@ -1538,13 +1533,10 @@
stop(false);
}
- synchronized (consumers)
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.clear(false);
- }
+ consumer.clear(false);
}
flushAcks();
@@ -1928,51 +1920,57 @@
private void cleanUpChildren() throws Exception
{
- Set<ClientConsumerInternal> consumersClone;
-
- synchronized (consumers)
- {
- consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
- }
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumerInternal consumer : consumersClone)
{
consumer.cleanUp();
}
+ Set<ClientProducerInternal> producersClone = cloneProducers();
+
+ for (ClientProducerInternal producer : producersClone)
+ {
+ producer.cleanUp();
+ }
+ }
+
+ /**
+ * @return
+ */
+ private Set<ClientProducerInternal> cloneProducers()
+ {
Set<ClientProducerInternal> producersClone;
synchronized (producers)
{
producersClone = new HashSet<ClientProducerInternal>(producers);
}
+ return producersClone;
+ }
- for (ClientProducerInternal producer : producersClone)
+ /**
+ * @return
+ */
+ private Set<ClientConsumerInternal> cloneConsumers()
+ {
+ synchronized (consumers)
{
- producer.cleanUp();
+ return new HashSet<ClientConsumerInternal>(consumers.values());
}
}
private void closeChildren() throws HornetQException
{
- Set<ClientConsumer> consumersClone;
- synchronized (consumers)
- {
- consumersClone = new HashSet<ClientConsumer>(consumers.values());
- }
+ Set<ClientConsumerInternal> consumersClone = cloneConsumers();
for (ClientConsumer consumer : consumersClone)
{
consumer.close();
}
- Set<ClientProducer> producersClone;
+ Set<ClientProducerInternal> producersClone = cloneProducers();
- synchronized (producers)
- {
- producersClone = new HashSet<ClientProducer>(producers);
- }
-
for (ClientProducer producer : producersClone)
{
producer.close();
@@ -1981,12 +1979,9 @@
private void flushAcks() throws HornetQException
{
- synchronized (consumers)
+ for (ClientConsumerInternal consumer : cloneConsumers())
{
- for (ClientConsumerInternal consumer : consumers.values())
- {
- consumer.flushAcks();
- }
+ consumer.flushAcks();
}
}
13 years, 3 months
JBoss hornetq SVN: r11780 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-11-28 12:03:24 -0500 (Mon, 28 Nov 2011)
New Revision: 11780
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-5747 - synchronizing access to consumers and producers, guarding against concurrent close / rollback from multiple threads in misbehaving applications
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 12:55:00 UTC (rev 11779)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-11-28 17:03:24 UTC (rev 11780)
@@ -594,10 +594,13 @@
stop();
}
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ synchronized (consumers)
{
- consumer.clear(true);
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clear(true);
+ }
}
// Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -694,9 +697,12 @@
if (started)
{
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ synchronized (consumers)
{
- clientConsumerInternal.stop(waitForOnMessage);
+ for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ {
+ clientConsumerInternal.stop(waitForOnMessage);
+ }
}
channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
@@ -816,7 +822,10 @@
public void addProducer(final ClientProducerInternal producer)
{
- producers.add(producer);
+ synchronized (producers)
+ {
+ producers.add(producer);
+ }
}
public void removeConsumer(final ClientConsumerInternal consumer) throws HornetQException
@@ -829,12 +838,15 @@
public void removeProducer(final ClientProducerInternal producer)
{
- producers.remove(producer);
+ synchronized (producers)
+ {
+ producers.remove(producer);
+ }
}
public void handleReceiveMessage(final long consumerID, final SessionReceiveMessage message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -850,7 +862,7 @@
public void handleReceiveLargeMessage(final long consumerID, final SessionReceiveLargeMessage message) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -860,7 +872,7 @@
public void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception
{
- ClientConsumerInternal consumer = consumers.get(consumerID);
+ ClientConsumerInternal consumer = getConsumer(consumerID);
if (consumer != null)
{
@@ -1526,10 +1538,13 @@
stop(false);
}
- // We need to make sure we don't get any inflight messages
- for (ClientConsumerInternal consumer : consumers.values())
+ synchronized (consumers)
{
- consumer.clear(false);
+ // We need to make sure we don't get any inflight messages
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clear(false);
+ }
}
flushAcks();
@@ -1871,6 +1886,19 @@
});
}
+
+ /**
+ * @param consumerID
+ * @return
+ */
+ private ClientConsumerInternal getConsumer(final long consumerID)
+ {
+ synchronized (consumers)
+ {
+ ClientConsumerInternal consumer = consumers.get(consumerID);
+ return consumer;
+ }
+ }
private void doCleanup(boolean failingOver)
{
@@ -1900,14 +1928,24 @@
private void cleanUpChildren() throws Exception
{
- Set<ClientConsumerInternal> consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
+ Set<ClientConsumerInternal> consumersClone;
+
+ synchronized (consumers)
+ {
+ consumersClone = new HashSet<ClientConsumerInternal>(consumers.values());
+ }
for (ClientConsumerInternal consumer : consumersClone)
{
consumer.cleanUp();
}
- Set<ClientProducerInternal> producersClone = new HashSet<ClientProducerInternal>(producers);
+ Set<ClientProducerInternal> producersClone;
+
+ synchronized (producers)
+ {
+ producersClone = new HashSet<ClientProducerInternal>(producers);
+ }
for (ClientProducerInternal producer : producersClone)
{
@@ -1917,15 +1955,24 @@
private void closeChildren() throws HornetQException
{
- Set<ClientConsumer> consumersClone = new HashSet<ClientConsumer>(consumers.values());
+ Set<ClientConsumer> consumersClone;
+ synchronized (consumers)
+ {
+ consumersClone = new HashSet<ClientConsumer>(consumers.values());
+ }
for (ClientConsumer consumer : consumersClone)
{
consumer.close();
}
- Set<ClientProducer> producersClone = new HashSet<ClientProducer>(producers);
+ Set<ClientProducer> producersClone;
+ synchronized (producers)
+ {
+ producersClone = new HashSet<ClientProducer>(producers);
+ }
+
for (ClientProducer producer : producersClone)
{
producer.close();
13 years, 3 months
JBoss hornetq SVN: r11779 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-11-28 07:55:00 -0500 (Mon, 28 Nov 2011)
New Revision: 11779
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
Log:
Test problem
1. HornetQServerImpl is started twice in the setUp() method of the test
2. Add a check in HornetQServerImpl.start() to avoid being started twice
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-28 12:48:14 UTC (rev 11778)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-28 12:55:00 UTC (rev 11779)
@@ -335,6 +335,13 @@
public synchronized void start() throws Exception
{
+ if (started)
+ {
+ log.debug("Server already started!");
+ return;
+ }
+
+ log.debug("Starting server " + this);
OperationContextImpl.clearContext();
try
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-11-28 12:48:14 UTC (rev 11778)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-11-28 12:55:00 UTC (rev 11779)
@@ -1177,7 +1177,6 @@
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
conf.setFileDeploymentEnabled(false);
server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
- server.start();
serverManager = new JMSServerManagerImpl(server);
context = new InVMContext();
13 years, 3 months
JBoss hornetq SVN: r11778 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-28 07:48:14 -0500 (Mon, 28 Nov 2011)
New Revision: 11778
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Reduce code duplication.
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-11-28 12:47:51 UTC (rev 11777)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-11-28 12:48:14 UTC (rev 11778)
@@ -708,9 +708,9 @@
}
}
-
- protected final void receiveMessagesAndAck(ClientConsumer consumer, final int start, int msgCount)
- throws HornetQException
+ protected final void
+ receiveMessages(ClientConsumer consumer, final int start, final int msgCount, final boolean ack)
+ throws HornetQException
{
for (int i = start; i < msgCount; i++)
{
@@ -718,7 +718,8 @@
Assert.assertNotNull("Expecting a message " + i, message);
assertMessageBody(i, message);
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- message.acknowledge();
+ if (ack)
+ message.acknowledge();
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-11-28 12:47:51 UTC (rev 11777)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-11-28 12:48:14 UTC (rev 11778)
@@ -182,7 +182,7 @@
{
session.start();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- receiveMessagesAndAck(consumer, start, end);
+ receiveMessages(consumer, start, end, true);
consumer.close();
session.commit();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-28 12:47:51 UTC (rev 11777)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-28 12:48:14 UTC (rev 11778)
@@ -1077,17 +1077,8 @@
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer, 0, NUM_MESSAGES, false);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- }
-
crash(session);
receiveDurableMessages(consumer);
@@ -1209,7 +1200,7 @@
}
// Should get the same ones after failover since we didn't ack
- receiveMessagesAndAck(consumer, NUM_MESSAGES, NUM_MESSAGES * 2);
+ receiveMessages(consumer, NUM_MESSAGES, NUM_MESSAGES * 2, true);
session.close();
@@ -1218,7 +1209,7 @@
private void receiveMessages(ClientConsumer consumer) throws HornetQException
{
- receiveMessagesAndAck(consumer, 0, NUM_MESSAGES);
+ receiveMessages(consumer, 0, NUM_MESSAGES, true);
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
13 years, 3 months
JBoss hornetq SVN: r11777 - trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-28 07:47:51 -0500 (Mon, 28 Nov 2011)
New Revision: 11777
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 Do not replicate non-durable paged messages.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-28 12:47:33 UTC (rev 11776)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-28 12:47:51 UTC (rev 11777)
@@ -142,7 +142,7 @@
// Message journal record types
// This is used when a large message is created but not yet stored on the system.
- // We use this to avoid temporary files missing
+ // Used to avoid temporary files missing
public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
private static final byte ADD_LARGE_MESSAGE = 30;
@@ -543,7 +543,8 @@
return datafiles;
}
- public void waitOnOperations() throws Exception
+ @Override
+ public void waitOnOperations() throws Exception
{
if (!started)
{
@@ -568,24 +569,52 @@
public void pageClosed(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
- {
- replicator.pageClosed(storeName, pageNumber);
- }
+ {
+ readLock();
+ try
+ {
+ replicator.pageClosed(storeName, pageNumber);
+ }
+ finally
+ {
+ readUnLock();
+ }
+ }
}
- public void pageDeleted(final SimpleString storeName, final int pageNumber)
- {
- if (isReplicated())
- {
- replicator.pageDeleted(storeName, pageNumber);
- }
- }
+ @Override
+ public void pageDeleted(final SimpleString storeName, final int pageNumber)
+ {
+ if (isReplicated())
+ {
+ readLock();
+ try
+ {
+ replicator.pageDeleted(storeName, pageNumber);
+ }
+ finally
+ {
+ readUnLock();
+ }
+ }
+ }
+ @Override
public void pageWrite(final PagedMessage message, final int pageNumber)
{
if (isReplicated())
{
- replicator.pageWrite(message, pageNumber);
+ if (!message.getMessage().isDurable())
+ return;
+ readLock();
+ try
+ {
+ replicator.pageWrite(message, pageNumber);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
}
13 years, 3 months
JBoss hornetq SVN: r11776 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster: util and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-28 07:47:33 -0500 (Mon, 28 Nov 2011)
New Revision: 11776
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
Improve test failure messages
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-28 12:47:17 UTC (rev 11775)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-28 12:47:33 UTC (rev 11776)
@@ -428,7 +428,7 @@
message = consumer.receiveImmediate();
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting a message", message);
Assert.assertEquals(counter, message.getIntProperty("counter").intValue());
session.close();
@@ -1427,7 +1427,7 @@
committer.join();
- Assert.assertFalse(committer.failed);
+ Assert.assertFalse("second attempt succeed?", committer.failed);
session.close();
@@ -1462,13 +1462,13 @@
}
catch (HornetQException e)
{
- assertEquals(HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
+ assertEquals(e.getMessage(), HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
}
- ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
session2.start();
+ ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
receiveMessages(consumer);
ClientMessage message = consumer.receiveImmediate();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-28 12:47:17 UTC (rev 11775)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-11-28 12:47:33 UTC (rev 11776)
@@ -165,7 +165,7 @@
for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
{
- Assert.assertTrue("expecting " + LARGE_MESSAGE_SIZE + " bytes, got " + j, buffer.readable());
+ Assert.assertTrue("msg " + i + ", expecting " + LARGE_MESSAGE_SIZE + " bytes, got " + j, buffer.readable());
Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-11-28 12:47:17 UTC (rev 11775)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2011-11-28 12:47:33 UTC (rev 11776)
@@ -100,7 +100,8 @@
{
// Wait to be informed of failure
boolean ok = listener.getLatch().await(10000, TimeUnit.MILLISECONDS);
- Assert.assertTrue("Failed to stop the server! Latch count is " + listener.getLatch().getCount(), ok);
+ Assert.assertTrue("Failed to stop the server! Latch count is " + listener.getLatch().getCount() + " out of " +
+ sessions.length, ok);
}
}
13 years, 3 months
JBoss hornetq SVN: r11775 - trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-28 07:47:17 -0500 (Mon, 28 Nov 2011)
New Revision: 11775
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Remove "(non-javadoc)" method comments.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-28 12:47:03 UTC (rev 11774)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-28 12:47:17 UTC (rev 11775)
@@ -553,9 +553,7 @@
waitOnOperations(0);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
- */
+ @Override
public boolean waitOnOperations(final long timeout) throws Exception
{
if (!started)
@@ -566,12 +564,8 @@
return getContext().waitCompletion(timeout);
}
- /*
- *
- * (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString, int)
- */
- public void pageClosed(final SimpleString storeName, final int pageNumber)
+ @Override
+ public void pageClosed(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
{
@@ -579,9 +573,6 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString, int)
- */
public void pageDeleted(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
@@ -590,11 +581,6 @@
}
}
- /*
- * (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString,
- * int, org.hornetq.api.core.buffers.ChannelBuffer)
- */
public void pageWrite(final PagedMessage message, final int pageNumber)
{
if (isReplicated())
@@ -603,9 +589,6 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#getContext()
- */
public OperationContext getContext()
{
return OperationContextImpl.getContext(executorFactory);
@@ -626,9 +609,6 @@
return newContext(singleThreadExecutor);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#newContext()
- */
public OperationContext newContext(final Executor executor)
{
return new OperationContextImpl(executor);
@@ -4074,16 +4054,10 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
- */
public void beforeRollback(Transaction tx) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
- */
public void afterRollback(Transaction tx)
{
for (Long msg : confirmedMessages)
13 years, 3 months
JBoss hornetq SVN: r11774 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-11-28 07:47:03 -0500 (Mon, 28 Nov 2011)
New Revision: 11774
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Make sure test won't hide errors.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-28 12:42:57 UTC (rev 11773)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-11-28 12:47:03 UTC (rev 11774)
@@ -1342,6 +1342,7 @@
locator.setReconnectAttempts(-1);
locator.setBlockOnAcknowledge(true);
+
sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = createSession(sf, false, false);
@@ -1400,7 +1401,7 @@
}
catch (HornetQException e2)
{
-
+ throw new RuntimeException(e2);
}
}
@@ -1457,6 +1458,7 @@
try
{
session2.commit();
+ fail("expecting DUPLICATE_ID_REJECTED exception");
}
catch (HornetQException e)
{
13 years, 3 months