JBoss hornetq SVN: r10926 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 12:59:31 -0400 (Tue, 05 Jul 2011)
New Revision: 10926
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
Log:
HORNETQ-720 Set 'backup' parameter to true. Fixes topology.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-05 16:45:00 UTC (rev 10925)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-05 16:59:31 UTC (rev 10926)
@@ -163,7 +163,7 @@
throw new RuntimeException(e);
}
server.getClusterManager().notifyNodeUp(msg.getNodeID(),
- getPairForNotification(msg.getConnector(), true), false, true);
+ getPairForNotification(msg.getConnector(), true), true, true);
}
}
13 years, 8 months
JBoss hornetq SVN: r10925 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 12:45:00 -0400 (Tue, 05 Jul 2011)
New Revision: 10925
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
Log:
HORNETQ-720 Notify node up to update topology.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-05 16:10:20 UTC (rev 10924)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-05 16:45:00 UTC (rev 10925)
@@ -143,16 +143,9 @@
{
NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
- Pair<TransportConfiguration, TransportConfiguration> pair;
- if (msg.isBackup())
- {
- pair = new Pair<TransportConfiguration, TransportConfiguration>(null, msg.getConnector());
- }
- else
- {
- pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
- }
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(),
+ getPairForNotification(msg.getConnector(), msg.isBackup()),
+ false, true);
}
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
{
@@ -169,8 +162,21 @@
e.printStackTrace();
throw new RuntimeException(e);
}
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(),
+ getPairForNotification(msg.getConnector(), true), false, true);
}
}
+
+ private
+ Pair<TransportConfiguration, TransportConfiguration>
+ getPairForNotification(TransportConfiguration conn, boolean isBackup)
+ {
+ if (isBackup)
+ {
+ return new Pair<TransportConfiguration, TransportConfiguration>(null, conn);
+ }
+ return new Pair<TransportConfiguration, TransportConfiguration>(conn, null);
+ }
});
@@ -178,12 +184,12 @@
return entry;
}
- public ServerSessionPacketHandler getSessionHandler(final String sessionName)
+ ServerSessionPacketHandler getSessionHandler(final String sessionName)
{
return sessionHandlers.get(sessionName);
}
- public void addSessionHandler(final String name, final ServerSessionPacketHandler handler)
+ void addSessionHandler(final String name, final ServerSessionPacketHandler handler)
{
sessionHandlers.put(name, handler);
}
13 years, 8 months
JBoss hornetq SVN: r10924 - in branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration: replication and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 12:10:20 -0400 (Tue, 05 Jul 2011)
New Revision: 10924
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
Log:
Clean up
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-05 16:09:43 UTC (rev 10923)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-05 16:10:20 UTC (rev 10924)
@@ -59,7 +59,7 @@
private static final Logger log = Logger.getLogger(FailoverTest.class);
private ServerLocator locator;
-
+ private ClientSessionFactoryInternal sf;
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -137,7 +137,7 @@
locator.setAckBatchSize(0);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -210,13 +210,9 @@
public void testNonTransacted() throws Exception
{
- ClientSessionFactoryInternal sf;
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
- sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -270,14 +266,19 @@
Assert.assertEquals(0, sf.numConnections());
}
- public void testConsumeTransacted() throws Exception
+ private void createSessionFactory() throws Exception
{
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ }
+ public void testConsumeTransacted() throws Exception
+ {
+ createSessionFactory();
+
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -362,7 +363,7 @@
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
// Crash live server
crash();
@@ -414,12 +415,8 @@
public void testTransactedMessagesSentSoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -477,12 +474,8 @@
*/
public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -548,12 +541,8 @@
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -620,12 +609,8 @@
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -700,12 +685,8 @@
public void testTransactedMessagesConsumedSoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -774,12 +755,8 @@
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -860,12 +837,8 @@
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -921,12 +894,8 @@
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -985,12 +954,8 @@
// This might happen if 1PC optimisation kicks in
public void testXAMessagesSentSoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1048,12 +1013,8 @@
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1128,12 +1089,8 @@
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1204,12 +1161,8 @@
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1283,11 +1236,7 @@
// 1PC optimisation
public void testXAMessagesConsumedSoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
-
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1366,7 +1315,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sendAndConsume(sf, true);
@@ -1391,12 +1340,8 @@
public void testFailoverMultipleSessionsWithConsumers() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
final int numSessions = 5;
final int numConsumersPerSession = 5;
@@ -1488,11 +1433,7 @@
*/
public void testFailWithBrowser() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
-
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ createSessionFactory();
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1558,12 +1499,8 @@
public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -1635,7 +1572,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = createSession(sf, true, true, 0);
@@ -1738,7 +1675,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = createSession(sf, true, true, 0);
@@ -1800,7 +1737,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
// Add an interceptor to delay the send method so we can get time to cause failover before it returns
@@ -1864,7 +1801,7 @@
locator.setReconnectAttempts(-1);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = createSession(sf, false, false);
@@ -2021,7 +1958,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = createSession(sf, false, false);
@@ -2148,7 +2085,7 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -2202,7 +2139,7 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -2258,7 +2195,7 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -2330,7 +2267,7 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 16:09:43 UTC (rev 10923)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 16:10:20 UTC (rev 10924)
@@ -286,57 +286,6 @@
System.out.println("sf.getBackupConnector() = " + sf.getBackupConnector());
}
- protected void waitForBackup(long seconds)
- {
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (!backupServer.isInitialised())
- {
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- //ignore
- }
- if (backupServer.isInitialised())
- {
- break;
- }
- else if (System.currentTimeMillis() > (time + toWait))
- {
- fail("backup server never started");
- }
- }
- }
-
- protected void waitForBackup(long seconds, TestableServer server)
- {
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (!server.isInitialised())
- {
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- //ignore
- }
- if (server.isInitialised())
- {
- break;
- }
- else if (System.currentTimeMillis() > (time + toWait))
- {
- fail("server never started");
- }
- }
- }
-
-
protected TransportConfiguration getInVMConnectorTransportConfiguration(final boolean live)
{
if (live)
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2011-07-05 16:09:43 UTC (rev 10923)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2011-07-05 16:10:20 UTC (rev 10924)
@@ -19,7 +19,13 @@
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
@@ -127,25 +133,25 @@
}
@Override
- protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
{
- Map<String, Object> server1Params = new HashMap<String, Object>();
- if (!live)
- {
- server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
- }
- return new TransportConfiguration(InVMAcceptorFactory.class.getName(), server1Params);
+ return createTransportConfiguration(InVMConnectorFactory.class.getName(), live);
}
@Override
- protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
{
- Map<String, Object> server1Params = new HashMap<String, Object>();
+ return createTransportConfiguration(InVMAcceptorFactory.class.getName(), live);
+ }
+
+ private static TransportConfiguration createTransportConfiguration(String name, final boolean live)
+ {
+ Map<String, Object> serverParams = new HashMap<String, Object>();
if (!live)
{
- server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ serverParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
}
- return new TransportConfiguration(InVMConnectorFactory.class.getName(), server1Params);
+ return new TransportConfiguration(name, serverParams);
}
}
13 years, 8 months
JBoss hornetq SVN: r10923 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 12:09:43 -0400 (Tue, 05 Jul 2011)
New Revision: 10923
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/config/Configuration.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Add ClusterConnectionConfigurations to the backup and live configurations
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/config/Configuration.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/config/Configuration.java 2011-07-05 16:08:54 UTC (rev 10922)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/config/Configuration.java 2011-07-05 16:09:43 UTC (rev 10923)
@@ -341,6 +341,9 @@
/**
* Returns the cluster connections configured for this server.
+ * <p>
+ * Modifying the returned list will modify the list of {@link ClusterConnectionConfiguration}
+ * used by this configuration.
*/
List<ClusterConnectionConfiguration> getClusterConfigurations();
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 16:08:54 UTC (rev 10922)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 16:09:43 UTC (rev 10923)
@@ -121,6 +121,16 @@
return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
}
+ private ClusterConnectionConfiguration createClusterConnectionConf(String name, String... connectors)
+ {
+ List<String> conn = new ArrayList<String>(connectors.length);
+ for (String iConn : connectors)
+ {
+ conn.add(iConn);
+ }
+ return new ClusterConnectionConfiguration("cluster1", "jms", name, -1, false, false, 1, 1, conn, false);
+ }
+
/**
* @throws Exception
*/
@@ -139,11 +149,8 @@
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
- ArrayList<String> staticConnectors = new ArrayList<String>();
- staticConnectors.add(liveConnector.getName());
- ClusterConnectionConfiguration cccLive = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
- staticConnectors, false);
- backupConfig.getClusterConfigurations().add(cccLive);
+ backupConfig.getClusterConfigurations().add(createClusterConnectionConf(backupConnector.getName(),
+ liveConnector.getName()));
backupServer = createBackupServer();
liveConfig = super.createDefaultConfig();
@@ -152,10 +159,7 @@
liveConfig.setSecurityEnabled(false);
liveConfig.setSharedStore(true);
liveConfig.setClustered(true);
- List<String> pairs = null;
- ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
- pairs, false);
- liveConfig.getClusterConfigurations().add(ccc0);
+ liveConfig.getClusterConfigurations().add(createClusterConnectionConf(liveConnector.getName()));
liveConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
liveServer = createLiveServer();
}
@@ -164,34 +168,39 @@
{
nodeManager = new InVMNodeManager();
- Configuration config1 = super.createDefaultConfig();
- config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
- config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
- config1.setPagingDirectory(config1.getPagingDirectory() + "_backup");
- config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_backup");
- config1.getAcceptorConfigurations().clear();
- config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
- config1.getConnectorConfigurations().put(LIVE_NODE_NAME, getConnectorTransportConfiguration(true));
+ backupConfig = super.createDefaultConfig();
+ backupConfig.setBindingsDirectory(backupConfig.getBindingsDirectory() + "_backup");
+ backupConfig.setJournalDirectory(backupConfig.getJournalDirectory() + "_backup");
+ backupConfig.setPagingDirectory(backupConfig.getPagingDirectory() + "_backup");
+ backupConfig.setLargeMessagesDirectory(backupConfig.getLargeMessagesDirectory() + "_backup");
+ backupConfig.getAcceptorConfigurations().clear();
+ backupConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+ TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+ TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+ // probably not necessary...
+ backupConfig.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+ backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, liveConnector);
+ backupConfig.getClusterConfigurations()
+ .add(createClusterConnectionConf(backupConnector.getName(), LIVE_NODE_NAME));
+ backupConfig.getConnectorConfigurations().put(LIVE_NODE_NAME, getConnectorTransportConfiguration(true));
- //liveConfig.setBackupConnectorName("toBackup");
- config1.setSecurityEnabled(false);
- config1.setSharedStore(false);
- config1.setBackup(true);
- config1.setLiveConnectorName(LIVE_NODE_NAME);
- config1.setClustered(true);
- backupConfig = config1;
+ backupConfig.setSecurityEnabled(false);
+ backupConfig.setSharedStore(false);
+ backupConfig.setBackup(true);
+ backupConfig.setLiveConnectorName(LIVE_NODE_NAME);
+ backupConfig.setClustered(true);
backupServer = createBackupServer();
backupServer.getServer().setIdentity("id_backup");
- Configuration config0 = super.createDefaultConfig();
- config0.getAcceptorConfigurations().clear();
- config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
- config0.setName(LIVE_NODE_NAME);
- config0.setSecurityEnabled(false);
- config0.setSharedStore(false);
- config0.setClustered(true);
- liveConfig = config0;
+ liveConfig = super.createDefaultConfig();
+ liveConfig.getAcceptorConfigurations().clear();
+ liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
+ liveConfig.setName(LIVE_NODE_NAME);
+ liveConfig.setSecurityEnabled(false);
+ liveConfig.setSharedStore(false);
+ liveConfig.setClustered(true);
+ liveConfig.getClusterConfigurations().add(createClusterConnectionConf(liveConnector.getName()));
liveServer = createLiveServer();
liveServer.getServer().setIdentity("id_live");
@@ -203,15 +212,12 @@
protected void tearDown() throws Exception
{
backupServer.stop();
-
liveServer.stop();
Assert.assertEquals(0, InVMRegistry.instance.size());
backupServer = null;
-
liveServer = null;
-
nodeManager = null;
InVMConnector.failOnCreateConnection = false;
13 years, 8 months
JBoss hornetq SVN: r10922 - branches/HORNETQ-720_Replication/etc.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 12:08:54 -0400 (Tue, 05 Jul 2011)
New Revision: 10922
Modified:
branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
Log:
Adjust eclipse configuration
Modified: branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs
===================================================================
--- branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-07-05 14:57:35 UTC (rev 10921)
+++ branches/HORNETQ-720_Replication/etc/org.eclipse.jdt.core.prefs 2011-07-05 16:08:54 UTC (rev 10922)
@@ -1,4 +1,4 @@
-#Fri Jul 01 11:51:40 CEST 2011
+#Tue Jul 05 17:04:57 CEST 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.codeComplete.argumentPrefixes=
org.eclipse.jdt.core.codeComplete.argumentSuffixes=
@@ -22,10 +22,10 @@
org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning
org.eclipse.jdt.core.compiler.source=1.6
org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=82
+org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=18
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=16
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=82
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=82
+org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=18
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=18
org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
org.eclipse.jdt.core.formatter.alignment_for_assignment=16
13 years, 8 months
JBoss hornetq SVN: r10921 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 10:57:35 -0400 (Tue, 05 Jul 2011)
New Revision: 10921
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java
Log:
HORNETQ-720 Move nodeManager set-up to createReplicatedConfigs()
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 14:56:51 UTC (rev 10920)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 14:57:35 UTC (rev 10921)
@@ -162,6 +162,8 @@
protected void createReplicatedConfigs() throws Exception
{
+ nodeManager = new InVMNodeManager();
+
Configuration config1 = super.createDefaultConfig();
config1.setBindingsDirectory(config1.getBindingsDirectory() + "_backup");
config1.setJournalDirectory(config1.getJournalDirectory() + "_backup");
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java 2011-07-05 14:56:51 UTC (rev 10920)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java 2011-07-05 14:57:35 UTC (rev 10921)
@@ -13,7 +13,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.server.impl.InVMNodeManager;
/**
* A ReplicatedFailoverTest
@@ -40,20 +39,8 @@
// Protected -----------------------------------------------------
@Override
- protected void setUp() throws Exception
- {
- nodeManager = new InVMNodeManager();
- super.setUp();
- }
-
- @Override
protected void createConfigs() throws Exception
{
createReplicatedConfigs();
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
13 years, 8 months
JBoss hornetq SVN: r10920 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 10:56:51 -0400 (Tue, 05 Jul 2011)
New Revision: 10920
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
Check for desired result (receivedTopology) and not for control variable (toWait <=0)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-05 14:56:11 UTC (rev 10919)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-05 14:56:51 UTC (rev 10920)
@@ -663,7 +663,7 @@
start = now;
}
- if (toWait <= 0)
+ if (!receivedTopology)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology");
13 years, 8 months
JBoss hornetq SVN: r10919 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 10:56:11 -0400 (Tue, 05 Jul 2011)
New Revision: 10919
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Set clustered=true on each node configuration.
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-05 14:51:49 UTC (rev 10918)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-05 14:56:11 UTC (rev 10919)
@@ -48,9 +48,9 @@
import org.hornetq.tests.util.RandomUtil;
/**
- *
+ *
* A FailoverTest
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*/
@@ -2424,7 +2424,7 @@
/**
* @param i
* @param message
- * @throws Exception
+ * @throws Exception
*/
protected void setBody(final int i, final ClientMessage message) throws Exception
{
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 14:51:49 UTC (rev 10918)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-05 14:56:11 UTC (rev 10919)
@@ -176,6 +176,7 @@
config1.setSharedStore(false);
config1.setBackup(true);
config1.setLiveConnectorName(LIVE_NODE_NAME);
+ config1.setClustered(true);
backupConfig = config1;
backupServer = createBackupServer();
@@ -187,6 +188,7 @@
config0.setName(LIVE_NODE_NAME);
config0.setSecurityEnabled(false);
config0.setSharedStore(false);
+ config0.setClustered(true);
liveConfig = config0;
liveServer = createLiveServer();
liveServer.getServer().setIdentity("id_live");
@@ -245,8 +247,7 @@
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
- assertTrue(ok);
+ assertTrue("topology members expected " + topologyMembers, countDownLatch.await(5, TimeUnit.SECONDS));
return sf;
}
13 years, 8 months
JBoss hornetq SVN: r10918 - branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: gurkapa
Date: 2011-07-05 10:51:49 -0400 (Tue, 05 Jul 2011)
New Revision: 10918
Modified:
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java
Log:
HORNETQ-553 - work in progress but adding code for new ACK header, subscription; some interface changes not yet implemented for 1.1 and almost finished code for new command NACK
committing before completed and tested since my computer is acting up and I don't want to lose anything.
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-07-05 14:08:46 UTC (rev 10917)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-07-05 14:51:49 UTC (rev 10918)
@@ -62,6 +62,8 @@
String ABORT = "ABORT";
String ACK = "ACK";
+
+ String NACK = "NACK";
}
public interface Responses
@@ -153,6 +155,8 @@
String AUTO = "auto";
String CLIENT = "client";
+
+ String CLIENT_INDIVIDUAL = "client-individual";
}
}
@@ -201,6 +205,15 @@
public interface Ack
{
String MESSAGE_ID = "message-id";
+
+ String SUBSCRIPTION = "subscription";
}
+
+ public interface Nack
+ {
+ String MESSAGE_ID = "message-id";
+
+ String SUBSCRIPTION = "subscription";
+ }
}
}
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-07-05 14:08:46 UTC (rev 10917)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-07-05 14:51:49 UTC (rev 10918)
@@ -205,6 +205,10 @@
{
response = onAck(request, conn);
}
+ else if (Stomp.Commands.NACK.equals(command))
+ {
+ response = onNack(request, conn);
+ }
else if (Stomp.Commands.BEGIN.equals(command))
{
response = onBegin(request, server, conn);
@@ -336,7 +340,12 @@
}
else
{
- if (destination == null)
+ if (Stomp.Versions.V11.equals(connection.getVersion()))
+ {
+ // Subscription id is mandatory in version 1.1 of STOMP
+ throw new StompException("Client must set id header to a SUBSCRIBE command");
+ }
+ else if (destination == null)
{
throw new StompException("Client must set destination or id header to a SUBSCRIBE command");
}
@@ -375,8 +384,13 @@
}
else
{
- if (destination == null)
+ if (Stomp.Versions.V11.equals(connection.getVersion()))
{
+ // Subscription id is mandatory in version 1.1 of STOMP
+ throw new StompException("Must specify the subscription's id you are unsubscribing from");
+ }
+ else if (destination == null)
+ {
throw new StompException("Must specify the subscription's id or the destination you are unsubscribing from");
}
subscriptionID = "subscription/" + destination;
@@ -397,16 +411,56 @@
String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
StompSession stompSession = null;
+ stompSession = getSession(connection);
+ if (connection.getVersion() == Stomp.Versions.V11)
+ {
+ String subscriptionID = (String)headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
+ if (subscriptionID == null)
+ {
+ throw new StompException("Subscription header is mandatory in ACK command when using STOMP 1.1");
+ }
+ if (!stompSession.containsSubscription(subscriptionID))
+ {
+ throw new StompException("No subscription with the given id was found in this session");
+ }
+ }
if (txID != null)
{
log.warn("Transactional acknowledgement is not supported");
}
- stompSession = getSession(connection);
stompSession.acknowledge(messageID);
return null;
}
+ private StompFrame onNack(StompFrame frame, StompConnection connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String messageID = (String)headers.get(Stomp.Headers.Nack.MESSAGE_ID);
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+ StompSession stompSession = null;
+ stompSession = getSession(connection);
+ if (connection.getVersion() == Stomp.Versions.V11)
+ {
+ String subscriptionID = (String)headers.get(Stomp.Headers.Nack.SUBSCRIPTION);
+ if (subscriptionID == null)
+ {
+ throw new StompException("Subscription header is mandatory in NACK command");
+ }
+ if (!stompSession.containsSubscription(subscriptionID))
+ {
+ throw new StompException("No subscription with the given id was found in this session");
+ }
+ }
+ if (txID != null)
+ {
+ log.warn("Transactional acknowledgement is not supported");
+ }
+ stompSession.nacknowledge(messageID);
+
+ return null;
+ }
+
private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-07-05 14:08:46 UTC (rev 10917)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2011-07-05 14:51:49 UTC (rev 10918)
@@ -176,6 +176,19 @@
session.commit();
}
+ public void nacknowledge(String messageID) throws Exception
+ {
+ long id = Long.parseLong(messageID);
+ long consumerID = messagesToAck.remove(id);
+ StompSubscription subscription = subscriptions.get(consumerID);
+ boolean nackAllNonAckedMessages = true;
+ if (subscription.getAck() == Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)
+ {
+ nackAllNonAckedMessages = false;
+ }
+
+ }
+
public void addSubscription(long consumerID,
String subscriptionID,
String clientID,
13 years, 8 months
JBoss hornetq SVN: r10917 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/api/core/client and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-05 10:08:46 -0400 (Tue, 05 Jul 2011)
New Revision: 10917
Modified:
branches/HORNETQ-720_Replication/
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
merge from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-10906
+ /trunk:10878-10916
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ClientMessage.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -20,9 +20,9 @@
import org.hornetq.api.core.Message;
/**
- *
+ *
* A ClientMessage represents a message sent and/or received by HornetQ.
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
@@ -36,22 +36,20 @@
/**
* Set the delivery count for this message.
- *
+ *
* This method is not meant to be called by HornetQ clients.
- *
+ *
* @param deliveryCount message delivery count
*/
void setDeliveryCount(int deliveryCount);
/**
* Acknowledge reception of this message.
- *
- * If the session responsible to acknowledge this message has {@code autoCommitAcks}
- * set to {@code true}, the transaction will automatically commit the current transaction.
- * Otherwise, this acknwoledgement will not be committed until the client commits the session transaction.
- *
- * @throws HornetQException if an error occured while acknowledging the message.
- *
+ * <p>
+ * If the session responsible to acknowledge this message has {@code autoCommitAcks} set to
+ * {@code true}, the transaction will automatically commit the current transaction. Otherwise,
+ * this acknowledgement will not be committed until the client commits the session transaction.
+ * @throws HornetQException if an error occurred while acknowledging the message.
* @see ClientSession#isAutoCommitAcks()
*/
void acknowledge() throws HornetQException;
@@ -61,20 +59,20 @@
*/
int getBodySize();
- /**
+ /**
* Sets the OutputStream that will receive the content of a message received in a non blocking way.
- * <br>
+ * <br>
* This method is used when consuming large messages
- *
+ *
* @throws HornetQException
*/
void setOutputStream(OutputStream out) throws HornetQException;
- /**
+ /**
* Saves the content of the message to the OutputStream.
* It will block until the entire content is transfered to the OutputStream.
- * <br>
- *
+ * <br>
+ *
* @throws HornetQException
*/
void saveToOutputStream(OutputStream out) throws HornetQException;
@@ -83,18 +81,18 @@
* Wait the outputStream completion of the message.
*
* This method is used when consuming large messages
- *
+ *
* @param timeMilliseconds - 0 means wait forever
* @return true if it reached the end
* @throws HornetQException
*/
boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
- /**
- * Sets the body's IntputStream.
- * <br>
+ /**
+ * Sets the body's IntputStream.
+ * <br>
* This method is used when sending large messages
- *
+ *
* @throws HornetQException
*/
void setBodyInputStream(InputStream bodyInputStream);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/SendAcknowledgementHandler.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -16,20 +16,20 @@
import org.hornetq.api.core.Message;
/**
- * A SendAcknowledgementHandler notifies a client when an message sent asynchronously has been received by the server.
- * <br />
- * If the session is not blocking when sending durable or non-durable messages, the session can
- * set a SendAcknowledgementHandler to be notified later when the messages
- * has been received by the server. The method {@code sendAcknowledged} will be called with the message that
- * was sent asynchronously.
- *
+ * A SendAcknowledgementHandler notifies a client when an message sent asynchronously has been
+ * received by the server.
+ * <p>
+ * If the session is not blocking when sending durable or non-durable messages, the session can set
+ * a SendAcknowledgementHandler to be notified later when the messages has been received by the
+ * server. The method {@link #sendAcknowledged(Message)} will be called with the message that was
+ * sent asynchronously.
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public interface SendAcknowledgementHandler
{
/**
* Notifies the client that a message sent asynchronously has been received by the server.
- *
+ *
* @param message message sent asynchronously
*/
void sendAcknowledged(Message message);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/client/ServerLocator.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -31,19 +31,19 @@
* This method will disable any checks when a GarbageCollection happens
* leaving connections open. The JMS Layer will make specific usage of this
* method, since the ConnectionFactory.finalize should release this.
- *
+ *
* Warning: You may leave resources unattended if you call this method and
* don't take care of cleaning the resources yourself.
*/
void disableFinalizeCheck();
-
+
/**
* Create a ClientSessionFactory using whatever load balancing policy is in force
* @return The ClientSessionFactory
* @throws Exception
*/
ClientSessionFactory createSessionFactory() throws Exception;
-
+
/**
* Create a ClientSessionFactory to a specific server. The server must already be known about by this ServerLocator.
* This method allows the user to make a connection to a specific server bypassing any load balancing policy in force
@@ -52,39 +52,39 @@
* @throws Exception if a failure happened in creating the ClientSessionFactory or the ServerLocator does not know about the passed in transportConfiguration
*/
ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception;
-
+
/**
* Returns the period used to check if a client has failed to receive pings from the server.
- *
+ *
* Period is in milliseconds, default value is {@link HornetQClient#DEFAULT_CLIENT_FAILURE_CHECK_PERIOD}.
- *
+ *
* @return the period used to check if a client has failed to receive pings from the server
*/
long getClientFailureCheckPeriod();
/**
* Sets the period (in milliseconds) used to check if a client has failed to receive pings from the server.
- *
+ *
* Value must be -1 (to disable) or greater than 0.
- *
+ *
* @param clientFailureCheckPeriod the period to check failure
*/
void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
/**
* When <code>true</code>, consumers created through this factory will create temporary files to cache large messages.
- *
+ *
* There is 1 temporary file created for each large message.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_CACHE_LARGE_MESSAGE_CLIENT}.
- *
+ *
* @return <code>true</code> if consumers created through this factory will cache large messages in temporary files, <code>false</code> else
*/
boolean isCacheLargeMessagesClient();
/**
* Sets whether large messages received by consumers created through this factory will be cached in temporary files or not.
- *
+ *
* @param cached <code>true</code> to cache large messages in temporary files, <code>false</code> else
*/
void setCacheLargeMessagesClient(boolean cached);
@@ -92,73 +92,73 @@
/**
* Returns the connection <em>time-to-live</em>.
* This TTL determines how long the server will keep a connection alive in the absence of any data arriving from the client.
- *
+ *
* Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_CONNECTION_TTL}.
- *
+ *
* @return the connection time-to-live in milliseconds
*/
long getConnectionTTL();
/**
* Sets this factory's connections <em>time-to-live</em>.
- *
+ *
* Value must be -1 (to disable) or greater or equals to 0.
- *
+ *
* @param connectionTTL period in milliseconds
*/
void setConnectionTTL(long connectionTTL);
/**
* Returns the blocking calls timeout.
- *
+ *
* If client's blocking calls to the server take more than this timeout, the call will throw a {@link HornetQException} with the code {@link HornetQException#CONNECTION_TIMEDOUT}.
* Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_CALL_TIMEOUT}.
- *
+ *
* @return the blocking calls timeout
*/
long getCallTimeout();
/**
* Sets the blocking call timeout.
- *
+ *
* Value must be greater or equals to 0
- *
+ *
* @param callTimeout blocking call timeout in milliseconds
*/
void setCallTimeout(long callTimeout);
/**
* Returns the large message size threshold.
- *
+ *
* Messages whose size is if greater than this value will be handled as <em>large messages</em>.
- *
+ *
* Value is in bytes, default value is {@link HornetQClient#DEFAULT_MIN_LARGE_MESSAGE_SIZE}.
- *
+ *
* @return the message size threshold to treat messages as large messages.
*/
int getMinLargeMessageSize();
/**
* Sets the large message size threshold.
- *
+ *
* Value must be greater than 0.
- *
+ *
* @param minLargeMessageSize large message size threshold in bytes
*/
void setMinLargeMessageSize(int minLargeMessageSize);
/**
* Returns the window size for flow control of the consumers created through this factory.
- *
+ *
* Value is in bytes, default value is {@link HornetQClient#DEFAULT_CONSUMER_WINDOW_SIZE}.
- *
+ *
* @return the window size used for consumer flow control
*/
int getConsumerWindowSize();
/**
* Sets the window size for flow control of the consumers created through this factory.
- *
+ *
* Value must be -1 (to disable flow control), 0 (to not buffer any messages) or greater than 0 (to set the maximum size of the buffer)
*
* @param consumerWindowSize window size (in bytes) used for consumer flow control
@@ -167,37 +167,37 @@
/**
* Returns the maximum rate of message consumption for consumers created through this factory.
- *
+ *
* This value controls the rate at which a consumer can consume messages. A consumer will never consume messages at a rate faster than the rate specified.
- *
+ *
* Value is -1 (to disable) or a positive integer corresponding to the maximum desired message consumption rate specified in units of messages per second.
* Default value is {@link HornetQClient#DEFAULT_CONSUMER_MAX_RATE}.
- *
+ *
* @return the consumer max rate
*/
int getConsumerMaxRate();
/**
* Sets the maximum rate of message consumption for consumers created through this factory.
- *
+ *
* Value must -1 (to disable) or a positive integer corresponding to the maximum desired message consumption rate specified in units of messages per second.
- *
+ *
* @param consumerMaxRate maximum rate of message consumption (in messages per seconds)
*/
void setConsumerMaxRate(int consumerMaxRate);
/**
* Returns the size for the confirmation window of clients using this factory.
- *
+ *
* Value is in bytes or -1 (to disable the window). Default value is {@link HornetQClient#DEFAULT_CONFIRMATION_WINDOW_SIZE}.
- *
+ *
* @return the size for the confirmation window of clients using this factory
*/
int getConfirmationWindowSize();
/**
* Sets the size for the confirmation window buffer of clients using this factory.
- *
+ *
* Value must be -1 (to disable the window) or greater than 0.
* @param confirmationWindowSize size of the confirmation window (in bytes)
@@ -206,40 +206,39 @@
/**
* Returns the window size for flow control of the producers created through this factory.
- *
+ *
* Value must be -1 (to disable flow control) or greater than 0 to determine the maximum amount of bytes at any give time (to prevent overloading the connection).
* Default value is {@link HornetQClient#DEFAULT_PRODUCER_WINDOW_SIZE}.
- *
+ *
* @return the window size for flow control of the producers created through this factory.
*/
int getProducerWindowSize();
/**
- * Returns the window size for flow control of the producers created through this factory.
- *
- * Value must be -1 (to disable flow control) or greater than 0.
- *
- * @param producerWindowSize window size (in bytest) for flow control of the producers created through this factory.
+ * Returns the window size for flow control of the producers created through this factory. Value
+ * must be -1 (to disable flow control) or greater than 0.
+ * @param producerWindowSize window size (in bytes) for flow control of the producers created
+ * through this factory.
*/
void setProducerWindowSize(int producerWindowSize);
/**
* Returns the maximum rate of message production for producers created through this factory.
- *
+ *
* This value controls the rate at which a producer can produce messages. A producer will never produce messages at a rate faster than the rate specified.
- *
+ *
* Value is -1 (to disable) or a positive integer corresponding to the maximum desired message production rate specified in units of messages per second.
* Default value is {@link HornetQClient#DEFAULT_PRODUCER_MAX_RATE}.
- *
+ *
* @return maximum rate of message production (in messages per seconds)
*/
int getProducerMaxRate();
/**
* Sets the maximum rate of message production for producers created through this factory.
- *
+ *
* Value must -1 (to disable) or a positive integer corresponding to the maximum desired message production rate specified in units of messages per second.
- *
+ *
* @param producerMaxRate maximum rate of message production (in messages per seconds)
*/
void setProducerMaxRate(int producerMaxRate);
@@ -247,9 +246,9 @@
/**
* Returns whether consumers created through this factory will block while
* sending message acknowledgments or do it asynchronously.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_ACKNOWLEDGE}.
- *
+ *
* @return whether consumers will block while sending message
* acknowledgments or do it asynchronously
*/
@@ -258,7 +257,7 @@
/**
* Sets whether consumers created through this factory will block while
* sending message acknowledgments or do it asynchronously.
- *
+ *
* @param blockOnAcknowledge
* <code>true</code> to block when sending message
* acknowledgments or <code>false</code> to send them
@@ -271,7 +270,7 @@
* <br>
* If the session is configured to send durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
* to be notified once the message has been handled by the server.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_DURABLE_SEND}.
*
* @return whether producers will block while sending persistent messages or do it asynchronously
@@ -280,7 +279,7 @@
/**
* Sets whether producers created through this factory will block while sending <em>durable</em> messages or do it asynchronously.
- *
+ *
* @param blockOnDurableSend <code>true</code> to block when sending durable messages or <code>false</code> to send them asynchronously
*/
void setBlockOnDurableSend(boolean blockOnDurableSend);
@@ -290,7 +289,7 @@
* <br>
* If the session is configured to send non-durable message asynchronously, the client can set a SendAcknowledgementHandler on the ClientSession
* to be notified once the message has been handled by the server.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_BLOCK_ON_NON_DURABLE_SEND}.
*
* @return whether producers will block while sending non-durable messages or do it asynchronously
@@ -299,7 +298,7 @@
/**
* Sets whether producers created through this factory will block while sending <em>non-durable</em> messages or do it asynchronously.
- *
+ *
* @param blockOnNonDurableSend <code>true</code> to block when sending non-durable messages or <code>false</code> to send them asynchronously
*/
void setBlockOnNonDurableSend(boolean blockOnNonDurableSend);
@@ -307,11 +306,11 @@
/**
* Returns whether producers created through this factory will automatically
* assign a group ID to the messages they sent.
- *
+ *
* if <code>true</code>, a random unique group ID is created and set on each message for the property
* {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
* Default value is {@link HornetQClient#DEFAULT_AUTO_GROUP}.
- *
+ *
* @return whether producers will automatically assign a group ID to their messages
*/
boolean isAutoGroup();
@@ -319,23 +318,23 @@
/**
* Sets whether producers created through this factory will automatically
* assign a group ID to the messages they sent.
- *
+ *
* @param autoGroup <code>true</code> to automatically assign a group ID to each messages sent through this factory, <code>false</code> else
*/
void setAutoGroup(boolean autoGroup);
/**
* Returns the group ID that will be eventually set on each message for the property {@link org.hornetq.api.core.Message#HDR_GROUP_ID}.
- *
+ *
* Default value is is <code>null</code> and no group ID will be set on the messages.
- *
+ *
* @return the group ID that will be eventually set on each message
*/
String getGroupID();
-
+
/**
* Sets the group ID that will be set on each message sent through this factory.
- *
+ *
* @param groupID the group ID to use
*/
void setGroupID(String groupID);
@@ -351,7 +350,7 @@
* Sets to <code>true</code> to pre-acknowledge consumed messages on the
* server before they are sent to consumers, else set to <code>false</code>
* to let clients acknowledge the message they consume.
- *
+ *
* @param preAcknowledge
* <code>true</code> to enable pre-acknowledgment,
* <code>false</code> else
@@ -360,18 +359,18 @@
/**
* Returns the acknowledgments batch size.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_ACK_BATCH_SIZE}.
- *
+ *
* @return the acknowledgments batch size
*/
int getAckBatchSize();
/**
* Sets the acknowledgments batch size.
- *
+ *
* Value must be equal or greater than 0.
- *
+ *
* @param ackBatchSize
* acknowledgments batch size
*/
@@ -392,9 +391,9 @@
/**
* Returns whether this factory will use global thread pools (shared among all the factories in the same JVM)
* or its own pools.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_USE_GLOBAL_POOLS}.
- *
+ *
* @return <code>true</code> if this factory uses global thread pools, <code>false</code> else
*/
boolean isUseGlobalPools();
@@ -402,117 +401,117 @@
/**
* Sets whether this factory will use global thread pools (shared among all the factories in the same JVM)
* or its own pools.
- *
+ *
* @param useGlobalPools <code>true</code> to let this factory uses global thread pools, <code>false</code> else
*/
void setUseGlobalPools(boolean useGlobalPools);
/**
* Returns the maximum size of the scheduled thread pool.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE}.
- *
+ *
* @return the maximum size of the scheduled thread pool.
*/
int getScheduledThreadPoolMaxSize();
/**
* Sets the maximum size of the scheduled thread pool.
- *
+ *
* This setting is relevant only if this factory does not use global pools.
* Value must be greater than 0.
- *
+ *
* @param scheduledThreadPoolMaxSize maximum size of the scheduled thread pool.
*/
void setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize);
/**
* Returns the maximum size of the thread pool.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_THREAD_POOL_MAX_SIZE}.
- *
+ *
* @return the maximum size of the thread pool.
*/
int getThreadPoolMaxSize();
/**
* Sets the maximum size of the thread pool.
- *
+ *
* This setting is relevant only if this factory does not use global pools.
* Value must be -1 (for unlimited thread pool) or greater than 0.
- *
+ *
* @param threadPoolMaxSize maximum size of the thread pool.
*/
void setThreadPoolMaxSize(int threadPoolMaxSize);
/**
- * Returns the time to retry connections created by this factory after failure.
- *
+ * Returns the time to retry connections created by this factory after failure.
+ *
* Value is in milliseconds, default is {@link HornetQClient#DEFAULT_RETRY_INTERVAL}.
- *
+ *
* @return the time to retry connections created by this factory after failure
*/
long getRetryInterval();
/**
* Sets the time to retry connections created by this factory after failure.
- *
+ *
* Value must be greater than 0.
- *
- * @param retryInterval time (in milliseconds) to retry connections created by this factory after failure
+ *
+ * @param retryInterval time (in milliseconds) to retry connections created by this factory after failure
*/
void setRetryInterval(long retryInterval);
/**
* Returns the multiplier to apply to successive retry intervals.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_RETRY_INTERVAL_MULTIPLIER}.
- *
+ *
* @return the multiplier to apply to successive retry intervals
*/
double getRetryIntervalMultiplier();
/**
* Sets the multiplier to apply to successive retry intervals.
- *
+ *
* Value must be positive.
- *
+ *
* @param retryIntervalMultiplier multiplier to apply to successive retry intervals
*/
void setRetryIntervalMultiplier(double retryIntervalMultiplier);
/**
* Returns the maximum retry interval (in the case a retry interval multiplier has been specified).
- *
+ *
* Value is in milliseconds, default value is {@link HornetQClient#DEFAULT_MAX_RETRY_INTERVAL}.
- *
+ *
* @return the maximum retry interval
*/
long getMaxRetryInterval();
/**
* Sets the maximum retry interval.
- *
+ *
* Value must be greater than 0.
- *
+ *
* @param maxRetryInterval maximum retry interval to apply in the case a retry interval multiplier has been specified
*/
void setMaxRetryInterval(long maxRetryInterval);
/**
* Returns the maximum number of attempts to retry connection in case of failure.
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_RECONNECT_ATTEMPTS}.
- *
+ *
* @return the maximum number of attempts to retry connection in case of failure.
*/
int getReconnectAttempts();
/**
* Sets the maximum number of attempts to retry connection in case of failure.
- *
+ *
* Value must be -1 (to retry infinitely), 0 (to never retry connection) or greater than 0.
- *
+ *
* @param reconnectAttempts maximum number of attempts to retry connection in case of failure
*/
void setReconnectAttempts(int reconnectAttempts);
@@ -523,66 +522,66 @@
/**
* Returns true if the client will automatically attempt to connect to the backup server if the initial
* connection to the live server fails
- *
+ *
* Default value is {@link HornetQClient#DEFAULT_FAILOVER_ON_INITIAL_CONNECTION}.
*/
boolean isFailoverOnInitialConnection();
-
+
/**
* Sets the value for FailoverOnInitialConnection
- *
+ *
* @param failover
*/
void setFailoverOnInitialConnection(boolean failover);
/**
* Returns the class name of the connection load balancing policy.
- *
+ *
* Default value is "org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy".
- *
+ *
* @return the class name of the connection load balancing policy
*/
String getConnectionLoadBalancingPolicyClassName();
/**
* Sets the class name of the connection load balancing policy.
- *
+ *
* Value must be the name of a class implementing {@link ConnectionLoadBalancingPolicy}.
- *
+ *
* @param loadBalancingPolicyClassName class name of the connection load balancing policy
*/
void setConnectionLoadBalancingPolicyClassName(String loadBalancingPolicyClassName);
/**
* Returns the initial size of messages created through this factory.
- *
+ *
* Value is in bytes, default value is {@link HornetQClient#DEFAULT_INITIAL_MESSAGE_PACKET_SIZE}.
- *
+ *
* @return the initial size of messages created through this factory
*/
int getInitialMessagePacketSize();
/**
* Sets the initial size of messages created through this factory.
- *
+ *
* Value must be greater than 0.
- *
+ *
* @param size initial size of messages created through this factory.
*/
void setInitialMessagePacketSize(int size);
-
+
/**
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
- *
+ *
* @param interceptor an Interceptor
*/
void addInterceptor(Interceptor interceptor);
/**
* Removes an interceptor.
- *
+ *
* @param interceptor interceptor to remove
- *
+ *
* @return <code>true</code> if the interceptor is removed from this factory, <code>false</code> else
*/
boolean removeInterceptor(Interceptor interceptor);
@@ -593,9 +592,9 @@
void close();
boolean isHA();
-
+
boolean isCompressLargeMessage();
-
+
void setCompressLargeMessage(boolean compress);
void addClusterTopologyListener(ClusterTopologyListener listener);
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/BridgeControl.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -13,12 +13,13 @@
package org.hornetq.api.core.management;
+import org.hornetq.core.server.cluster.Bridge;
+
/**
* A BridgeControl is used to manage a Bridge.
- *
+ * @see Bridge
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
*/
public interface BridgeControl extends HornetQComponentControl
{
@@ -76,9 +77,9 @@
* Returns whether this bridge is using duplicate detection.
*/
boolean isUseDuplicateDetection();
-
+
/**
- * Returns whether this bridge is using high availability
+ * Returns whether this bridge is using high availability
*/
boolean isHA();
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/DivertControl.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -13,9 +13,10 @@
package org.hornetq.api.core.management;
+import org.hornetq.core.server.Divert;
+
/**
- * A DivertControl is used to manage a divert.
- *
+ * A DivertControl is used to manage a {@link Divert}.
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
public interface DivertControl
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/NotificationType.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -14,8 +14,7 @@
package org.hornetq.api.core.management;
/**
- * Types of notification emmitted by HornetQ servers.
- *
+ * Types of notification emitted by HornetQ servers.
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
public enum NotificationType
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/Operation.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -23,7 +23,7 @@
/**
* Info for a MBean Operation.
- * <b>
+ * <p>
* This annotation is used only for methods which can be invoked
* through a GUI.
*
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-07-05 14:00:57 UTC (rev 10916)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-07-05 14:08:46 UTC (rev 10917)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.concurrent.Executor;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
13 years, 8 months