JBoss hornetq SVN: r12146 - trunk/hornetq-core/src/test/resources.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-20 07:17:31 -0500 (Mon, 20 Feb 2012)
New Revision: 12146
Modified:
trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
Log:
HORNETQ-855 fix duplicate timeout configuration nodes
Modified: trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
===================================================================
--- trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2012-02-20 08:57:26 UTC (rev 12145)
+++ trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2012-02-20 12:17:31 UTC (rev 12146)
@@ -174,20 +174,19 @@
<check-period>331</check-period>
<connection-ttl>3370</connection-ttl>
<min-large-message-size>321</min-large-message-size>
- <call-timeout>123</call-timeout>
+ <call-timeout>123</call-timeout>
<retry-interval>3</retry-interval>
<retry-interval-multiplier>0.25</retry-interval-multiplier>
<max-retry-interval>10000</max-retry-interval>
<reconnect-attempts>72</reconnect-attempts>
- <use-duplicate-detection>true</use-duplicate-detection>
- <forward-when-no-consumers>false</forward-when-no-consumers>
- <max-hops>1</max-hops>
- <call-timeout>123</call-timeout>
- <call-failover-timeout>123</call-failover-timeout>
- <static-connectors>
- <connector-ref>connector1</connector-ref>
- <connector-ref>connector2</connector-ref>
- </static-connectors>
+ <use-duplicate-detection>true</use-duplicate-detection>
+ <forward-when-no-consumers>false</forward-when-no-consumers>
+ <max-hops>1</max-hops>
+ <call-failover-timeout>123</call-failover-timeout>
+ <static-connectors>
+ <connector-ref>connector1</connector-ref>
+ <connector-ref>connector2</connector-ref>
+ </static-connectors>
</cluster-connection>
<cluster-connection name="cluster-connection2">
<address>queues2</address>
@@ -197,7 +196,6 @@
<use-duplicate-detection>false</use-duplicate-detection>
<forward-when-no-consumers>true</forward-when-no-consumers>
<max-hops>2</max-hops>
- <call-timeout>456</call-timeout>
<call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
</cluster-connection>
12 years, 10 months
JBoss hornetq SVN: r12145 - trunk/hornetq-core/src/main/java/org/hornetq/core/replication.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2012-02-20 03:57:26 -0500 (Mon, 20 Feb 2012)
New Revision: 12145
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
Log:
https://community.jboss.org/thread/195519
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-17 22:47:37 UTC (rev 12144)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-02-20 08:57:26 UTC (rev 12145)
@@ -119,6 +119,9 @@
private boolean started;
private QuorumManager quorumManager;
+
+ //https://community.jboss.org/thread/195519
+ private Object stopLock = new Object();
// Constructors --------------------------------------------------
public ReplicationEndpoint(final HornetQServerImpl server, IOCriticalErrorListener criticalErrorListener)
@@ -156,67 +159,76 @@
try
{
- if (type == PacketImpl.REPLICATION_APPEND)
+ synchronized (stopLock)
{
- handleAppendAddRecord((ReplicationAddMessage)packet);
+ if (!started)
+ {
+ return;
+ }
+
+ if (type == PacketImpl.REPLICATION_APPEND)
+ {
+ handleAppendAddRecord((ReplicationAddMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_APPEND_TX)
+ {
+ handleAppendAddTXRecord((ReplicationAddTXMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE)
+ {
+ handleAppendDelete((ReplicationDeleteMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_DELETE_TX)
+ {
+ handleAppendDeleteTX((ReplicationDeleteTXMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PREPARE)
+ {
+ handlePrepare((ReplicationPrepareMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ {
+ handleCommitRollback((ReplicationCommitMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
+ {
+ handlePageWrite((ReplicationPageWriteMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
+ {
+ handlePageEvent((ReplicationPageEventMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
+ {
+ handleLargeMessageBegin((ReplicationLargeMessageBeingMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
+ {
+ handleLargeMessageWrite((ReplicationLargeMessageWriteMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
+ {
+ handleLargeMessageEnd((ReplicationLargeMessageEndMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
+ {
+ handleCompareDataMessage((ReplicationCompareDataMessage) packet);
+ response = new NullResponseMessage();
+ }
+ else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
+ {
+ handleStartReplicationSynchronization((ReplicationStartSyncMessage) packet);
+ }
+ else if (type == PacketImpl.REPLICATION_SYNC_FILE)
+ {
+ handleReplicationSynchronization((ReplicationSyncFileMessage) packet);
+ }
+ else
+ {
+ log.warn("Packet " + packet
+ + " can't be processed by the ReplicationEndpoint");
+ }
}
- else if (type == PacketImpl.REPLICATION_APPEND_TX)
- {
- handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE)
- {
- handleAppendDelete((ReplicationDeleteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_DELETE_TX)
- {
- handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PREPARE)
- {
- handlePrepare((ReplicationPrepareMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
- {
- handleCommitRollback((ReplicationCommitMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
- {
- handlePageWrite((ReplicationPageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
- {
- handlePageEvent((ReplicationPageEventMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
- {
- handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
- {
- handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
- {
- handleLargeMessageEnd((ReplicationLargeMessageEndMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
- {
- handleCompareDataMessage((ReplicationCompareDataMessage)packet);
- response = new NullResponseMessage();
- }
- else if (type == PacketImpl.REPLICATION_START_FINISH_SYNC)
- {
- handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
- }
- else if (type == PacketImpl.REPLICATION_SYNC_FILE)
- {
- handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
- }
- else
- {
- log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
- }
}
catch (HornetQException e)
{
@@ -280,64 +292,68 @@
public synchronized void stop() throws Exception
{
- if (!started)
+ synchronized (stopLock)
{
- return;
- }
+ if (!started)
+ {
+ return;
+ }
- // Channel may be null if there isn't a connection to a live server
- if (channel != null)
- {
- channel.close();
- }
+ // Channel may be null if there isn't a connection to a live server
+ if (channel != null)
+ {
+ channel.close();
+ }
- for (ConcurrentMap<Integer, Page> map : pageIndex.values())
- {
- for (Page page : map.values())
+ for (ConcurrentMap<Integer, Page> map : pageIndex.values())
{
- try
+ for (Page page : map.values())
{
- page.close();
+ try
+ {
+ page.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error while closing the page on backup", e);
+ }
}
- catch (Exception e)
- {
- log.warn("Error while closing the page on backup", e);
- }
}
- }
- pageIndex.clear();
+ pageIndex.clear();
- for (ReplicatedLargeMessage largeMessage : largeMessages.values())
- {
- largeMessage.releaseResources();
- }
- largeMessages.clear();
+ for (ReplicatedLargeMessage largeMessage : largeMessages.values())
+ {
+ largeMessage.releaseResources();
+ }
+ largeMessages.clear();
- for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry : filesReservedForSync.entrySet())
- {
- for (JournalSyncFile filesReserved : entry.getValue().values())
+ for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry : filesReservedForSync
+ .entrySet())
{
- filesReserved.close();
+ for (JournalSyncFile filesReserved : entry.getValue().values())
+ {
+ filesReserved.close();
+ }
}
- }
- filesReservedForSync.clear();
- if (journals != null)
- {
- for (Journal j : journals)
+ filesReservedForSync.clear();
+ if (journals != null)
{
- if (j instanceof FileWrapperJournal)
- j.stop();
+ for (Journal j : journals)
+ {
+ if (j instanceof FileWrapperJournal)
+ j.stop();
+ }
}
- }
- pageManager.stop();
+ pageManager.stop();
- // Storage needs to be the last to stop
- storage.stop();
+ // Storage needs to be the last to stop
+ storage.stop();
- started = false;
+ started = false;
+ }
}
12 years, 10 months
JBoss hornetq SVN: r12144 - trunk/hornetq-core/src/test/java/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-17 17:47:37 -0500 (Fri, 17 Feb 2012)
New Revision: 12144
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
Adding new method on waitForTopology
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2012-02-17 20:14:46 UTC (rev 12143)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2012-02-17 22:47:37 UTC (rev 12144)
@@ -34,6 +34,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -91,15 +92,20 @@
- protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
+ protected Topology waitForTopology(final HornetQServer server, final int nodes) throws Exception
{
- waitForTopology(server, nodes, WAIT_TIMEOUT);
+ return waitForTopology(server, nodes, -1, WAIT_TIMEOUT);
}
- protected void waitForTopology(final HornetQServer server, final int nodes, final long timeout) throws Exception
+ protected Topology waitForTopology(final HornetQServer server, final int nodes, final int backups) throws Exception
{
- log.debug("waiting for " + nodes + " on the topology for server = " + server);
+ return waitForTopology(server, nodes, backups, WAIT_TIMEOUT);
+ }
+ protected Topology waitForTopology(final HornetQServer server, final int liveNodes, final int backupNodes, final long timeout) throws Exception
+ {
+ log.debug("waiting for " + liveNodes + " on the topology for server = " + server);
+
long start = System.currentTimeMillis();
Set<ClusterConnection> ccs = server.getClusterManager().getClusterConnections();
@@ -109,24 +115,44 @@
throw new IllegalStateException("You need a single cluster connection on this version of waitForTopology on ServiceTestBase");
}
- Topology topology = ccs.iterator().next().getTopology();
+ Topology topology = server.getClusterManager().getDefaultConnection().getTopology();
+ int liveNodesCount = 0;
+
+ int backupNodesCount = 0;
+
+
do
{
- if (nodes == topology.getMembers().size())
+
+ liveNodesCount = 0;
+ backupNodesCount = 0;
+
+ for (TopologyMember member : topology.getMembers())
+ {
+ if (member.getA() != null)
+ {
+ liveNodesCount ++;
+ }
+ if (member.getB() != null)
+ {
+ backupNodesCount ++;
+ }
+ }
+
+ if ((liveNodes == -1 || liveNodes == liveNodesCount) && (backupNodes == -1 || backupNodes == backupNodesCount))
{
- return;
+ return topology;
}
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < timeout);
- String msg = "Timed out waiting for cluster topology of " + nodes +
- " (received " +
- topology.getMembers().size() +
+ String msg = "Timed out waiting for cluster topology of live=" + liveNodes + ",backup=" + backupNodes +
+ " (received live="+ liveNodesCount + ", backup=" + backupNodesCount +
") topology = " +
- topology +
+ topology.describe() +
")";
log.error(msg);
12 years, 10 months
JBoss hornetq SVN: r12143 - in branches/Branch_2_2_AS7: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-02-17 15:14:46 -0500 (Fri, 17 Feb 2012)
New Revision: 12143
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
[HORNETQ-859] Page files not deleted on rollback
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 20:12:28 UTC (rev 12142)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 20:14:46 UTC (rev 12143)
@@ -1012,7 +1012,7 @@
public void afterRollback(final Transaction tx)
{
- if (tx.getState() == State.PREPARED && pageTransaction != null)
+ if (pageTransaction != null)
{
pageTransaction.rollback();
}
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 20:12:28 UTC (rev 12142)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 20:14:46 UTC (rev 12143)
@@ -127,6 +127,140 @@
super.tearDown();
}
+ public void testPageCleanup() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server =
+ createServer(true, config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int numberOfMessages = 5000;
+
+ locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ final int MESSAGE_SIZE = 1024;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+ producer.send(session.createMessage(true));
+ session.rollback();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent (and rolled-back) 1 message.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ ClientMessage msg = null;
+
+ assertEquals(numberOfMessages * 2, queue.getMessageCount());
+ for (int i = 0; i < numberOfMessages * 2; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ //System.out.println("ack " + i);
+
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ consumer.close();
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 10000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+ }
+
public void testPreparePersistent() throws Exception
{
clearData();
12 years, 10 months
JBoss hornetq SVN: r12142 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-02-17 15:12:28 -0500 (Fri, 17 Feb 2012)
New Revision: 12142
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
[HORNETQ-859] Page files not deleted on rollback
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 18:45:25 UTC (rev 12141)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 20:12:28 UTC (rev 12142)
@@ -1012,7 +1012,7 @@
public void afterRollback(final Transaction tx)
{
- if (tx.getState() == State.PREPARED && pageTransaction != null)
+ if (pageTransaction != null)
{
pageTransaction.rollback();
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 18:45:25 UTC (rev 12141)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 20:12:28 UTC (rev 12142)
@@ -127,6 +127,140 @@
super.tearDown();
}
+ public void testPageCleanup() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server =
+ createServer(true, config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int numberOfMessages = 5000;
+
+ locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ final int MESSAGE_SIZE = 1024;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+ producer.send(session.createMessage(true));
+ session.rollback();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent (and rolled-back) 1 message.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ ClientMessage msg = null;
+
+ assertEquals(numberOfMessages * 2, queue.getMessageCount());
+ for (int i = 0; i < numberOfMessages * 2; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ //System.out.println("ack " + i);
+
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ consumer.close();
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 10000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+ }
+
public void testPreparePersistent() throws Exception
{
clearData();
12 years, 10 months
JBoss hornetq SVN: r12141 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: jbertram
Date: 2012-02-17 13:45:25 -0500 (Fri, 17 Feb 2012)
New Revision: 12141
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
Log:
[HORNETQ-859] Page files not deleted on rollback
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 17:29:49 UTC (rev 12140)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-02-17 18:45:25 UTC (rev 12141)
@@ -1025,7 +1025,7 @@
public void afterRollback(final Transaction tx)
{
- if (tx.getState() == State.PREPARED && pageTransaction != null)
+ if (pageTransaction != null)
{
pageTransaction.rollback();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 17:29:49 UTC (rev 12140)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2012-02-17 18:45:25 UTC (rev 12141)
@@ -119,6 +119,138 @@
locator = createInVMNonHALocator();
}
+ public void testPageCleanup() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ server =
+ createServer(true, config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int numberOfMessages = 5000;
+
+ locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+ producer.send(session.createMessage(true));
+ session.rollback();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent (and rolled-back) 1 message.");
+
+ session = sf.createSession(false, false, false);
+ producer = session.createProducer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ producer.close();
+ session.close();
+ //System.out.println("Just sent " + numberOfMessages + " messages.");
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+ ClientMessage msg = null;
+
+ assertEquals(numberOfMessages * 2, queue.getMessageCount());
+ for (int i = 0; i < numberOfMessages * 2; i++)
+ {
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ //System.out.println("ack " + i);
+
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ consumer.close();
+ session.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 10000;
+ while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+ }
+
public void testPreparePersistent() throws Exception
{
clearData();
12 years, 10 months
JBoss hornetq SVN: r12140 - trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2012-02-17 12:29:49 -0500 (Fri, 17 Feb 2012)
New Revision: 12140
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
just docs
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-02-17 15:48:06 UTC (rev 12139)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-02-17 17:29:49 UTC (rev 12140)
@@ -3360,6 +3360,11 @@
}
+ /** This is only used when loading a transaction.
+ It might be possible to merge the functionality of this class with {@link PagingStoreImpl.FinishPageMessageOperation}.
+
+ */
+ // TODO: merge this class with the one on the PagingStoreImpl
private static class FinishPageMessageOperation implements TransactionOperation
{
12 years, 10 months
JBoss hornetq SVN: r12139 - trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-17 10:48:06 -0500 (Fri, 17 Feb 2012)
New Revision: 12139
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
No point in using ConcurrentHashSet if all accesses are guarded by 'synchronized'.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2012-02-17 15:39:03 UTC (rev 12138)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionImpl.java 2012-02-17 15:48:06 UTC (rev 12139)
@@ -84,7 +84,6 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.IDGenerator;
import org.hornetq.utils.SimpleIDGenerator;
import org.hornetq.utils.TokenBucketLimiterImpl;
@@ -128,19 +127,20 @@
private final boolean xa;
private final Executor executor;
-
+
// to be sent to consumers as consumers will need a separate consumer for flow control
private final Executor flowControlExecutor;
private volatile CoreRemotingConnection remotingConnection;
- private final Set<ClientProducerInternal> producers = new ConcurrentHashSet<ClientProducerInternal>();
+ /** All accesses to producers are guarded by synchronizing on the set itself. */
+ private final Set<ClientProducerInternal> producers = new HashSet<ClientProducerInternal>();
// Consumers must be an ordered map so if we fail we recreate them in the same order with the same ids
private final Map<Long, ClientConsumerInternal> consumers = new LinkedHashMap<Long, ClientConsumerInternal>();
private volatile boolean closed;
-
+
private volatile boolean closing;
private final boolean autoCommitAcks;
@@ -244,7 +244,7 @@
this.remotingConnection = remotingConnection;
this.executor = executor;
-
+
this.flowControlExecutor = flowControlExecutor;
this.xa = xa;
@@ -292,7 +292,7 @@
// ClientSession implementation
// -----------------------------------------------------------------
-
+
public Channel getChannel()
{
return channel;
@@ -536,7 +536,7 @@
{
log.trace("Sending commit");
}
-
+
if (rollbackOnly)
{
rollbackOnFailover();
@@ -592,7 +592,7 @@
stop();
}
-
+
// We need to make sure we don't get any inflight messages
for (ClientConsumerInternal consumer : cloneConsumers())
{
@@ -883,7 +883,7 @@
log.debug("Session was already closed, giving up now, this=" + this);
return;
}
-
+
if (log.isDebugEnabled())
{
log.debug("Calling close on session " + this);
@@ -1146,12 +1146,12 @@
}
HashMap<String, String> metaDataToSend;
-
+
synchronized (metadata)
{
metaDataToSend = new HashMap<String, String>(metadata);
}
-
+
// Resetting the metadata after failover
for (Map.Entry<String, String> entries : metaDataToSend.entrySet())
{
@@ -1176,7 +1176,7 @@
}
channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
}
-
+
public void addUniqueMetaData(String key, String data) throws HornetQException
{
channel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data));
@@ -1280,7 +1280,7 @@
sendAckHandler.sendAcknowledged(scm.getMessage());
}
}
-
+
}
// XAResource implementation
@@ -1695,7 +1695,7 @@
{
return remotingConnection;
}
-
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@@ -1880,7 +1880,7 @@
throw new HornetQException(HornetQException.OBJECT_CLOSED, "Session is closed");
}
}
-
+
private ClassLoader lookupTCCL()
{
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
@@ -1892,7 +1892,7 @@
});
}
-
+
/**
* @param consumerID
* @return
@@ -1912,7 +1912,7 @@
{
remotingConnection.removeFailureListener(this);
}
-
+
if (log.isDebugEnabled())
{
log.debug("calling cleanup on " + this);
@@ -1923,7 +1923,7 @@
closed = true;
channel.close();
-
+
// if the server is sending a disconnect
// any pending blocked operation could hang without this
channel.returnBlocking();
@@ -1955,7 +1955,7 @@
private Set<ClientProducerInternal> cloneProducers()
{
Set<ClientProducerInternal> producersClone;
-
+
synchronized (producers)
{
producersClone = new HashSet<ClientProducerInternal>(producers);
12 years, 10 months
JBoss hornetq SVN: r12138 - in trunk/hornetq-core/src: main/resources/schema and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2012-02-17 10:39:03 -0500 (Fri, 17 Feb 2012)
New Revision: 12138
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd
trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java
trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
Log:
HORNETQ-855 Add missing conf file options present in 2.2 and test them.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2012-02-17 15:31:33 UTC (rev 12137)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2012-02-17 15:39:03 UTC (rev 12138)
@@ -28,7 +28,13 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.config.*;
+import org.hornetq.core.config.BridgeConfiguration;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.ClusterConnectionConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.ConnectorServiceConfiguration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.config.impl.Validators;
Modified: trunk/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd
===================================================================
--- trunk/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd 2012-02-17 15:31:33 UTC (rev 12137)
+++ trunk/hornetq-core/src/main/resources/schema/hornetq-configuration.xsd 2012-02-17 15:39:03 UTC (rev 12138)
@@ -146,12 +146,10 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="create-bindings-dir" type="xsd:boolean">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="page-max-concurrent-io" type="xsd:int">
- </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="page-max-concurrent-io" type="xsd:int"/>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-directory" type="xsd:string">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="create-journal-dir" type="xsd:boolean">
- </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="create-journal-dir" type="xsd:boolean"/>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-type" type="journalType">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-buffer-timeout" type="xsd:long">
@@ -316,13 +314,19 @@
<xsd:element maxOccurs="1" minOccurs="0" name="transformer-class-name" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="min-large-message-size" type="xsd:int">
- </xsd:element>
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="check-period" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="retry-interval" type="xsd:long">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="retry-interval-multiplier" type="xsd:double">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="reconnect-attempts" type="xsd:int">
+ <xsd:element maxOccurs="1" minOccurs="0" name="max-retry-interval" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="reconnect-attempts" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="failover-on-server-shutdown" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="use-duplicate-detection" type="xsd:boolean">
@@ -358,9 +362,22 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="1" name="connector-ref" type="xsd:string">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="min-large-message-size" type="xsd:int"/>
+ <xsd:element maxOccurs="1" minOccurs="0" name="check-period" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="connection-ttl" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="min-large-message-size" type="xsd:int">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="call-timeout" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="retry-interval" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="retry-interval-multiplier" type="xsd:double">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="max-retry-interval" type="xsd:long">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="reconnect-attempts" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="use-duplicate-detection" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="forward-when-no-consumers" type="xsd:boolean">
@@ -369,7 +386,6 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="confirmation-window-size" type="xsd:int">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="call-timeout" type="xsd:long"/>
<xsd:element maxOccurs="1" minOccurs="0" name="call-failover-timeout" type="xsd:long"/>
<xsd:choice>
<xsd:element maxOccurs="1" minOccurs="0" name="static-connectors">
Modified: trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java 2012-02-17 15:31:33 UTC (rev 12137)
+++ trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java 2012-02-17 15:39:03 UTC (rev 12138)
@@ -201,11 +201,14 @@
Assert.assertEquals("bridge1", bc.getName());
Assert.assertEquals("queue1", bc.getQueueName());
Assert.assertEquals("minLargeMessageSize", 4, bc.getMinLargeMessageSize());
+ assertEquals("check-period", 31, bc.getClientFailureCheckPeriod());
+ assertEquals("connection time-to-live", 370, bc.getConnectionTTL());
Assert.assertEquals("bridge-forwarding-address1", bc.getForwardingAddress());
Assert.assertEquals("sku > 1", bc.getFilterString());
Assert.assertEquals("org.foo.BridgeTransformer", bc.getTransformerClassName());
Assert.assertEquals(3, bc.getRetryInterval());
- Assert.assertEquals(0.2, bc.getRetryIntervalMultiplier());
+ Assert.assertEquals(0.2, bc.getRetryIntervalMultiplier(), 0.0001);
+ assertEquals("max retry interval", 10002, bc.getMaxRetryInterval());
Assert.assertEquals(2, bc.getReconnectAttempts());
Assert.assertEquals(true, bc.isUseDuplicateDetection());
Assert.assertEquals("connector1", bc.getStaticConnectors().get(0));
@@ -230,6 +233,8 @@
{
Assert.assertEquals("cluster-connection1", ccc.getName());
Assert.assertEquals("clusterConnectionConf minLargeMessageSize", 321, ccc.getMinLargeMessageSize());
+ assertEquals("check-period", 331, ccc.getClientFailureCheckPeriod());
+ assertEquals("connection time-to-live", 3370, ccc.getConnectionTTL());
Assert.assertEquals("queues1", ccc.getAddress());
Assert.assertEquals(3, ccc.getRetryInterval());
Assert.assertEquals(true, ccc.isDuplicateDetection());
@@ -237,6 +242,9 @@
Assert.assertEquals(1, ccc.getMaxHops());
Assert.assertEquals(123, ccc.getCallTimeout());
Assert.assertEquals(123, ccc.getCallFailoverTimeout());
+ assertEquals("multiplier", 0.25, ccc.getRetryIntervalMultiplier(), 0.00001);
+ assertEquals("max retry interval", 10000, ccc.getMaxRetryInterval());
+ assertEquals(72, ccc.getReconnectAttempts());
Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
Assert.assertEquals(null, ccc.getDiscoveryGroupName());
Modified: trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
===================================================================
--- trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2012-02-17 15:31:33 UTC (rev 12137)
+++ trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2012-02-17 15:39:03 UTC (rev 12138)
@@ -35,8 +35,8 @@
<bindings-directory>somedir</bindings-directory>
<create-bindings-dir>false</create-bindings-dir>
<journal-directory>somedir2</journal-directory>
+ <create-journal-dir>false</create-journal-dir>
<page-max-concurrent-io>17</page-max-concurrent-io>
- <create-journal-dir>false</create-journal-dir>
<journal-type>NIO</journal-type>
<journal-compact-min-files>123</journal-compact-min-files>
<journal-compact-percentage>33</journal-compact-percentage>
@@ -149,8 +149,11 @@
<filter string="sku > 1"/>
<transformer-class-name>org.foo.BridgeTransformer</transformer-class-name>
<min-large-message-size>4</min-large-message-size>
+ <check-period>31</check-period>
+ <connection-ttl>370</connection-ttl>
<retry-interval>3</retry-interval>
<retry-interval-multiplier>0.2</retry-interval-multiplier>
+ <max-retry-interval>10002</max-retry-interval>
<reconnect-attempts>2</reconnect-attempts>
<failover-on-server-shutdown>false</failover-on-server-shutdown>
<use-duplicate-detection>true</use-duplicate-detection>
@@ -166,10 +169,16 @@
</bridges>
<cluster-connections>
<cluster-connection name="cluster-connection1">
- <address>queues1</address>
- <connector-ref>connector1</connector-ref>
- <min-large-message-size>321</min-large-message-size>
- <retry-interval>3</retry-interval>
+ <address>queues1</address>
+ <connector-ref>connector1</connector-ref>
+ <check-period>331</check-period>
+ <connection-ttl>3370</connection-ttl>
+ <min-large-message-size>321</min-large-message-size>
+ <call-timeout>123</call-timeout>
+ <retry-interval>3</retry-interval>
+ <retry-interval-multiplier>0.25</retry-interval-multiplier>
+ <max-retry-interval>10000</max-retry-interval>
+ <reconnect-attempts>72</reconnect-attempts>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
@@ -183,6 +192,7 @@
<cluster-connection name="cluster-connection2">
<address>queues2</address>
<connector-ref>connector2</connector-ref>
+ <call-timeout>456</call-timeout>
<retry-interval>4</retry-interval>
<use-duplicate-detection>false</use-duplicate-detection>
<forward-when-no-consumers>true</forward-when-no-consumers>
12 years, 10 months
JBoss hornetq SVN: r12137 - in trunk: hornetq-core/src/main/java/org/hornetq/core/deployers/impl and 6 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2012-02-17 10:31:33 -0500 (Fri, 17 Feb 2012)
New Revision: 12137
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java
trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java
trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
trunk/hornetq-jms/src/main/resources/schema/hornetq-jms.xsd
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
Log:
HORNETQ-853 - add callFailoverTimeout to cluster connection
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/config/ClusterConnectionConfiguration.java 2012-02-17 15:31:33 UTC (rev 12137)
@@ -48,6 +48,8 @@
private final long callTimeout;
+ private final long callFailoverTimeout;
+
private final boolean duplicateDetection;
private final boolean forwardWhenNoConsumers;
@@ -86,6 +88,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -106,6 +109,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final long callTimeout,
+ final long callFAiloverTimeout,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -125,6 +129,7 @@
this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
this.callTimeout = callTimeout;
+ this.callFailoverTimeout = callFAiloverTimeout;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
discoveryGroupName = null;
this.maxHops = maxHops;
@@ -155,6 +160,7 @@
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
ConfigurationImpl.DEFAULT_CLUSTER_RECONNECT_ATTEMPTS,
HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -174,6 +180,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final long callTimeout,
+ final long callFAiloverTimeout,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
@@ -190,6 +197,7 @@
this.maxRetryInterval = maxRetryInterval;
this.reconnectAttempts = reconnectAttempts;
this.callTimeout = callTimeout;
+ this.callFailoverTimeout = callFAiloverTimeout;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
@@ -254,7 +262,12 @@
{
return callTimeout;
}
-
+
+ public long getCallFailoverTimeout()
+ {
+ return callFailoverTimeout;
+ }
+
public String getConnectorName()
{
return connectorName;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2012-02-17 15:31:33 UTC (rev 12137)
@@ -1024,6 +1024,8 @@
Validators.GT_ZERO);
long callTimeout = XMLConfigurationUtil.getLong(e, "call-timeout", HornetQClient.DEFAULT_CALL_TIMEOUT, Validators.GT_ZERO);
+
+ long callFailoverTimeout = XMLConfigurationUtil.getLong(e, "call-failover-timeout", HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, Validators.MINUS_ONE_OR_GT_ZERO);
double retryIntervalMultiplier = XMLConfigurationUtil.getDouble(e, "retry-interval-multiplier",
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER, Validators.GT_ZERO);
@@ -1082,6 +1084,7 @@
maxRetryInterval,
reconnectAttempts,
callTimeout,
+ callFailoverTimeout,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
@@ -1102,6 +1105,7 @@
maxRetryInterval,
reconnectAttempts,
callTimeout,
+ callFailoverTimeout,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2012-02-17 15:31:33 UTC (rev 12137)
@@ -104,6 +104,8 @@
private final long callTimeout;
+ private final long callFailoverTimeout;
+
private final double retryIntervalMultiplier;
private final long maxRetryInterval;
@@ -168,6 +170,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final long callTimeout,
+ final long callFailoverTimeout,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -242,6 +245,8 @@
this.manager = manager;
this.callTimeout = callTimeout;
+
+ this.callFailoverTimeout = callFailoverTimeout;
this.minLargeMessageSize = minLargeMessageSize;
@@ -280,6 +285,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final long callTimeout,
+ final long callFailoverTimeout,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -325,6 +331,8 @@
this.callTimeout = callTimeout;
+ this.callFailoverTimeout = callFailoverTimeout;
+
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2012-02-17 15:31:33 UTC (rev 12137)
@@ -669,6 +669,7 @@
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
config.getCallTimeout(),
+ config.getCallFailoverTimeout(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
@@ -707,6 +708,7 @@
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
config.getCallTimeout(),
+ config.getCallFailoverTimeout(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
config.getConfirmationWindowSize(),
Modified: trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/hornetq-core/src/test/java/org/hornetq/core/config/impl/FileConfigurationTest.java 2012-02-17 15:31:33 UTC (rev 12137)
@@ -236,6 +236,7 @@
Assert.assertEquals(false, ccc.isForwardWhenNoConsumers());
Assert.assertEquals(1, ccc.getMaxHops());
Assert.assertEquals(123, ccc.getCallTimeout());
+ Assert.assertEquals(123, ccc.getCallFailoverTimeout());
Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
Assert.assertEquals(null, ccc.getDiscoveryGroupName());
@@ -246,6 +247,7 @@
Assert.assertEquals("queues2", ccc.getAddress());
Assert.assertEquals(4, ccc.getRetryInterval());
Assert.assertEquals(456, ccc.getCallTimeout());
+ Assert.assertEquals(456, ccc.getCallFailoverTimeout());
Assert.assertEquals(false, ccc.isDuplicateDetection());
Assert.assertEquals(true, ccc.isForwardWhenNoConsumers());
Assert.assertEquals(2, ccc.getMaxHops());
Modified: trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml
===================================================================
--- trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/hornetq-core/src/test/resources/ConfigurationTest-full-config.xml 2012-02-17 15:31:33 UTC (rev 12137)
@@ -174,6 +174,7 @@
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<call-timeout>123</call-timeout>
+ <call-failover-timeout>123</call-failover-timeout>
<static-connectors>
<connector-ref>connector1</connector-ref>
<connector-ref>connector2</connector-ref>
@@ -187,6 +188,7 @@
<forward-when-no-consumers>true</forward-when-no-consumers>
<max-hops>2</max-hops>
<call-timeout>456</call-timeout>
+ <call-failover-timeout>456</call-failover-timeout>
<discovery-group-ref discovery-group-name="dg1"/>
</cluster-connection>
</cluster-connections>
Modified: trunk/hornetq-jms/src/main/resources/schema/hornetq-jms.xsd
===================================================================
--- trunk/hornetq-jms/src/main/resources/schema/hornetq-jms.xsd 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/hornetq-jms/src/main/resources/schema/hornetq-jms.xsd 2012-02-17 15:31:33 UTC (rev 12137)
@@ -58,6 +58,9 @@
<xsd:element name="call-timeout" type="xsd:long"
maxOccurs="1" minOccurs="0">
</xsd:element>
+ <xsd:element name="call-failover-timeout" type="xsd:long"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
<xsd:element name="consumer-window-size" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2012-02-17 15:31:33 UTC (rev 12137)
@@ -1906,7 +1906,7 @@
retryInterval,
ConfigurationImpl.DEFAULT_CLUSTER_RETRY_INTERVAL_MULTIPLIER,
ConfigurationImpl.DEFAULT_CLUSTER_MAX_RETRY_INTERVAL,
- reconnectAttempts, 1000, true, forwardWhenNoConsumers, maxHops,
+ reconnectAttempts, 1000, 1000, true, forwardWhenNoConsumers, maxHops,
1024, pairs, false);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2012-02-17 15:03:28 UTC (rev 12136)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java 2012-02-17 15:31:33 UTC (rev 12137)
@@ -68,6 +68,7 @@
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
@@ -123,6 +124,7 @@
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
@@ -164,6 +166,7 @@
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
@@ -203,6 +206,7 @@
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
@@ -243,6 +247,7 @@
HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
HornetQClient.DEFAULT_CONNECTION_TTL,
HornetQClient.DEFAULT_CALL_TIMEOUT,
+ HornetQClient.DEFAULT_CALL_FAILOVER_TIMEOUT,
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
@@ -618,6 +623,7 @@
final long clientFailureCheckPeriod,
final long connectionTTL,
final long callTimeout,
+ final long callFailoverTimeout,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -657,6 +663,7 @@
Assert.assertEquals(cf.getClientFailureCheckPeriod(), clientFailureCheckPeriod);
Assert.assertEquals(cf.getConnectionTTL(), connectionTTL);
Assert.assertEquals(cf.getCallTimeout(), callTimeout);
+ Assert.assertEquals(cf.getCallFailoverTimeout(), callFailoverTimeout);
Assert.assertEquals(cf.getMinLargeMessageSize(), minLargeMessageSize);
Assert.assertEquals(cf.getConsumerWindowSize(), consumerWindowSize);
Assert.assertEquals(cf.getConsumerMaxRate(), consumerMaxRate);
12 years, 10 months