JBoss hornetq SVN: r10155 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-27 22:43:14 -0500 (Thu, 27 Jan 2011)
New Revision: 10155
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
Log:
upload release
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-01-28 02:08:50 UTC (rev 10154)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-01-28 03:43:14 UTC (rev 10155)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.EAP-QA-10148"/>
+ <property name="hornetq.version" value="2.2.0.EAP-QA-10154"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-01-28 02:08:50 UTC (rev 10154)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-01-28 03:43:14 UTC (rev 10155)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.0.EAP-QA-10148</hornetq.version>
+ <hornetq.version>2.2.0.EAP-QA-10154</hornetq.version>
</properties>
<licenses>
13 years, 11 months
JBoss hornetq SVN: r10154 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-27 21:08:50 -0500 (Thu, 27 Jan 2011)
New Revision: 10154
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
JBPAPP-5523 - Paging settings
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-01-28 02:08:50 UTC (rev 10154)
@@ -114,7 +114,6 @@
final NotificationBroadcasterSupport broadcaster) throws Exception
{
super(HornetQServerControl.class, storageManager);
- new Exception(".....").printStackTrace();
this.postOffice = postOffice;
this.configuration = configuration;
this.resourceManager = resourceManager;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-01-28 02:08:50 UTC (rev 10154)
@@ -155,7 +155,6 @@
final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
- new Exception("new pageStore for " + address).printStackTrace();
if (pagingManager == null)
{
throw new IllegalStateException("Paging Manager can't be null");
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-01-28 02:07:00 UTC (rev 10153)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-01-28 02:08:50 UTC (rev 10154)
@@ -204,7 +204,7 @@
final int maxAddressSize,
final Map<String, AddressSettings> settings)
{
- return createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
+ return createServer(realFiles, configuration, pageSize, maxAddressSize, AddressFullMessagePolicy.PAGE, settings);
}
protected HornetQServer createServer(final boolean realFiles,
13 years, 11 months
JBoss hornetq SVN: r10153 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-27 21:07:00 -0500 (Thu, 27 Jan 2011)
New Revision: 10153
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
JBPAPP-5523 - Paging settings
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -114,6 +114,7 @@
final NotificationBroadcasterSupport broadcaster) throws Exception
{
super(HornetQServerControl.class, storageManager);
+ new Exception(".....").printStackTrace();
this.postOffice = postOffice;
this.configuration = configuration;
this.resourceManager = resourceManager;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingManager.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -19,6 +19,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
/**
*
@@ -44,7 +45,7 @@
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*
*/
-public interface PagingManager extends HornetQComponent
+public interface PagingManager extends HornetQComponent, HierarchicalRepositoryChangeListener
{
/** To return the PageStore associated with the address */
PagingStore getPageStore(SimpleString address) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PagingStore.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -20,6 +20,7 @@
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
/**
*
@@ -54,6 +55,8 @@
long getAddressSize();
long getMaxSize();
+
+ void applySetting(AddressSettings addressSettings);
boolean isPaging();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -69,15 +69,38 @@
{
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
+ addressSettingsRepository.registerListener(this);
this.storageManager = storageManager;
}
// Public
// ---------------------------------------------------------------------------------------------------------------------------
+
+ // Hierarchical changes listener
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ reaplySettings();
+ }
+
+
+
// PagingManager implementation
// -----------------------------------------------------------------------------------------------------
+ public void reaplySettings()
+ {
+ for (PagingStore store : stores.values())
+ {
+ AddressSettings settings = this.addressSettingsRepository.getMatch(store.getAddress().toString());
+ store.applySetting(settings);
+ }
+ }
+
public SimpleString[] getStoreNames()
{
Set<SimpleString> names = stores.keySet();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -22,7 +22,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -45,7 +44,6 @@
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.RouteContextList;
@@ -97,11 +95,11 @@
// Used to schedule sync threads
private final PageSyncTimer syncTimer;
- private final long maxSize;
+ private long maxSize;
- private final long pageSize;
+ private long pageSize;
- private final AddressFullMessagePolicy addressFullMessagePolicy;
+ private AddressFullMessagePolicy addressFullMessagePolicy;
private boolean printedDropMessagesWarning;
@@ -112,8 +110,6 @@
// Bytes consumed by the queue on the memory
private final AtomicLong sizeInBytes = new AtomicLong();
- private final AtomicBoolean depaging = new AtomicBoolean(false);
-
private volatile int numberOfPages;
private volatile int firstPageId;
@@ -124,9 +120,6 @@
private volatile boolean paging = false;
- /** duplicate cache used at this address */
- private final DuplicateIDCache duplicateCache;
-
private final PageCursorProvider cursorProvider;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -162,6 +155,7 @@
final ExecutorFactory executorFactory,
final boolean syncNonTransactional)
{
+ new Exception("new pageStore for " + address).printStackTrace();
if (pagingManager == null)
{
throw new IllegalStateException("Paging Manager can't be null");
@@ -175,12 +169,8 @@
this.storeName = storeName;
- maxSize = addressSettings.getMaxSizeBytes();
+ applySetting(addressSettings);
- pageSize = addressSettings.getPageSizeBytes();
-
- addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
-
if (addressFullMessagePolicy == AddressFullMessagePolicy.PAGE && maxSize != -1 && pageSize >= maxSize)
{
throw new IllegalStateException("pageSize for address " + address +
@@ -212,16 +202,18 @@
this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
- // Post office could be null on the backup node
- if (postOffice == null)
- {
- this.duplicateCache = null;
- }
- else
- {
- this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
- }
+ }
+ /**
+ * @param addressSettings
+ */
+ public void applySetting(final AddressSettings addressSettings)
+ {
+ maxSize = addressSettings.getMaxSizeBytes();
+
+ pageSize = addressSettings.getPageSizeBytes();
+
+ addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
}
// Public --------------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -17,17 +17,29 @@
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -36,7 +48,11 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.jms.client.HornetQJMSConnectionFactory;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -307,7 +323,7 @@
assertEquals(i, msg.getIntProperty("id").intValue());
msg.acknowledge();
}
-
+
assertNull(cons.receiveImmediate());
sess.close();
sl.close();
@@ -348,22 +364,20 @@
assertEquals(numberOfMessages, q2.getMessageCount());
assertEquals(0, q1.getMessageCount());
-
-
+
session.close();
sf.close();
locator.close();
-
+
server.stop();
-
-
+
server.start();
-
+
Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
-
+
q1 = null;
q2 = null;
-
+
for (Binding bind : bindings.getBindings())
{
if (bind instanceof LocalQueueBinding)
@@ -373,25 +387,20 @@
{
q1 = qb.getQueue();
}
-
+
if (qb.getQueue().getName().equals(new SimpleString("inactive")))
{
q2 = qb.getQueue();
}
}
}
-
+
assertNotNull(q1);
-
+
assertNotNull(q2);
-
assertEquals(numberOfMessages, q2.getMessageCount());
assertEquals(0, q1.getMessageCount());
-
-
-
-
}
catch (Throwable e)
@@ -510,7 +519,7 @@
}
session.commit();
-
+
q1.getMessageCount();
t1.start();
@@ -518,10 +527,10 @@
assertEquals(0, errors.get());
long timeout = System.currentTimeMillis() + 10000;
- while (numberOfMessages -100 != q1.getMessageCount() && System.currentTimeMillis() < timeout)
+ while (numberOfMessages - 100 != q1.getMessageCount() && System.currentTimeMillis() < timeout)
{
Thread.sleep(500);
-
+
}
assertEquals(numberOfMessages, q2.getMessageCount());
@@ -833,8 +842,154 @@
{
}
}
+ }
+ public void testPagingOverCreatedDestinationTopics() throws Exception
+ {
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+ JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+ InVMContext context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ jmsServer.createTopic(true, "tt", "/topic/TT");
+
+ server.getHornetQServerControl().addAddressSettings("jms.topic.TT",
+ "DLQ",
+ "DLQ",
+ false,
+ 5,
+ 1024 * 1024,
+ 1024 * 10,
+ 5,
+ 0,
+ false,
+ "PAGE");
+
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ Connection conn = cf.createConnection();
+ conn.setClientID("tst");
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup("/topic/TT");
+ sess.createDurableSubscriber(topic, "t1");
+
+ MessageProducer prod = sess.createProducer(topic);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage txt = sess.createTextMessage("TST");
+ prod.send(txt);
+
+ PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.topic.TT"));
+
+ assertEquals(1024 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+
+ jmsServer.stop();
+
+ server = createServer(true, config, PAGE_SIZE, -1, new HashMap<String, AddressSettings>());
+
+ jmsServer = new JMSServerManagerImpl(server);
+ context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.topic.TT");
+
+ assertEquals(1024 * 1024, settings.getMaxSizeBytes());
+ assertEquals(10 * 1024, settings.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+
+ store = server.getPagingManager().getPageStore(new SimpleString("TT"));
+
+ server.stop();
+
}
+
+ public void testPagingOverCreatedDestinationQueues() throws Exception
+ {
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, -1, -1, AddressFullMessagePolicy.BLOCK, new HashMap<String, AddressSettings>());
+
+ JMSServerManagerImpl jmsServer = new JMSServerManagerImpl(server);
+ InVMContext context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ server.getHornetQServerControl().addAddressSettings("jms.queue.Q1",
+ "DLQ",
+ "DLQ",
+ false,
+ 5,
+ 100 * 1024,
+ 10 * 1024,
+ 5,
+ 0,
+ false,
+ "PAGE");
+
+ jmsServer.createQueue(true, "Q1", null, true, "/queue/Q1");
+
+ HornetQJMSConnectionFactory cf = (HornetQJMSConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ Connection conn = cf.createConnection();
+ conn.setClientID("tst");
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue queue = (javax.jms.Queue)context.lookup("/queue/Q1");
+
+ MessageProducer prod = sess.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ BytesMessage bmt = sess.createBytesMessage();
+
+ bmt.writeBytes(new byte[1024]);
+
+ for (int i = 0 ; i < 500; i++)
+ {
+ prod.send(bmt);
+ }
+
+ PagingStore store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+
+ assertEquals(100 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+
+ jmsServer.stop();
+
+ server = createServer(true, config, -1, -1, AddressFullMessagePolicy.BLOCK, new HashMap<String, AddressSettings>());
+
+ jmsServer = new JMSServerManagerImpl(server);
+ context = new InVMContext();
+ jmsServer.setContext(context);
+ jmsServer.start();
+
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch("jms.queue.Q1");
+
+ assertEquals(100 * 1024, settings.getMaxSizeBytes());
+ assertEquals(10 * 1024, settings.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, settings.getAddressFullMessagePolicy());
+
+ store = server.getPagingManager().getPageStore(new SimpleString("jms.queue.Q1"));
+ assertEquals(100 * 1024, store.getMaxSize());
+ assertEquals(10 * 1024, store.getPageSizeBytes());
+ assertEquals(AddressFullMessagePolicy.PAGE, store.getAddressFullMessagePolicy());
+
+
+ server.stop();
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -24,15 +24,12 @@
import junit.framework.Assert;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ClusterConnectionControl;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -1110,6 +1110,13 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ }
+
}
class FakeStorageManager implements StorageManager
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -333,6 +333,13 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ }
+
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-01-27 18:36:57 UTC (rev 10152)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-01-28 02:07:00 UTC (rev 10153)
@@ -42,6 +42,7 @@
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
@@ -203,6 +204,16 @@
final int maxAddressSize,
final Map<String, AddressSettings> settings)
{
+ return createServer(realFiles, configuration, pageSize, maxAddressSize, settings);
+ }
+
+ protected HornetQServer createServer(final boolean realFiles,
+ final Configuration configuration,
+ final int pageSize,
+ final int maxAddressSize,
+ final AddressFullMessagePolicy fullPolicy,
+ final Map<String, AddressSettings> settings)
+ {
HornetQServer server;
if (realFiles)
@@ -222,6 +233,7 @@
AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(pageSize);
defaultSetting.setMaxSizeBytes(maxAddressSize);
+ defaultSetting.setAddressFullMessagePolicy(fullPolicy);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
13 years, 11 months
JBoss hornetq SVN: r10152 - branches/Branch_2_2_EAP/docs/user-manual/en.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-27 13:36:57 -0500 (Thu, 27 Jan 2011)
New Revision: 10152
Modified:
branches/Branch_2_2_EAP/docs/user-manual/en/large-messages.xml
Log:
small tweak on doc for large message
Modified: branches/Branch_2_2_EAP/docs/user-manual/en/large-messages.xml
===================================================================
--- branches/Branch_2_2_EAP/docs/user-manual/en/large-messages.xml 2011-01-27 18:34:24 UTC (rev 10151)
+++ branches/Branch_2_2_EAP/docs/user-manual/en/large-messages.xml 2011-01-27 18:36:57 UTC (rev 10152)
@@ -176,6 +176,9 @@
msg.setInputStream(dataInputStream);
...
</programlisting>
+
+ <para>Notice also that for messages with more than 2GiB the getBodySize() will return invalid values since
+ this is an integer (which is also exposed to the JMS API). On those cases you can use the message property _HQ_LARGE_SIZE.</para>
</section>
<section id="large-messages.streaming.over.jms">
<title>Streaming over JMS</title>
13 years, 11 months
JBoss hornetq SVN: r10151 - branches/Branch_2_2_EAP/examples/jms/large-message/src/org/hornetq/jms/example.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-27 13:34:24 -0500 (Thu, 27 Jan 2011)
New Revision: 10151
Modified:
branches/Branch_2_2_EAP/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
Log:
small tweak
Modified: branches/Branch_2_2_EAP/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- branches/Branch_2_2_EAP/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-27 14:06:20 UTC (rev 10150)
+++ branches/Branch_2_2_EAP/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2011-01-27 18:34:24 UTC (rev 10151)
@@ -150,7 +150,7 @@
// an empty body.
BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
- System.out.println("Received message with: " + messageReceived.getBodyLength() +
+ System.out.println("Received message with: " + messageReceived.getLongProperty("_HQ_LARGE_SIZE") +
" bytes. Now streaming to file on disk.");
// Step 13. We set an OutputStream on the message. This causes the message body to be written to the
13 years, 11 months
JBoss hornetq SVN: r10150 - in branches/HORNETQ-316: src/main/org/hornetq/api/core/client and 7 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-01-27 09:06:20 -0500 (Thu, 27 Jan 2011)
New Revision: 10150
Added:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
Removed:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Modified:
branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
Log:
https://issues.jboss.org/browse/HORNETQ-316
first commit. just implemented pluggable discovery strategy and removed compile errors. not yet debugged.
Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 Red Hat, Inc.
+ * Copyright 2010 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -14,169 +14,45 @@
package org.hornetq.api.core;
import java.io.Serializable;
-
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.UUIDGenerator;
-
+import java.util.Map;
+import java.util.Properties;
/**
* A DiscoveryGroupConfiguration
*
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 18 Nov 2008 08:47:30
- *
- *
+ * @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
*/
public class DiscoveryGroupConfiguration implements Serializable
{
- private static final long serialVersionUID = 8657206421727863400L;
-
- private static final Logger log = Logger.getLogger(DiscoveryGroupConfiguration.class);
+ private static final long serialVersionUID = 2877108926493109407L;
-
- private String name;
+ private final String serverLocatorClassName;
- private String localBindAddress;
-
- private String groupAddress;
-
- private int groupPort;
-
- private long refreshTimeout;
+ private final String name;
- private long discoveryInitialWaitTimeout;
-
- public DiscoveryGroupConfiguration(final String name,
- final String localBindAddress,
- final String groupAddress,
- final int groupPort,
- final long refreshTimeout,
- final long discoveryInitialWaitTimeout)
- {
- this.name = name;
- this.groupAddress = groupAddress;
- this.localBindAddress = localBindAddress;
- this.groupPort = groupPort;
- this.refreshTimeout = refreshTimeout;
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
- }
-
- public DiscoveryGroupConfiguration(final String groupAddress,
- final int groupPort)
- {
- this(UUIDGenerator.getInstance().generateStringUUID(), null, groupAddress, groupPort, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
- }
-
- public String getName()
- {
- return name;
- }
+ private final Map<String, Object> params;
- public String getLocalBindAddress()
+ public DiscoveryGroupConfiguration(final String clazz, final Map<String, Object> params, final String name)
{
- return localBindAddress;
- }
-
- public String getGroupAddress()
- {
- return groupAddress;
- }
-
- public int getGroupPort()
- {
- return groupPort;
- }
-
- public long getRefreshTimeout()
- {
- return refreshTimeout;
- }
-
- /**
- * @param name the name to set
- */
- public void setName(final String name)
- {
+ this.serverLocatorClassName = clazz;
+
+ this.params = params;
+
this.name = name;
}
- /**
- * @param localBindAddress the localBindAddress to set
- */
- public void setLocalBindAdress(final String localBindAddress)
+ public String getServerLocatorClassName()
{
- this.localBindAddress = localBindAddress;
+ return this.serverLocatorClassName;
}
- /**
- * @param groupAddress the groupAddress to set
- */
- public void setGroupAddress(final String groupAddress)
+ public Map<String, Object> getParams()
{
- this.groupAddress = groupAddress;
+ return this.params;
}
- /**
- * @param groupPort the groupPort to set
- */
- public void setGroupPort(final int groupPort)
+ public String getName()
{
- this.groupPort = groupPort;
+ return this.name;
}
-
- /**
- * @param refreshTimeout the refreshTimeout to set
- */
- public void setRefreshTimeout(final long refreshTimeout)
- {
- this.refreshTimeout = refreshTimeout;
- }
-
- /**
- * @return the discoveryInitialWaitTimeout
- */
- public long getDiscoveryInitialWaitTimeout()
- {
- return discoveryInitialWaitTimeout;
- }
-
- /**
- * @param discoveryInitialWaitTimeout the discoveryInitialWaitTimeout to set
- */
- public void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
- {
- this.discoveryInitialWaitTimeout = discoveryInitialWaitTimeout;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- DiscoveryGroupConfiguration that = (DiscoveryGroupConfiguration) o;
-
- if (discoveryInitialWaitTimeout != that.discoveryInitialWaitTimeout) return false;
- if (groupPort != that.groupPort) return false;
- if (refreshTimeout != that.refreshTimeout) return false;
- if (groupAddress != null ? !groupAddress.equals(that.groupAddress) : that.groupAddress != null) return false;
- if (localBindAddress != null ? !localBindAddress.equals(that.localBindAddress) : that.localBindAddress != null)
- return false;
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode()
- {
- int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (localBindAddress != null ? localBindAddress.hashCode() : 0);
- result = 31 * result + (groupAddress != null ? groupAddress.hashCode() : 0);
- result = 31 * result + groupPort;
- result = 31 * result + (int) (refreshTimeout ^ (refreshTimeout >>> 32));
- result = 31 * result + (int) (discoveryInitialWaitTimeout ^ (discoveryInitialWaitTimeout >>> 32));
- return result;
- }
+
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/api/core/client/HornetQClient.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -13,13 +13,15 @@
package org.hornetq.api.core.client;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
+import org.hornetq.core.client.impl.StaticServerLocatorImpl;
-import java.util.List;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.Map;
/**
* Utility class for creating HornetQ {@link ClientSessionFactory} objects.
@@ -107,21 +109,34 @@
*/
public static ServerLocator createServerLocatorWithoutHA(TransportConfiguration... transportConfigurations)
{
- return new ServerLocatorImpl(false, transportConfigurations);
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, transportConfigurations);
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, null);
+ return createServerLocatorWithoutHA(config);
}
/**
* Create a ServerLocator which creates session factories from a set of live servers, no HA backup information is propagated to the client
*
- * The UDP address and port are used to listen for live servers in the cluster
- *
- * @param discoveryAddress The UDP group address to listen for updates
- * @param discoveryPort the UDP port to listen for updates
+ * @param groupConfiguration The configuration for server discovery
* @return the ServerLocator
*/
public static ServerLocator createServerLocatorWithoutHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(false, groupConfiguration);
+ ServerLocator serverLocator = null;
+ String className = groupConfiguration.getServerLocatorClassName();
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(Boolean.class, DiscoveryGroupConfiguration.class);
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.FALSE, groupConfiguration);
+ }
+ catch(Exception e)
+ {
+ new HornetQException(HornetQException.INTERNAL_ERROR, "Could not instantiate ServerLocator implementation class: " + className, e);
+ }
+ return serverLocator;
}
/**
@@ -135,9 +150,12 @@
*/
public static ServerLocator createServerLocatorWithHA(TransportConfiguration... initialServers)
{
- return new ServerLocatorImpl(true, initialServers);
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, initialServers);
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(StaticServerLocatorImpl.class.getName(), params, null);
+ return createServerLocatorWithHA(config);
}
-
+
/**
* Create a ServerLocator which will receive cluster topology updates from the cluster as servers leave or join and new backups are appointed or removed.
* The discoveryAddress and discoveryPort parameters in this method are used to listen for UDP broadcasts which contain connection information for members of the cluster.
@@ -150,7 +168,20 @@
*/
public static ServerLocator createServerLocatorWithHA(final DiscoveryGroupConfiguration groupConfiguration)
{
- return new ServerLocatorImpl(true, groupConfiguration);
+ ServerLocator serverLocator = null;
+ String className = groupConfiguration.getServerLocatorClassName();
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class<?> clazz = loader.loadClass(className);
+ Constructor<?> constructor = clazz.getConstructor(Boolean.class, DiscoveryGroupConfiguration.class);
+ serverLocator = (ServerLocator)constructor.newInstance(Boolean.TRUE, groupConfiguration);
+ }
+ catch(Exception e)
+ {
+ new HornetQException(HornetQException.INTERNAL_ERROR, "Could not instantiate ServerLocator implementation class: " + className, e);
+ }
+ return serverLocator;
}
Copied: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (from rev 10124, branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java)
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/AbstractServerLocator.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,1026 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A AbstractServerLocator
+ *
+ * @author Tim Fox
+ */
+public abstract class AbstractServerLocator implements ServerLocatorInternal, Serializable
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(AbstractServerLocator.class);
+
+ private final boolean ha;
+
+ private boolean finalizeCheck = true;
+
+ private boolean clusterConnection;
+
+ private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+
+ private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
+
+ private TransportConfiguration[] initialConnectors;
+
+ private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+
+ private Topology topology = new Topology();
+
+ private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
+
+ private boolean receivedTopology;
+
+ private boolean compressLargeMessage;
+
+ private ExecutorService threadPool;
+
+ private ScheduledExecutorService scheduledThreadPool;
+
+ private ConnectionLoadBalancingPolicy loadBalancingPolicy;
+
+ private boolean readOnly;
+
+ // Settable attributes:
+
+ private boolean cacheLargeMessagesClient;
+
+ private long clientFailureCheckPeriod;
+
+ private long connectionTTL;
+
+ private long callTimeout;
+
+ private int minLargeMessageSize;
+
+ private int consumerWindowSize;
+
+ private int consumerMaxRate;
+
+ private int confirmationWindowSize;
+
+ private int producerWindowSize;
+
+ private int producerMaxRate;
+
+ private boolean blockOnAcknowledge;
+
+ private boolean blockOnDurableSend;
+
+ private boolean blockOnNonDurableSend;
+
+ private boolean autoGroup;
+
+ private boolean preAcknowledge;
+
+ private String connectionLoadBalancingPolicyClassName;
+
+ private int ackBatchSize;
+
+ private boolean useGlobalPools;
+
+ private int scheduledThreadPoolMaxSize;
+
+ private int threadPoolMaxSize;
+
+ private long retryInterval;
+
+ private double retryIntervalMultiplier;
+
+ private long maxRetryInterval;
+
+ private int reconnectAttempts;
+
+ private int initialConnectAttempts;
+
+ private boolean failoverOnInitialConnection;
+
+ private int initialMessagePacketSize;
+
+ private volatile boolean closed;
+
+ private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
+
+ private static ExecutorService globalThreadPool;
+
+ private static ScheduledExecutorService globalScheduledThreadPool;
+
+ private String groupID;
+
+ private String nodeID;
+
+ private TransportConfiguration clusterTransportConfiguration;
+
+ private boolean backup;
+
+ private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ private static synchronized ExecutorService getGlobalThreadPool()
+ {
+ if (globalThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
+
+ globalThreadPool = Executors.newCachedThreadPool(factory);
+ }
+
+ return globalThreadPool;
+ }
+
+ public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
+ {
+ if (globalScheduledThreadPool == null)
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
+ true,
+ getThisClassLoader());
+
+ globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+
+ factory);
+ }
+
+ return globalScheduledThreadPool;
+ }
+
+ protected void setThreadPools()
+ {
+ if (useGlobalPools)
+ {
+ threadPool = getGlobalThreadPool();
+
+ scheduledThreadPool = getGlobalScheduledThreadPool();
+ }
+ else
+ {
+ ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+
+ if (threadPoolMaxSize == -1)
+ {
+ threadPool = Executors.newCachedThreadPool(factory);
+ }
+ else
+ {
+ threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
+ }
+
+ factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
+ true,
+ getThisClassLoader());
+
+ scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
+ }
+ }
+
+ private static ClassLoader getThisClassLoader()
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
+ }
+
+ protected void instantiateLoadBalancingPolicy()
+ {
+ if (connectionLoadBalancingPolicyClassName == null)
+ {
+ throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
+ }
+
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+ return null;
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
+ "\"",
+ e);
+ }
+ }
+ });
+ }
+
+ public AbstractServerLocator(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ e.fillInStackTrace();
+ this.ha = useHA;
+
+ this.discoveryGroupConfiguration = discoveryGroupConfiguration;
+
+ this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
+
+ clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+
+ connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+ callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
+
+ minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+
+ consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
+
+ confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+
+ producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
+
+ producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
+
+ blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+
+ blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
+
+ blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
+
+ autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
+
+ preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
+
+ ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
+
+ connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+
+ useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
+
+ scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+
+ threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
+
+ retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
+
+ retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+
+ maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
+
+ reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
+
+ initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
+
+ failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
+
+ cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
+
+ cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+
+ compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
+
+ clusterConnection = false;
+ }
+
+ protected TransportConfiguration selectConnector()
+ {
+ if (receivedTopology)
+ {
+ int pos = loadBalancingPolicy.select(topologyArray.length);
+
+ Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
+
+ return pair.a;
+ }
+ else
+ {
+ // Get from initialconnectors
+
+ int pos = loadBalancingPolicy.select(initialConnectors.length);
+
+ return initialConnectors[pos];
+ }
+ }
+
+ protected int getConnectorLength()
+ {
+ if(receivedTopology)
+ {
+ return topologyArray.length;
+ }
+ else if(initialConnectors == null)
+ {
+ return -1;
+ }
+ else
+ {
+ return initialConnectors.length;
+ }
+ }
+
+ public void disableFinalizeCheck()
+ {
+ finalizeCheck = false;
+ }
+
+ public synchronized boolean isHA()
+ {
+ return ha;
+ }
+
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ return cacheLargeMessagesClient;
+ }
+
+ public synchronized void setCacheLargeMessagesClient(final boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
+ public synchronized long getClientFailureCheckPeriod()
+ {
+ return clientFailureCheckPeriod;
+ }
+
+ public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
+ {
+ checkWrite();
+ this.clientFailureCheckPeriod = clientFailureCheckPeriod;
+ }
+
+ public synchronized long getConnectionTTL()
+ {
+ return connectionTTL;
+ }
+
+ public synchronized void setConnectionTTL(final long connectionTTL)
+ {
+ checkWrite();
+ this.connectionTTL = connectionTTL;
+ }
+
+ public synchronized long getCallTimeout()
+ {
+ return callTimeout;
+ }
+
+ public synchronized void setCallTimeout(final long callTimeout)
+ {
+ checkWrite();
+ this.callTimeout = callTimeout;
+ }
+
+ public synchronized int getMinLargeMessageSize()
+ {
+ return minLargeMessageSize;
+ }
+
+ public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
+ {
+ checkWrite();
+ this.minLargeMessageSize = minLargeMessageSize;
+ }
+
+ public synchronized int getConsumerWindowSize()
+ {
+ return consumerWindowSize;
+ }
+
+ public synchronized void setConsumerWindowSize(final int consumerWindowSize)
+ {
+ checkWrite();
+ this.consumerWindowSize = consumerWindowSize;
+ }
+
+ public synchronized int getConsumerMaxRate()
+ {
+ return consumerMaxRate;
+ }
+
+ public synchronized void setConsumerMaxRate(final int consumerMaxRate)
+ {
+ checkWrite();
+ this.consumerMaxRate = consumerMaxRate;
+ }
+
+ public synchronized int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
+ public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
+ {
+ checkWrite();
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
+
+ public synchronized int getProducerWindowSize()
+ {
+ return producerWindowSize;
+ }
+
+ public synchronized void setProducerWindowSize(final int producerWindowSize)
+ {
+ checkWrite();
+ this.producerWindowSize = producerWindowSize;
+ }
+
+ public synchronized int getProducerMaxRate()
+ {
+ return producerMaxRate;
+ }
+
+ public synchronized void setProducerMaxRate(final int producerMaxRate)
+ {
+ checkWrite();
+ this.producerMaxRate = producerMaxRate;
+ }
+
+ public synchronized boolean isBlockOnAcknowledge()
+ {
+ return blockOnAcknowledge;
+ }
+
+ public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
+ {
+ checkWrite();
+ this.blockOnAcknowledge = blockOnAcknowledge;
+ }
+
+ public synchronized boolean isBlockOnDurableSend()
+ {
+ return blockOnDurableSend;
+ }
+
+ public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
+ {
+ checkWrite();
+ this.blockOnDurableSend = blockOnDurableSend;
+ }
+
+ public synchronized boolean isBlockOnNonDurableSend()
+ {
+ return blockOnNonDurableSend;
+ }
+
+ public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
+ {
+ checkWrite();
+ this.blockOnNonDurableSend = blockOnNonDurableSend;
+ }
+
+ public synchronized boolean isAutoGroup()
+ {
+ return autoGroup;
+ }
+
+ public synchronized void setAutoGroup(final boolean autoGroup)
+ {
+ checkWrite();
+ this.autoGroup = autoGroup;
+ }
+
+ public synchronized boolean isPreAcknowledge()
+ {
+ return preAcknowledge;
+ }
+
+ public synchronized void setPreAcknowledge(final boolean preAcknowledge)
+ {
+ checkWrite();
+ this.preAcknowledge = preAcknowledge;
+ }
+
+ public synchronized int getAckBatchSize()
+ {
+ return ackBatchSize;
+ }
+
+ public synchronized void setAckBatchSize(final int ackBatchSize)
+ {
+ checkWrite();
+ this.ackBatchSize = ackBatchSize;
+ }
+
+ public synchronized boolean isUseGlobalPools()
+ {
+ return useGlobalPools;
+ }
+
+ public synchronized void setUseGlobalPools(final boolean useGlobalPools)
+ {
+ checkWrite();
+ this.useGlobalPools = useGlobalPools;
+ }
+
+ public synchronized int getScheduledThreadPoolMaxSize()
+ {
+ return scheduledThreadPoolMaxSize;
+ }
+
+ public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
+ {
+ checkWrite();
+ this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
+ }
+
+ public synchronized int getThreadPoolMaxSize()
+ {
+ return threadPoolMaxSize;
+ }
+
+ public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
+ {
+ checkWrite();
+ this.threadPoolMaxSize = threadPoolMaxSize;
+ }
+
+ public synchronized long getRetryInterval()
+ {
+ return retryInterval;
+ }
+
+ public synchronized void setRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ this.retryInterval = retryInterval;
+ }
+
+ public synchronized long getMaxRetryInterval()
+ {
+ return maxRetryInterval;
+ }
+
+ public synchronized void setMaxRetryInterval(final long retryInterval)
+ {
+ checkWrite();
+ maxRetryInterval = retryInterval;
+ }
+
+ public synchronized double getRetryIntervalMultiplier()
+ {
+ return retryIntervalMultiplier;
+ }
+
+ public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
+ {
+ checkWrite();
+ this.retryIntervalMultiplier = retryIntervalMultiplier;
+ }
+
+ public synchronized int getReconnectAttempts()
+ {
+ return reconnectAttempts;
+ }
+
+ public synchronized void setReconnectAttempts(final int reconnectAttempts)
+ {
+ checkWrite();
+ this.reconnectAttempts = reconnectAttempts;
+ }
+
+ public void setInitialConnectAttempts(int initialConnectAttempts)
+ {
+ checkWrite();
+ this.initialConnectAttempts = initialConnectAttempts;
+ }
+
+ public int getInitialConnectAttempts()
+ {
+ return initialConnectAttempts;
+ }
+
+ public synchronized boolean isFailoverOnInitialConnection()
+ {
+ return this.failoverOnInitialConnection;
+ }
+
+ public synchronized void setFailoverOnInitialConnection(final boolean failover)
+ {
+ checkWrite();
+ this.failoverOnInitialConnection = failover;
+ }
+
+ public synchronized String getConnectionLoadBalancingPolicyClassName()
+ {
+ return connectionLoadBalancingPolicyClassName;
+ }
+
+ public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
+ {
+ checkWrite();
+ connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
+ }
+
+ public TransportConfiguration[] getStaticTransportConfigurations()
+ {
+ return this.initialConnectors;
+ }
+
+ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
+ {
+ return discoveryGroupConfiguration;
+ }
+
+ public void addInterceptor(final Interceptor interceptor)
+ {
+ interceptors.add(interceptor);
+ }
+
+ public boolean removeInterceptor(final Interceptor interceptor)
+ {
+ return interceptors.remove(interceptor);
+ }
+
+ public synchronized int getInitialMessagePacketSize()
+ {
+ return initialMessagePacketSize;
+ }
+
+ public synchronized void setInitialMessagePacketSize(final int size)
+ {
+ checkWrite();
+ initialMessagePacketSize = size;
+ }
+
+ public void setGroupID(final String groupID)
+ {
+ checkWrite();
+ this.groupID = groupID;
+ }
+
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
+ public boolean isCompressLargeMessage()
+ {
+ return compressLargeMessage;
+ }
+
+ public void setCompressLargeMessage(boolean compress)
+ {
+ this.compressLargeMessage = compress;
+ }
+
+ private void checkWrite()
+ {
+ if (readOnly)
+ {
+ throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
+ }
+ }
+
+ public void setNodeID(String nodeID)
+ {
+ this.nodeID = nodeID;
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public void setClusterConnection(boolean clusterConnection)
+ {
+ this.clusterConnection = clusterConnection;
+ }
+
+ public boolean isClusterConnection()
+ {
+ return clusterConnection;
+ }
+
+ public TransportConfiguration getClusterTransportConfiguration()
+ {
+ return clusterTransportConfiguration;
+ }
+
+ public void setClusterTransportConfiguration(TransportConfiguration tc)
+ {
+ this.clusterTransportConfiguration = tc;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public void setBackup(boolean backup)
+ {
+ this.backup = backup;
+ }
+
+ protected boolean isReceivedTopology()
+ {
+ return this.receivedTopology;
+ }
+
+ protected boolean doFinalizeCheck()
+ {
+ return this.finalizeCheck;
+ }
+
+ protected ConnectionLoadBalancingPolicy getLoadBalancingPolicy()
+ {
+ return this.loadBalancingPolicy;
+ }
+
+ protected ExecutorService getThreadPool()
+ {
+ return threadPool;
+ }
+
+ protected ScheduledExecutorService getScheduledThreadPool()
+ {
+ return scheduledThreadPool;
+ }
+
+ protected List<Interceptor> getInterceptors()
+ {
+ return this.interceptors;
+ }
+
+ protected TransportConfiguration[] getInitialConnectors()
+ {
+ return this.initialConnectors;
+ }
+
+ protected void setInitialConnectors(TransportConfiguration[] initialConnectors)
+ {
+ this.initialConnectors = initialConnectors;
+ }
+
+ protected boolean isReadOnly()
+ {
+ return this.readOnly;
+ }
+
+ protected void setReadOnly(boolean readOnly)
+ {
+ this.readOnly = readOnly;
+ }
+
+ protected boolean isClosed()
+ {
+ return this.closed;
+ }
+
+ @Override
+ protected void finalize() throws Throwable
+ {
+ if (finalizeCheck)
+ {
+ close();
+ }
+
+ super.finalize();
+ }
+
+ public void close()
+ {
+ if (closed)
+ {
+ return;
+ }
+
+
+ for (ClientSessionFactory factory : factories)
+ {
+ factory.close();
+ }
+
+ factories.clear();
+
+ if (!useGlobalPools)
+ {
+ if (threadPool != null)
+ {
+ threadPool.shutdown();
+
+ try
+ {
+ if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for pool to terminate");
+ }
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+
+ if (scheduledThreadPool != null)
+ {
+ scheduledThreadPool.shutdown();
+
+ try
+ {
+ if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
+ {
+ log.warn("Timed out waiting for scheduled pool to terminate");
+ }
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
+ readOnly = false;
+
+ closed = true;
+ }
+
+ public synchronized void notifyNodeDown(final String nodeID)
+ {
+ boolean removed = false;
+
+ if (!ha)
+ {
+ return;
+ }
+
+ removed = topology.removeMember(nodeID);
+
+ if (!topology.isEmpty())
+ {
+ updateArraysAndPairs();
+
+ if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
+ {
+ receivedTopology = false;
+ }
+ }
+ else
+ {
+ topologyArray = null;
+
+ receivedTopology = false;
+ }
+
+ if (removed)
+ {
+ for (ClusterTopologyListener listener : topologyListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
+ }
+ }
+
+ public synchronized void notifyNodeUp(final String nodeID,
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last)
+ {
+ if (!ha)
+ {
+ return;
+ }
+
+ topology.addMember(nodeID, new TopologyMember(connectorPair));
+
+ TopologyMember actMember = topology.getMember(nodeID);
+
+ if (actMember.getConnector().a != null && actMember.getConnector().b != null)
+ {
+ for (ClientSessionFactory factory : factories)
+ {
+ ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
+ actMember.getConnector().b);
+ }
+ }
+
+ if (connectorPair.a != null)
+ {
+ updateArraysAndPairs();
+ }
+
+ if (last)
+ {
+ receivedTopology = true;
+ }
+
+ for (ClusterTopologyListener listener : topologyListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last);
+ }
+
+ // Notify if waiting on getting topology
+ notify();
+ }
+
+ private void updateArraysAndPairs()
+ {
+ topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
+ topology.members());
+
+ int count = 0;
+ for (TopologyMember pair : topology.getMembers())
+ {
+ topologyArray[count++] = pair.getConnector();
+ }
+ }
+
+ public synchronized void factoryClosed(final ClientSessionFactory factory)
+ {
+ factories.remove(factory);
+
+ if (factories.isEmpty())
+ {
+ // Go back to using the broadcast or static list
+
+ receivedTopology = false;
+
+ topology = null;
+
+ }
+ }
+
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topologyListeners.add(listener);
+ if(topology.members() > 0)
+ {
+ System.out.println("ServerLocatorImpl.addClusterTopologyListener");
+ }
+ }
+
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ topologyListeners.remove(listener);
+ }
+
+ public synchronized void addFactory(ClientSessionFactoryInternal factory)
+ {
+ if (factory != null)
+ {
+ TransportConfiguration backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
+ factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
+ factories.add(factory);
+ }
+ }
+
+ public static void shutdown()
+ {
+ if (globalScheduledThreadPool != null)
+ {
+ globalScheduledThreadPool.shutdown();
+ globalScheduledThreadPool = null;
+ }
+ if (globalThreadPool != null)
+ {
+ globalThreadPool.shutdown();
+ globalThreadPool = null;
+ }
+ }
+
+ public boolean isStaticDirectConnection(TransportConfiguration conf)
+ {
+ return false;
+ }
+}
Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi(a)gmail.com\">Tomohisa Igarashi</a>"
+ *
+ */
+public class DiscoveryGroupConstants
+{
+ // for static discovery
+ public static final String STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME = "static-connector-ref-list";
+ public static final String STATIC_CONNECTORS_LIST_NAME = "static-connector-list";
+
+ // for simple UDP discovery
+ public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
+ public static final String GROUP_ADDRESS_NAME = "group-address";
+ public static final String GROUP_PORT_NAME = "group-port";
+ public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+ public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/DiscoveryGroupConstants.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Deleted: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -1,1456 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.client.impl;
-
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
-import org.hornetq.core.cluster.DiscoveryEntry;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.cluster.DiscoveryListener;
-import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUIDGenerator;
-
-/**
- * A ServerLocatorImpl
- *
- * @author Tim Fox
- */
-public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
-{
- private static final long serialVersionUID = -1615857864410205260L;
-
- private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
-
- private final boolean ha;
-
- private boolean finalizeCheck = true;
-
- private boolean clusterConnection;
-
- private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
-
- private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
-
- private TransportConfiguration[] initialConnectors;
-
- private DiscoveryGroupConfiguration discoveryGroupConfiguration;
-
- private StaticConnector staticConnector = new StaticConnector();
-
- private Topology topology = new Topology();
-
- private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
-
- private boolean receivedTopology;
-
- private boolean compressLargeMessage;
-
- private ExecutorService threadPool;
-
- private ScheduledExecutorService scheduledThreadPool;
-
- private DiscoveryGroup discoveryGroup;
-
- private ConnectionLoadBalancingPolicy loadBalancingPolicy;
-
- private boolean readOnly;
-
- // Settable attributes:
-
- private boolean cacheLargeMessagesClient;
-
- private long clientFailureCheckPeriod;
-
- private long connectionTTL;
-
- private long callTimeout;
-
- private int minLargeMessageSize;
-
- private int consumerWindowSize;
-
- private int consumerMaxRate;
-
- private int confirmationWindowSize;
-
- private int producerWindowSize;
-
- private int producerMaxRate;
-
- private boolean blockOnAcknowledge;
-
- private boolean blockOnDurableSend;
-
- private boolean blockOnNonDurableSend;
-
- private boolean autoGroup;
-
- private boolean preAcknowledge;
-
- private String connectionLoadBalancingPolicyClassName;
-
- private int ackBatchSize;
-
- private boolean useGlobalPools;
-
- private int scheduledThreadPoolMaxSize;
-
- private int threadPoolMaxSize;
-
- private long retryInterval;
-
- private double retryIntervalMultiplier;
-
- private long maxRetryInterval;
-
- private int reconnectAttempts;
-
- private int initialConnectAttempts;
-
- private boolean failoverOnInitialConnection;
-
- private int initialMessagePacketSize;
-
- private volatile boolean closed;
-
- private volatile boolean closing;
-
- private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
-
- private static ExecutorService globalThreadPool;
-
- private static ScheduledExecutorService globalScheduledThreadPool;
-
- private String groupID;
-
- private String nodeID;
-
- private TransportConfiguration clusterTransportConfiguration;
-
- private boolean backup;
-
- private final Exception e = new Exception();
-
- // To be called when there are ServerLocator being finalized.
- // To be used on test assertions
- public static Runnable finalizeCallback = null;
-
- private static synchronized ExecutorService getGlobalThreadPool()
- {
- if (globalThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-threads", true, getThisClassLoader());
-
- globalThreadPool = Executors.newCachedThreadPool(factory);
- }
-
- return globalThreadPool;
- }
-
- public static synchronized ScheduledExecutorService getGlobalScheduledThreadPool()
- {
- if (globalScheduledThreadPool == null)
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
-
- globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
-
- factory);
- }
-
- return globalScheduledThreadPool;
- }
-
- private void setThreadPools()
- {
- if (useGlobalPools)
- {
- threadPool = getGlobalThreadPool();
-
- scheduledThreadPool = getGlobalScheduledThreadPool();
- }
- else
- {
- ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- if (threadPoolMaxSize == -1)
- {
- threadPool = Executors.newCachedThreadPool(factory);
- }
- else
- {
- threadPool = Executors.newFixedThreadPool(threadPoolMaxSize, factory);
- }
-
- factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
-
- scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
- }
- }
-
- private static ClassLoader getThisClassLoader()
- {
- return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
- }
-
- private void instantiateLoadBalancingPolicy()
- {
- if (connectionLoadBalancingPolicyClassName == null)
- {
- throw new IllegalStateException("Please specify a load balancing policy class name on the session factory");
- }
-
- AccessController.doPrivileged(new PrivilegedAction<Object>()
- {
- public Object run()
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
- return null;
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
- }
- }
- });
- }
-
- private synchronized void initialise() throws Exception
- {
- if (!readOnly)
- {
- setThreadPools();
-
- instantiateLoadBalancingPolicy();
-
- if (discoveryGroupConfiguration != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryGroupConfiguration.getGroupAddress());
-
- InetAddress lbAddress;
-
- if (discoveryGroupConfiguration.getLocalBindAddress() != null)
- {
- lbAddress = InetAddress.getByName(discoveryGroupConfiguration.getLocalBindAddress());
- }
- else
- {
- lbAddress = null;
- }
-
- discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryGroupConfiguration.getName(),
- lbAddress,
- groupAddress,
- discoveryGroupConfiguration.getGroupPort(),
- discoveryGroupConfiguration.getRefreshTimeout());
-
- discoveryGroup.registerListener(this);
-
- discoveryGroup.start();
- }
-
- readOnly = true;
- }
- }
-
- private ServerLocatorImpl(final boolean useHA,
- final DiscoveryGroupConfiguration discoveryGroupConfiguration,
- final TransportConfiguration[] transportConfigs)
- {
- e.fillInStackTrace();
- this.ha = useHA;
-
- this.discoveryGroupConfiguration = discoveryGroupConfiguration;
-
- this.initialConnectors = transportConfigs;
-
- this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
- clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-
- connectionTTL = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- callTimeout = HornetQClient.DEFAULT_CALL_TIMEOUT;
-
- minLargeMessageSize = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- consumerWindowSize = HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
-
- consumerMaxRate = HornetQClient.DEFAULT_CONSUMER_MAX_RATE;
-
- confirmationWindowSize = HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-
- producerWindowSize = HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE;
-
- producerMaxRate = HornetQClient.DEFAULT_PRODUCER_MAX_RATE;
-
- blockOnAcknowledge = HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-
- blockOnDurableSend = HornetQClient.DEFAULT_BLOCK_ON_DURABLE_SEND;
-
- blockOnNonDurableSend = HornetQClient.DEFAULT_BLOCK_ON_NON_DURABLE_SEND;
-
- autoGroup = HornetQClient.DEFAULT_AUTO_GROUP;
-
- preAcknowledge = HornetQClient.DEFAULT_PRE_ACKNOWLEDGE;
-
- ackBatchSize = HornetQClient.DEFAULT_ACK_BATCH_SIZE;
-
- connectionLoadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-
- useGlobalPools = HornetQClient.DEFAULT_USE_GLOBAL_POOLS;
-
- scheduledThreadPoolMaxSize = HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-
- threadPoolMaxSize = HornetQClient.DEFAULT_THREAD_POOL_MAX_SIZE;
-
- retryInterval = HornetQClient.DEFAULT_RETRY_INTERVAL;
-
- retryIntervalMultiplier = HornetQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
-
- maxRetryInterval = HornetQClient.DEFAULT_MAX_RETRY_INTERVAL;
-
- reconnectAttempts = HornetQClient.DEFAULT_RECONNECT_ATTEMPTS;
-
- initialConnectAttempts = HornetQClient.INITIAL_CONNECT_ATTEMPTS;
-
- failoverOnInitialConnection = HornetQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
-
- cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
- compressLargeMessage = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
-
- clusterConnection = false;
- }
-
- /**
- * Create a ServerLocatorImpl using UDP discovery to lookup cluster
- *
- * @param discoveryAddress
- * @param discoveryPort
- */
- public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
- {
- this(useHA, groupConfiguration, null);
- }
-
- /**
- * Create a ServerLocatorImpl using a static list of live servers
- *
- * @param transportConfigs
- */
- public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
- {
- this(useHA, null, transportConfigs);
- }
-
- private TransportConfiguration selectConnector()
- {
- if (receivedTopology)
- {
- int pos = loadBalancingPolicy.select(topologyArray.length);
-
- Pair<TransportConfiguration, TransportConfiguration> pair = topologyArray[pos];
-
- return pair.a;
- }
- else
- {
- // Get from initialconnectors
-
- int pos = loadBalancingPolicy.select(initialConnectors.length);
-
- return initialConnectors[pos];
- }
- }
-
- public void start(Executor executor) throws Exception
- {
- initialise();
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- connect();
- }
- catch (Exception e)
- {
- if (!closing)
- {
- log.warn("did not connect the cluster connection to other nodes", e);
- }
- }
- }
- });
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#disableFinalizeCheck()
- */
- public void disableFinalizeCheck()
- {
- finalizeCheck = false;
- }
-
- public ClientSessionFactory connect() throws Exception
- {
- ClientSessionFactoryInternal sf;
- // static list of initial connectors
- if (initialConnectors != null && discoveryGroup == null)
- {
- sf = (ClientSessionFactoryInternal)staticConnector.connect();
- }
- // wait for discovery group to get the list of initial connectors
- else
- {
- sf = (ClientSessionFactoryInternal)createSessionFactory();
- }
- addFactory(sf);
- return sf;
- }
-
- public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
- transportConfiguration,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
-
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
-
- addFactory(factory);
-
- return factory;
- }
-
- public ClientSessionFactory createSessionFactory() throws Exception
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- if (initialConnectors == null && discoveryGroup != null)
- {
- // Wait for an initial broadcast to give us at least one node in the cluster
- long timeout = clusterConnection ? 0 : discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
- boolean ok = discoveryGroup.waitForBroadcast(timeout);
-
- if (!ok)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from cluster");
- }
- }
-
- ClientSessionFactoryInternal factory = null;
-
- synchronized (this)
- {
- boolean retry;
- int attempts = 0;
- do
- {
- retry = false;
-
- TransportConfiguration tc = selectConnector();
-
- // try each factory in the list until we find one which works
-
- try
- {
- factory = new ClientSessionFactoryImpl(this,
- tc,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
- }
- catch (HornetQException e)
- {
- factory.close();
- factory = null;
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- attempts++;
-
- if (topologyArray != null && attempts == topologyArray.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- if (topologyArray == null && initialConnectors != null && attempts == initialConnectors.length)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
- }
- retry = true;
- }
- else
- {
- throw e;
- }
- }
- }
- while (retry);
-
- if (ha)
- {
- long toWait = 30000;
- long start = System.currentTimeMillis();
- while (!receivedTopology && toWait > 0)
- {
- // Now wait for the topology
-
- try
- {
- wait(toWait);
- }
- catch (InterruptedException ignore)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- if (toWait <= 0)
- {
- throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster topology");
- }
- }
-
- addFactory(factory);
-
- return factory;
- }
- }
-
- public synchronized boolean isHA()
- {
- return ha;
- }
-
- public synchronized boolean isCacheLargeMessagesClient()
- {
- return cacheLargeMessagesClient;
- }
-
- public synchronized void setCacheLargeMessagesClient(final boolean cached)
- {
- cacheLargeMessagesClient = cached;
- }
-
- public synchronized long getClientFailureCheckPeriod()
- {
- return clientFailureCheckPeriod;
- }
-
- public synchronized void setClientFailureCheckPeriod(final long clientFailureCheckPeriod)
- {
- checkWrite();
- this.clientFailureCheckPeriod = clientFailureCheckPeriod;
- }
-
- public synchronized long getConnectionTTL()
- {
- return connectionTTL;
- }
-
- public synchronized void setConnectionTTL(final long connectionTTL)
- {
- checkWrite();
- this.connectionTTL = connectionTTL;
- }
-
- public synchronized long getCallTimeout()
- {
- return callTimeout;
- }
-
- public synchronized void setCallTimeout(final long callTimeout)
- {
- checkWrite();
- this.callTimeout = callTimeout;
- }
-
- public synchronized int getMinLargeMessageSize()
- {
- return minLargeMessageSize;
- }
-
- public synchronized void setMinLargeMessageSize(final int minLargeMessageSize)
- {
- checkWrite();
- this.minLargeMessageSize = minLargeMessageSize;
- }
-
- public synchronized int getConsumerWindowSize()
- {
- return consumerWindowSize;
- }
-
- public synchronized void setConsumerWindowSize(final int consumerWindowSize)
- {
- checkWrite();
- this.consumerWindowSize = consumerWindowSize;
- }
-
- public synchronized int getConsumerMaxRate()
- {
- return consumerMaxRate;
- }
-
- public synchronized void setConsumerMaxRate(final int consumerMaxRate)
- {
- checkWrite();
- this.consumerMaxRate = consumerMaxRate;
- }
-
- public synchronized int getConfirmationWindowSize()
- {
- return confirmationWindowSize;
- }
-
- public synchronized void setConfirmationWindowSize(final int confirmationWindowSize)
- {
- checkWrite();
- this.confirmationWindowSize = confirmationWindowSize;
- }
-
- public synchronized int getProducerWindowSize()
- {
- return producerWindowSize;
- }
-
- public synchronized void setProducerWindowSize(final int producerWindowSize)
- {
- checkWrite();
- this.producerWindowSize = producerWindowSize;
- }
-
- public synchronized int getProducerMaxRate()
- {
- return producerMaxRate;
- }
-
- public synchronized void setProducerMaxRate(final int producerMaxRate)
- {
- checkWrite();
- this.producerMaxRate = producerMaxRate;
- }
-
- public synchronized boolean isBlockOnAcknowledge()
- {
- return blockOnAcknowledge;
- }
-
- public synchronized void setBlockOnAcknowledge(final boolean blockOnAcknowledge)
- {
- checkWrite();
- this.blockOnAcknowledge = blockOnAcknowledge;
- }
-
- public synchronized boolean isBlockOnDurableSend()
- {
- return blockOnDurableSend;
- }
-
- public synchronized void setBlockOnDurableSend(final boolean blockOnDurableSend)
- {
- checkWrite();
- this.blockOnDurableSend = blockOnDurableSend;
- }
-
- public synchronized boolean isBlockOnNonDurableSend()
- {
- return blockOnNonDurableSend;
- }
-
- public synchronized void setBlockOnNonDurableSend(final boolean blockOnNonDurableSend)
- {
- checkWrite();
- this.blockOnNonDurableSend = blockOnNonDurableSend;
- }
-
- public synchronized boolean isAutoGroup()
- {
- return autoGroup;
- }
-
- public synchronized void setAutoGroup(final boolean autoGroup)
- {
- checkWrite();
- this.autoGroup = autoGroup;
- }
-
- public synchronized boolean isPreAcknowledge()
- {
- return preAcknowledge;
- }
-
- public synchronized void setPreAcknowledge(final boolean preAcknowledge)
- {
- checkWrite();
- this.preAcknowledge = preAcknowledge;
- }
-
- public synchronized int getAckBatchSize()
- {
- return ackBatchSize;
- }
-
- public synchronized void setAckBatchSize(final int ackBatchSize)
- {
- checkWrite();
- this.ackBatchSize = ackBatchSize;
- }
-
- public synchronized boolean isUseGlobalPools()
- {
- return useGlobalPools;
- }
-
- public synchronized void setUseGlobalPools(final boolean useGlobalPools)
- {
- checkWrite();
- this.useGlobalPools = useGlobalPools;
- }
-
- public synchronized int getScheduledThreadPoolMaxSize()
- {
- return scheduledThreadPoolMaxSize;
- }
-
- public synchronized void setScheduledThreadPoolMaxSize(final int scheduledThreadPoolMaxSize)
- {
- checkWrite();
- this.scheduledThreadPoolMaxSize = scheduledThreadPoolMaxSize;
- }
-
- public synchronized int getThreadPoolMaxSize()
- {
- return threadPoolMaxSize;
- }
-
- public synchronized void setThreadPoolMaxSize(final int threadPoolMaxSize)
- {
- checkWrite();
- this.threadPoolMaxSize = threadPoolMaxSize;
- }
-
- public synchronized long getRetryInterval()
- {
- return retryInterval;
- }
-
- public synchronized void setRetryInterval(final long retryInterval)
- {
- checkWrite();
- this.retryInterval = retryInterval;
- }
-
- public synchronized long getMaxRetryInterval()
- {
- return maxRetryInterval;
- }
-
- public synchronized void setMaxRetryInterval(final long retryInterval)
- {
- checkWrite();
- maxRetryInterval = retryInterval;
- }
-
- public synchronized double getRetryIntervalMultiplier()
- {
- return retryIntervalMultiplier;
- }
-
- public synchronized void setRetryIntervalMultiplier(final double retryIntervalMultiplier)
- {
- checkWrite();
- this.retryIntervalMultiplier = retryIntervalMultiplier;
- }
-
- public synchronized int getReconnectAttempts()
- {
- return reconnectAttempts;
- }
-
- public synchronized void setReconnectAttempts(final int reconnectAttempts)
- {
- checkWrite();
- this.reconnectAttempts = reconnectAttempts;
- }
-
- public void setInitialConnectAttempts(int initialConnectAttempts)
- {
- checkWrite();
- this.initialConnectAttempts = initialConnectAttempts;
- }
-
- public int getInitialConnectAttempts()
- {
- return initialConnectAttempts;
- }
-
- public synchronized boolean isFailoverOnInitialConnection()
- {
- return this.failoverOnInitialConnection;
- }
-
- public synchronized void setFailoverOnInitialConnection(final boolean failover)
- {
- checkWrite();
- this.failoverOnInitialConnection = failover;
- }
-
- public synchronized String getConnectionLoadBalancingPolicyClassName()
- {
- return connectionLoadBalancingPolicyClassName;
- }
-
- public synchronized void setConnectionLoadBalancingPolicyClassName(final String loadBalancingPolicyClassName)
- {
- checkWrite();
- connectionLoadBalancingPolicyClassName = loadBalancingPolicyClassName;
- }
-
- public TransportConfiguration[] getStaticTransportConfigurations()
- {
- return this.initialConnectors;
- }
-
- public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()
- {
- return discoveryGroupConfiguration;
- }
-
- public void addInterceptor(final Interceptor interceptor)
- {
- interceptors.add(interceptor);
- }
-
- public boolean removeInterceptor(final Interceptor interceptor)
- {
- return interceptors.remove(interceptor);
- }
-
- public synchronized int getInitialMessagePacketSize()
- {
- return initialMessagePacketSize;
- }
-
- public synchronized void setInitialMessagePacketSize(final int size)
- {
- checkWrite();
- initialMessagePacketSize = size;
- }
-
- public void setGroupID(final String groupID)
- {
- checkWrite();
- this.groupID = groupID;
- }
-
- public String getGroupID()
- {
- return groupID;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#isCompressLargeMessage()
- */
- public boolean isCompressLargeMessage()
- {
- return compressLargeMessage;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.ServerLocator#setCompressLargeMessage(boolean)
- */
- public void setCompressLargeMessage(boolean compress)
- {
- this.compressLargeMessage = compress;
- }
-
- private void checkWrite()
- {
- if (readOnly)
- {
- throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
- }
- }
-
- public void setNodeID(String nodeID)
- {
- this.nodeID = nodeID;
- }
-
- public String getNodeID()
- {
- return nodeID;
- }
-
- public void setClusterConnection(boolean clusterConnection)
- {
- this.clusterConnection = clusterConnection;
- }
-
- public boolean isClusterConnection()
- {
- return clusterConnection;
- }
-
- public TransportConfiguration getClusterTransportConfiguration()
- {
- return clusterTransportConfiguration;
- }
-
- public void setClusterTransportConfiguration(TransportConfiguration tc)
- {
- this.clusterTransportConfiguration = tc;
- }
-
- public boolean isBackup()
- {
- return backup;
- }
-
- public void setBackup(boolean backup)
- {
- this.backup = backup;
- }
-
- @Override
- protected void finalize() throws Throwable
- {
- if (finalizeCheck)
- {
- close();
- }
-
- super.finalize();
- }
-
- public void close()
- {
- if (closed)
- {
- return;
- }
-
- closing = true;
-
- if (discoveryGroup != null)
- {
- try
- {
- discoveryGroup.stop();
- }
- catch (Exception e)
- {
- log.error("Failed to stop discovery group", e);
- }
- }
- else
- {
- staticConnector.disconnect();
- }
-
- for (ClientSessionFactory factory : factories)
- {
- factory.close();
- }
-
- factories.clear();
-
- if (!useGlobalPools)
- {
- if (threadPool != null)
- {
- threadPool.shutdown();
-
- try
- {
- if (!threadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
-
- if (scheduledThreadPool != null)
- {
- scheduledThreadPool.shutdown();
-
- try
- {
- if (!scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS))
- {
- log.warn("Timed out waiting for scheduled pool to terminate");
- }
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
- readOnly = false;
-
- closed = true;
- }
-
- public synchronized void notifyNodeDown(final String nodeID)
- {
- boolean removed = false;
-
- if (!ha)
- {
- return;
- }
-
- removed = topology.removeMember(nodeID);
-
- if (!topology.isEmpty())
- {
- updateArraysAndPairs();
-
- if (topology.nodes() == 1 && topology.getMember(this.nodeID) != null)
- {
- receivedTopology = false;
- }
- }
- else
- {
- topologyArray = null;
-
- receivedTopology = false;
- }
-
- if (removed)
- {
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
- }
-
- public synchronized void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
- {
- if (!ha)
- {
- return;
- }
-
- topology.addMember(nodeID, new TopologyMember(connectorPair));
-
- TopologyMember actMember = topology.getMember(nodeID);
-
- if (actMember.getConnector().a != null && actMember.getConnector().b != null)
- {
- for (ClientSessionFactory factory : factories)
- {
- ((ClientSessionFactoryInternal)factory).setBackupConnector(actMember.getConnector().a,
- actMember.getConnector().b);
- }
- }
-
- if (connectorPair.a != null)
- {
- updateArraysAndPairs();
- }
-
- if (last)
- {
- receivedTopology = true;
- }
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, connectorPair, last);
- }
-
- // Notify if waiting on getting topology
- notify();
- }
-
- private void updateArraysAndPairs()
- {
- topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
- topology.members());
-
- int count = 0;
- for (TopologyMember pair : topology.getMembers())
- {
- topologyArray[count++] = pair.getConnector();
- }
- }
-
- public synchronized void connectorsChanged()
- {
- List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
-
- this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
- newConnectors.size());
-
- int count = 0;
- for (DiscoveryEntry entry : newConnectors)
- {
- this.initialConnectors[count++] = entry.getConnector();
- }
-
- if (ha && clusterConnection && !receivedTopology && initialConnectors.length > 0)
- {
- // FIXME the node is alone in the cluster. We create a connection to the new node
- // to trigger the node notification to form the cluster.
- try
- {
- connect();
- }
- catch (Exception e)
- {
- e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- public synchronized void factoryClosed(final ClientSessionFactory factory)
- {
- factories.remove(factory);
-
- if (factories.isEmpty())
- {
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topology = null;
-
- }
- }
-
- public Topology getTopology()
- {
- return topology;
- }
-
- public void addClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topologyListeners.add(listener);
- if(topology.members() > 0)
- {
- System.out.println("ServerLocatorImpl.addClusterTopologyListener");
- }
- }
-
- public void removeClusterTopologyListener(final ClusterTopologyListener listener)
- {
- topologyListeners.remove(listener);
- }
-
- public synchronized void addFactory(ClientSessionFactoryInternal factory)
- {
- if (factory != null)
- {
- TransportConfiguration backup = topology.getBackupForConnector(factory.getConnectorConfiguration());
- factory.setBackupConnector(factory.getConnectorConfiguration(), backup);
- factories.add(factory);
- }
- }
-
- public static void shutdown()
- {
- if (globalScheduledThreadPool != null)
- {
- globalScheduledThreadPool.shutdown();
- globalScheduledThreadPool = null;
- }
- if (globalThreadPool != null)
- {
- globalThreadPool.shutdown();
- globalThreadPool = null;
- }
- }
-
- class StaticConnector implements Serializable
- {
- private List<Connector> connectors;
-
- public ClientSessionFactory connect() throws HornetQException
- {
- if (closed)
- {
- throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
- }
-
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
-
- ClientSessionFactory csf = null;
-
- createConnectors();
-
- try
- {
- List<Future<ClientSessionFactory>> futures = threadPool.invokeAll(connectors);
- for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
- {
- Future<ClientSessionFactory> future = futures.get(i);
- try
- {
- csf = future.get();
- if (csf != null)
- break;
- }
- catch (Exception e)
- {
- log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
- }
- }
- if (csf == null && !closed)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
- }
- }
- catch (InterruptedException e)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
- }
-
- if (csf == null && !closed)
- {
- throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
- }
- return csf;
- }
-
- private synchronized void createConnectors()
- {
- connectors = new ArrayList<Connector>();
- for (TransportConfiguration initialConnector : initialConnectors)
- {
- ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- connectors.add(new Connector(initialConnector, factory));
- }
- }
-
- public synchronized void disconnect()
- {
- if (connectors != null)
- {
- for (Connector connector : connectors)
- {
- connector.disconnect();
- }
- }
- }
-
- public void finalize() throws Throwable
- {
- if (!closed && finalizeCheck)
- {
- log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
- System.identityHashCode(this));
-
- log.warn("The ServerLocator you didn't close was created here:", e);
-
- if (ServerLocatorImpl.finalizeCallback != null)
- {
- ServerLocatorImpl.finalizeCallback.run();
- }
-
- close();
- }
-
- super.finalize();
- }
-
- class Connector implements Callable<ClientSessionFactory>
- {
- private TransportConfiguration initialConnector;
-
- private volatile ClientSessionFactoryInternal factory;
-
- private boolean isConnected = false;
-
- private boolean interrupted = false;
-
- private Exception e;
-
- public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
- {
- this.initialConnector = initialConnector;
- this.factory = factory;
- }
-
- public ClientSessionFactory call() throws HornetQException
- {
- try
- {
- factory.connect(reconnectAttempts, failoverOnInitialConnection);
- }
- catch (HornetQException e)
- {
- if (!interrupted)
- {
- this.e = e;
- throw e;
- }
- /*if(factory != null)
- {
- factory.close();
- factory = null;
- }*/
- return null;
- }
- isConnected = true;
- for (Connector connector : connectors)
- {
- if (!connector.isConnected())
- {
- connector.disconnect();
- }
- }
- return factory;
- }
-
- public boolean isConnected()
- {
- return isConnected;
- }
-
- public void disconnect()
- {
- interrupted = true;
-
- if (factory != null)
- {
- factory.causeExit();
- factory.close();
- factory = null;
- }
- }
- }
- }
-}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -57,4 +57,6 @@
void setBackup(boolean backup);
Topology getTopology();
+
+ boolean isStaticDirectConnection(TransportConfiguration con);
}
Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A SimpleUDPServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class SimpleUDPServerLocatorImpl extends AbstractServerLocator implements DiscoveryListener
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(SimpleUDPServerLocatorImpl.class);
+
+ private String discoveryGroupName;
+
+ private InetAddress localBindAddress;
+
+ private InetAddress groupAddress;
+
+ private int groupPort;
+
+ private long refreshTimeout;
+
+ private long initialWaitTimeout;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private volatile boolean closing;
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ private synchronized void initialise() throws Exception
+ {
+ if (!isReadOnly())
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+
+ Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+
+ String lbStr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
+
+ if (lbStr != null)
+ {
+ this.localBindAddress = InetAddress.getByName(lbStr);
+ }
+ else
+ {
+ this.localBindAddress = null;
+ }
+
+ this.groupAddress = InetAddress.getByName(ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params));
+ this.groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
+ this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
+ this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
+
+ discoveryGroup = new DiscoveryGroupImpl(getNodeID(),
+ this.discoveryGroupName,
+ this.localBindAddress,
+ this.groupAddress,
+ this.groupPort,
+ this.refreshTimeout);
+
+ discoveryGroup.registerListener(this);
+
+ discoveryGroup.start();
+
+ setReadOnly(true);
+ }
+ }
+
+ public SimpleUDPServerLocatorImpl(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ super(useHA, discoveryGroupConfiguration);
+ }
+
+ public void start(Executor executor) throws Exception
+ {
+ initialise();
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ if (!closing)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
+ }
+ }
+ });
+ }
+
+ public ClientSessionFactory connect() throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+
+ // wait for discovery group to get the list of initial connectors
+ sf = (ClientSessionFactoryInternal)createSessionFactory();
+
+ addFactory(sf);
+ return sf;
+ }
+
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+ transportConfiguration,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+
+ factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+ addFactory(factory);
+
+ return factory;
+ }
+
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ if (getInitialConnectors() == null)
+ {
+ // Wait for an initial broadcast to give us at least one node in the cluster
+ long timeout = isClusterConnection() ? 0 : this.initialWaitTimeout;
+ boolean ok = discoveryGroup.waitForBroadcast(timeout);
+
+ if (!ok)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive initial broadcast from cluster");
+ }
+ }
+
+ ClientSessionFactoryInternal factory = null;
+
+ synchronized (this)
+ {
+ boolean retry;
+ int attempts = 0;
+ do
+ {
+ retry = false;
+
+ TransportConfiguration tc = selectConnector();
+
+ // try each factory in the list until we find one which works
+
+ try
+ {
+ factory = new ClientSessionFactoryImpl(this,
+ tc,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+ factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+ }
+ catch (HornetQException e)
+ {
+ factory.close();
+ factory = null;
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ attempts++;
+
+ if (attempts == getConnectorLength())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
+ }
+ retry = true;
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ while (retry);
+
+ if (isHA())
+ {
+ long toWait = 30000;
+ long start = System.currentTimeMillis();
+ while (!isReceivedTopology() && toWait > 0)
+ {
+ // Now wait for the topology
+
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ if (toWait <= 0)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive cluster topology");
+ }
+ }
+
+ addFactory(factory);
+
+ return factory;
+ }
+ }
+
+ public void close()
+ {
+ if (isClosed())
+ {
+ return;
+ }
+
+ closing = true;
+
+ try
+ {
+ discoveryGroup.stop();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to stop discovery group", e);
+ }
+
+ super.close();
+ }
+
+ public synchronized void connectorsChanged()
+ {
+ List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+ TransportConfiguration[] initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+ newConnectors.size());
+ int count = 0;
+ for (DiscoveryEntry entry : newConnectors)
+ {
+ initialConnectors[count++] = entry.getConnector();
+ }
+
+ if (isHA() && isClusterConnection() && !isReceivedTopology() && initialConnectors.length > 0)
+ {
+ // FIXME the node is alone in the cluster. We create a connection to the new node
+ // to trigger the node notification to form the cluster.
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ setInitialConnectors(initialConnectors);
+ }
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -0,0 +1,450 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.client.impl;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A SimpleUDPServerLocatorImpl
+ *
+ * @author Tim Fox
+ * @author <a href="tm.igarashi(a)gmail.com">Tomohisa Igarashi</a>
+ */
+public class StaticServerLocatorImpl extends AbstractServerLocator
+{
+ private static final long serialVersionUID = -1615857864410205260L;
+
+ private static final Logger log = Logger.getLogger(StaticServerLocatorImpl.class);
+
+ private StaticConnector staticConnector = new StaticConnector();
+
+ private volatile boolean closing;
+
+ private final Exception e = new Exception();
+
+ // To be called when there are ServerLocator being finalized.
+ // To be used on test assertions
+ public static Runnable finalizeCallback = null;
+
+ private synchronized void initialise() throws Exception
+ {
+ if (!isReadOnly())
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+ TransportConfiguration[] initialConnectors = (TransportConfiguration[])params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME);
+ setInitialConnectors(initialConnectors);
+
+ setReadOnly(true);
+ }
+ }
+
+ public StaticServerLocatorImpl(final boolean useHA,
+ final DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ super(useHA, discoveryGroupConfiguration);
+
+ e.fillInStackTrace();
+ }
+
+ public void start(Executor executor) throws Exception
+ {
+ initialise();
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ if (!closing)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
+ }
+ }
+ });
+ }
+
+ public ClientSessionFactory connect() throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+
+ sf = (ClientSessionFactoryInternal)staticConnector.connect();
+
+ addFactory(sf);
+ return sf;
+ }
+
+ public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+ transportConfiguration,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+
+ factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+ addFactory(factory);
+
+ return factory;
+ }
+
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactoryInternal factory = null;
+
+ synchronized (this)
+ {
+ boolean retry;
+ int attempts = 0;
+ do
+ {
+ retry = false;
+
+ TransportConfiguration tc = selectConnector();
+
+ // try each factory in the list until we find one which works
+
+ try
+ {
+ factory = new ClientSessionFactoryImpl(this,
+ tc,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+ factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+ }
+ catch (HornetQException e)
+ {
+ factory.close();
+ factory = null;
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ attempts++;
+
+ if (attempts == getConnectorLength())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
+ }
+ retry = true;
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ while (retry);
+
+ if (isHA())
+ {
+ long toWait = 30000;
+ long start = System.currentTimeMillis();
+ while (!isReceivedTopology() && toWait > 0)
+ {
+ // Now wait for the topology
+
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ if (toWait <= 0)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive cluster topology");
+ }
+ }
+
+ addFactory(factory);
+
+ return factory;
+ }
+ }
+
+ public void close()
+ {
+ if (isClosed())
+ {
+ return;
+ }
+
+ closing = true;
+
+ staticConnector.disconnect();
+
+ super.close();
+ }
+
+ public boolean isStaticDirectConnection(TransportConfiguration con)
+ {
+ for(TransportConfiguration connector : getInitialConnectors())
+ {
+ if(connector.equals(con))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ class StaticConnector implements Serializable
+ {
+ private List<Connector> connectors;
+
+ public ClientSessionFactory connect() throws HornetQException
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactory csf = null;
+
+ createConnectors();
+
+ try
+ {
+ List<Future<ClientSessionFactory>> futures = getThreadPool().invokeAll(connectors);
+ for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
+ {
+ Future<ClientSessionFactory> future = futures.get(i);
+ try
+ {
+ csf = future.get();
+ if (csf != null)
+ break;
+ }
+ catch (Exception e)
+ {
+ log.debug("unable to connect with static connector " + connectors.get(i).initialConnector);
+ }
+ }
+ if (csf == null && !isClosed())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors", e);
+ }
+
+ if (csf == null && !isClosed())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to connect to any static connectors");
+ }
+ return csf;
+ }
+
+ private synchronized void createConnectors()
+ {
+ connectors = new ArrayList<Connector>();
+ for (TransportConfiguration initialConnector : getInitialConnectors())
+ {
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(StaticServerLocatorImpl.this,
+ initialConnector,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+ connectors.add(new Connector(initialConnector, factory));
+ }
+ }
+
+ public synchronized void disconnect()
+ {
+ if (connectors != null)
+ {
+ for (Connector connector : connectors)
+ {
+ connector.disconnect();
+ }
+ }
+ }
+
+ public void finalize() throws Throwable
+ {
+ if (!isClosed() && doFinalizeCheck())
+ {
+ log.warn("I'm closing a core ServerLocator you left open. Please make sure you close all ServerLocators explicitly " + "before letting them go out of scope! " +
+ System.identityHashCode(this));
+
+ log.warn("The ServerLocator you didn't close was created here:", e);
+
+ if (StaticServerLocatorImpl.finalizeCallback != null)
+ {
+ StaticServerLocatorImpl.finalizeCallback.run();
+ }
+
+ close();
+ }
+
+ super.finalize();
+ }
+
+ class Connector implements Callable<ClientSessionFactory>
+ {
+ private TransportConfiguration initialConnector;
+
+ private volatile ClientSessionFactoryInternal factory;
+
+ private boolean isConnected = false;
+
+ private boolean interrupted = false;
+
+ private Exception e;
+
+ public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
+ {
+ this.initialConnector = initialConnector;
+ this.factory = factory;
+ }
+
+ public ClientSessionFactory call() throws HornetQException
+ {
+ try
+ {
+ factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+ }
+ catch (HornetQException e)
+ {
+ if (!interrupted)
+ {
+ this.e = e;
+ throw e;
+ }
+ /*if(factory != null)
+ {
+ factory.close();
+ factory = null;
+ }*/
+ return null;
+ }
+ isConnected = true;
+ for (Connector connector : connectors)
+ {
+ if (!connector.isConnected())
+ {
+ connector.disconnect();
+ }
+ }
+ return factory;
+ }
+
+ public boolean isConnected()
+ {
+ return isConnected;
+ }
+
+ public void disconnect()
+ {
+ interrupted = true;
+
+ if (factory != null)
+ {
+ factory.causeExit();
+ factory.close();
+ factory = null;
+ }
+ }
+ }
+ }
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/StaticServerLocatorImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/config/ClusterConnectionConfiguration.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -42,6 +42,8 @@
private final boolean forwardWhenNoConsumers;
private final List<String> staticConnectors;
+
+ private final List<String> allowableConnectors;
private final String discoveryGroupName;
@@ -49,7 +51,7 @@
private final int confirmationWindowSize;
- private final boolean allowDirectConnectionsOnly;
+ private final boolean allowableConnectionsOnly;
public ClusterConnectionConfiguration(final String name,
final String address,
@@ -60,42 +62,22 @@
final int maxHops,
final int confirmationWindowSize,
final List<String> staticConnectors,
- final boolean allowDirectConnectionsOnly)
+ final String discoveryGroupName,
+ final boolean allowableConnectionsOnly,
+ final List<String> allowableConnectorNames)
{
this.name = name;
this.address = address;
this.connectorName = connectorName;
this.retryInterval = retryInterval;
- this.staticConnectors = staticConnectors;
this.duplicateDetection = duplicateDetection;
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
- discoveryGroupName = null;
- this.maxHops = maxHops;
- this.confirmationWindowSize = confirmationWindowSize;
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
- }
-
- public ClusterConnectionConfiguration(final String name,
- final String address,
- final String connectorName,
- final long retryInterval,
- final boolean duplicateDetection,
- final boolean forwardWhenNoConsumers,
- final int maxHops,
- final int confirmationWindowSize,
- final String discoveryGroupName)
- {
- this.name = name;
- this.address = address;
- this.connectorName = connectorName;
- this.retryInterval = retryInterval;
- this.duplicateDetection = duplicateDetection;
- this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = discoveryGroupName;
- this.staticConnectors = null;
this.maxHops = maxHops;
this.confirmationWindowSize = confirmationWindowSize;
- allowDirectConnectionsOnly = false;
+ this.staticConnectors = staticConnectors;
+ this.allowableConnectors = allowableConnectorNames;
+ this.allowableConnectionsOnly = allowableConnectionsOnly;
}
public String getName()
@@ -137,6 +119,11 @@
{
return staticConnectors;
}
+
+ public List<String> getAllowableConnectors()
+ {
+ return allowableConnectors;
+ }
public String getDiscoveryGroupName()
{
@@ -148,8 +135,8 @@
return retryInterval;
}
- public boolean isAllowDirectConnectionsOnly()
+ public boolean isAllowableConnectionsOnly()
{
- return allowDirectConnectionsOnly;
+ return allowableConnectionsOnly;
}
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -22,12 +22,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.StringTokenizer;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
import org.hornetq.core.config.*;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.config.impl.FileConfiguration;
@@ -939,29 +941,42 @@
{
String name = e.getAttribute("name");
- String localBindAddress = XMLConfigurationUtil.getString(e, "local-bind-address", null, Validators.NO_CHECK);
+ String clazz = XMLConfigurationUtil.getString(e, "server-locator-class", null, Validators.NOT_NULL_OR_EMPTY);
- String groupAddress = XMLConfigurationUtil.getString(e, "group-address", null, Validators.NOT_NULL_OR_EMPTY);
+ Map<String, Object> params = new HashMap<String, Object>();
- int groupPort = XMLConfigurationUtil.getInteger(e, "group-port", -1, Validators.MINUS_ONE_OR_GT_ZERO);
+ NodeList paramsNodes = e.getElementsByTagName("param");
- long discoveryInitialWaitTimeout = XMLConfigurationUtil.getLong(e,
- "initial-wait-timeout",
- HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT,
- Validators.GT_ZERO);
+ for (int i = 0; i < paramsNodes.getLength(); i++)
+ {
+ Node paramNode = paramsNodes.item(i);
- long refreshTimeout = XMLConfigurationUtil.getLong(e,
- "refresh-timeout",
- ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
- Validators.GT_ZERO);
+ NamedNodeMap attributes = paramNode.getAttributes();
- DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(name,
- localBindAddress,
- groupAddress,
- groupPort,
- refreshTimeout,
- discoveryInitialWaitTimeout);
+ Node nkey = attributes.getNamedItem("key");
+ String key = nkey.getTextContent();
+
+ Node nValue = attributes.getNamedItem("value");
+
+ params.put(key, nValue.getTextContent());
+ }
+
+ // discovery-group configuration contains static connector list
+ String connectorList = (String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
+ if(connectorList != null)
+ {
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ StringTokenizer token = new StringTokenizer(connectorList, ",", false);
+ while(token.hasMoreElements())
+ {
+ connectors.add(mainConfig.getConnectorConfigurations().get(token.nextElement()));
+ }
+ params.put(DiscoveryGroupConstants.STATIC_CONNECTORS_LIST_NAME, connectors.toArray(new TransportConfiguration[0]));
+ }
+
+ DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration(clazz, params, name);
+
if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name))
{
FileConfigurationParser.log.warn("There is already a discovery group with name " + name +
@@ -1008,8 +1023,6 @@
String discoveryGroupName = null;
- List<String> staticConnectorNames = new ArrayList<String>();
-
boolean allowDirectConnectionsOnly = false;
NodeList children = e.getChildNodes();
@@ -1021,46 +1034,58 @@
if (child.getNodeName().equals("discovery-group-ref"))
{
discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
- }
- else if (child.getNodeName().equals("static-connectors"))
- {
+
Node attr = child.getAttributes().getNamedItem("allow-direct-connections-only");
if (attr != null)
{
allowDirectConnectionsOnly = "true".equalsIgnoreCase(attr.getNodeValue()) || allowDirectConnectionsOnly;
}
- getStaticConnectors(staticConnectorNames, child);
}
}
- ClusterConnectionConfiguration config;
-
- if (discoveryGroupName == null)
+ List<String> staticConnectors = new ArrayList<String>();
+ DiscoveryGroupConfiguration discovery = mainConfig.getDiscoveryGroupConfigurations().get(discoveryGroupName);
+ Map<String,Object> params = discovery.getParams();
+ String connectorList = (String)params.get(DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME);
+ if(connectorList != null)
{
- config = new ClusterConnectionConfiguration(name,
- address,
- connectorName,
- retryInterval,
- duplicateDetection,
- forwardWhenNoConsumers,
- maxHops,
- confirmationWindowSize,
- staticConnectorNames,
- allowDirectConnectionsOnly);
+ StringTokenizer token = new StringTokenizer(connectorList, ",", false);
+ while(token.hasMoreElements())
+ {
+ staticConnectors.add(token.nextToken());
+ }
}
- else
+
+ List<String> allowableConnectionNames = null;
+ if(allowDirectConnectionsOnly)
{
- config = new ClusterConnectionConfiguration(name,
- address,
- connectorName,
- retryInterval,
- duplicateDetection,
- forwardWhenNoConsumers,
- maxHops,
- confirmationWindowSize,
- discoveryGroupName);
+ if(connectorList == null)
+ {
+ log.warn("allow-direct-connections-only was found, but "
+ + DiscoveryGroupConstants.STATIC_CONNECTORS_CONNECTOR_REF_LIST_NAME
+ + " was not found in discovery-group. ignore.");
+ }
+ else
+ {
+ allowableConnectionNames = staticConnectors;
+ }
}
+
+ ClusterConnectionConfiguration config;
+ config = new ClusterConnectionConfiguration(name,
+ address,
+ connectorName,
+ retryInterval,
+ duplicateDetection,
+ forwardWhenNoConsumers,
+ maxHops,
+ confirmationWindowSize,
+ staticConnectors,
+ discoveryGroupName,
+ allowDirectConnectionsOnly,
+ allowableConnectionNames);
+
mainConfig.getClusterConfigurations().add(config);
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -100,90 +100,16 @@
private final String clusterPassword;
- private final ClusterConnector clusterConnector;
+ private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
private ServerLocatorInternal serverLocator;
private final TransportConfiguration connector;
- private final boolean allowDirectConnectionsOnly;
+ private final boolean allowableConnectionsOnly;
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
- public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
- final TransportConfiguration connector,
- final SimpleString name,
- final SimpleString address,
- final long retryInterval,
- final boolean useDuplicateDetection,
- final boolean routeWhenNoConsumers,
- final int confirmationWindowSize,
- final ExecutorFactory executorFactory,
- final HornetQServer server,
- final PostOffice postOffice,
- final ManagementService managementService,
- final ScheduledExecutorService scheduledExecutor,
- final int maxHops,
- final UUID nodeUUID,
- final boolean backup,
- final String clusterUser,
- final String clusterPassword,
- final boolean allowDirectConnectionsOnly) throws Exception
- {
-
- if (nodeUUID == null)
- {
- throw new IllegalArgumentException("node id is null");
- }
-
- this.nodeUUID = nodeUUID;
-
- this.connector = connector;
-
- this.name = name;
-
- this.address = address;
-
- this.retryInterval = retryInterval;
-
- this.useDuplicateDetection = useDuplicateDetection;
-
- this.routeWhenNoConsumers = routeWhenNoConsumers;
-
- this.executorFactory = executorFactory;
-
- this.server = server;
-
- this.postOffice = postOffice;
-
- this.managementService = managementService;
-
- this.scheduledExecutor = scheduledExecutor;
-
- this.maxHops = maxHops;
-
- this.backup = backup;
-
- this.clusterUser = clusterUser;
-
- this.clusterPassword = clusterPassword;
-
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-
- clusterConnector = new StaticClusterConnector(tcConfigs);
-
- if (tcConfigs != null && tcConfigs.length > 0)
- {
- // a cluster connection will connect to other nodes only if they are directly connected
- // through a static list of connectors or broadcasting using UDP.
- if(allowDirectConnectionsOnly)
- {
- allowableConnections.addAll(Arrays.asList(tcConfigs));
- }
- }
-
- }
-
public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
@@ -202,7 +128,8 @@
final boolean backup,
final String clusterUser,
final String clusterPassword,
- final boolean allowDirectConnectionsOnly) throws Exception
+ final boolean allowableConnectionsOnly,
+ final TransportConfiguration[] allowableConnections) throws Exception
{
if (nodeUUID == null)
@@ -242,9 +169,9 @@
this.clusterPassword = clusterPassword;
- this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+ this.allowableConnectionsOnly = allowableConnectionsOnly;
- clusterConnector = new DiscoveryClusterConnector(dg);
+ this.discoveryGroupConfiguration = dg;
}
public synchronized void start() throws Exception
@@ -346,7 +273,7 @@
backup = false;
- serverLocator = clusterConnector.createServerLocator();
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(this.discoveryGroupConfiguration);
if (serverLocator != null)
@@ -433,7 +360,7 @@
server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+ if (allowableConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
return;
}
@@ -992,46 +919,4 @@
return out;
}
-
- interface ClusterConnector
- {
- ServerLocatorInternal createServerLocator();
- }
-
- private class StaticClusterConnector implements ClusterConnector
- {
- private final TransportConfiguration[] tcConfigs;
-
- public StaticClusterConnector(TransportConfiguration[] tcConfigs)
- {
- this.tcConfigs = tcConfigs;
- }
-
- public ServerLocatorInternal createServerLocator()
- {
- if(tcConfigs != null && tcConfigs.length > 0)
- {
- return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
- }
- else
- {
- return null;
- }
- }
- }
-
- private class DiscoveryClusterConnector implements ClusterConnector
- {
- private final DiscoveryGroupConfiguration dg;
-
- public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg)
- {
- this.dg = dg;
- }
-
- public ServerLocatorInternal createServerLocator()
- {
- return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
- }
- }
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -30,7 +30,7 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
@@ -621,46 +621,23 @@
ServerLocatorInternal serverLocator;
- if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
- if (discoveryGroupConfiguration == null)
- {
- ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
- "'. The bridge will not be deployed.");
+ if (discoveryGroupConfiguration == null)
+ {
+ ClusterManagerImpl.log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
+ "'. The bridge will not be deployed.");
- return;
- }
+ return;
+ }
- if (config.isHA())
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
- }
- else
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
- }
-
+ if (config.isHA())
+ {
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration);
}
else
{
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
- if (tcConfigs == null)
- {
- return;
- }
-
- if (config.isHA())
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
- }
- else
- {
- serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- }
-
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration);
}
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
@@ -739,61 +716,41 @@
ClusterConnectionImpl clusterConnection;
- if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
- .get(config.getDiscoveryGroupName());
+ DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
+ .get(config.getDiscoveryGroupName());
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ if (dg == null)
+ {
+ ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
"'. The cluster connection will not be deployed.");
- }
-
- clusterConnection = new ClusterConnectionImpl(dg,
- connector,
- new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getRetryInterval(),
- config.isDuplicateDetection(),
- config.isForwardWhenNoConsumers(),
- config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- config.getMaxHops(),
- nodeUUID,
- backup,
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- config.isAllowDirectConnectionsOnly());
}
- else
- {
- TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null? connectorNameListToArray(config.getStaticConnectors()):null;
- clusterConnection = new ClusterConnectionImpl(tcConfigs,
- connector,
- new SimpleString(config.getName()),
- new SimpleString(config.getAddress()),
- config.getRetryInterval(),
- config.isDuplicateDetection(),
- config.isForwardWhenNoConsumers(),
- config.getConfirmationWindowSize(),
- executorFactory,
- server,
- postOffice,
- managementService,
- scheduledExecutor,
- config.getMaxHops(),
- nodeUUID,
- backup,
- server.getConfiguration().getClusterUser(),
- server.getConfiguration().getClusterPassword(),
- config.isAllowDirectConnectionsOnly());
+ List<String> connectorNames = config.getAllowableConnectors();
+ TransportConfiguration[] allowableConnections = null;
+ if(connectorNames != null)
+ {
+ allowableConnections = connectorNameListToArray(connectorNames);
}
+ clusterConnection = new ClusterConnectionImpl(dg,
+ connector,
+ new SimpleString(config.getName()),
+ new SimpleString(config.getAddress()),
+ config.getRetryInterval(),
+ config.isDuplicateDetection(),
+ config.isForwardWhenNoConsumers(),
+ config.getConfirmationWindowSize(),
+ executorFactory,
+ server,
+ postOffice,
+ managementService,
+ scheduledExecutor,
+ config.getMaxHops(),
+ nodeUUID,
+ backup,
+ server.getConfiguration().getClusterUser(),
+ server.getConfiguration().getClusterPassword(),
+ config.isAllowableConnectionsOnly(),
+ allowableConnections);
managementService.registerCluster(clusterConnection, config);
@@ -809,31 +766,18 @@
private void announceBackup(final ClusterConnectionConfiguration config, final TransportConfiguration connector) throws Exception
{
- if (config.getStaticConnectors() != null)
- {
- TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- backupServerLocator.setReconnectAttempts(-1);
- }
- else if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
+ DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
- "'. The cluster connection will not be deployed.");
- }
-
- backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
- backupServerLocator.setReconnectAttempts(-1);
- }
- else
+ if (dg == null)
{
- return;
+ ClusterManagerImpl.log.warn("No discovery group with name '" + config.getDiscoveryGroupName() +
+ "'. The cluster connection will not be deployed.");
}
+
+ backupServerLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
+ backupServerLocator.setReconnectAttempts(-1);
+
log.info("announcing backup");
this.executorFactory.getExecutor().execute(new Runnable()
{
Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -37,6 +37,8 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
+import org.hornetq.core.client.impl.DiscoveryGroupConstants;
+import org.hornetq.core.client.impl.SimpleUDPServerLocatorImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.ra.inflow.HornetQActivation;
@@ -1403,20 +1405,24 @@
}
else if (discoveryAddress != null)
{
+ // FIXME make discovery stategy pluggable with configuration
+ Map<String,Object> params = new HashMap<String,Object>();
+
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
: getDiscoveryPort();
- DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(discoveryAddress, discoveryPort);
-
long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
: raProperties.getDiscoveryRefreshTimeout();
long initialTimeout = overrideProperties.getDiscoveryInitialWaitTimeout() != null ? overrideProperties.getDiscoveryInitialWaitTimeout()
: raProperties.getDiscoveryInitialWaitTimeout();
- groupConfiguration.setDiscoveryInitialWaitTimeout(initialTimeout);
+ params.put(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, discoveryAddress);
+ params.put(DiscoveryGroupConstants.GROUP_PORT_NAME, discoveryPort);
+ params.put(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, initialTimeout);
+ params.put(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, initialTimeout);
- groupConfiguration.setRefreshTimeout(refreshTimeout);
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(SimpleUDPServerLocatorImpl.class.getName(), params, null);
if (ha)
{
Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -32,7 +32,7 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -189,7 +189,7 @@
{
configs[i] = createTransportConfiguration(isNetty(), false, generateParams(nodes[i], isNetty()));
}
- return new ServerLocatorImpl(true, configs);
+ return new AbstractServerLocator(true, configs);
}
// Private -------------------------------------------------------
Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java 2011-01-26 15:01:24 UTC (rev 10149)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionFactoryOnGCest.java 2011-01-27 14:06:20 UTC (rev 10150)
@@ -27,7 +27,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
-import org.hornetq.core.client.impl.ServerLocatorImpl;
+import org.hornetq.core.client.impl.AbstractServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.tests.util.JMSTestBase;
@@ -61,7 +61,7 @@
final AtomicInteger valueGC = new AtomicInteger(0);
- ServerLocatorImpl.finalizeCallback = new Runnable()
+ AbstractServerLocator.finalizeCallback = new Runnable()
{
public void run()
{
@@ -85,7 +85,7 @@
}
finally
{
- ServerLocatorImpl.finalizeCallback = null;
+ AbstractServerLocator.finalizeCallback = null;
}
assertEquals("The code is throwing exceptions", 0, valueGC.get());
13 years, 11 months
JBoss hornetq SVN: r10149 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-01-26 10:01:24 -0500 (Wed, 26 Jan 2011)
New Revision: 10149
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/NodeManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
Log:
https://issues.jboss.org/browse/JBPAPP-5781 - add flag to enable interrupting
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/NodeManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/NodeManager.java 2011-01-26 03:42:42 UTC (rev 10148)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/NodeManager.java 2011-01-26 15:01:24 UTC (rev 10149)
@@ -72,4 +72,6 @@
public abstract boolean isAwaitingFailback() throws Exception;
public abstract boolean isBackupLive() throws Exception;
+
+ public abstract void interrupt();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-01-26 03:42:42 UTC (rev 10148)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2011-01-26 15:01:24 UTC (rev 10149)
@@ -63,7 +63,9 @@
private final String directory;
+ boolean interrupted = false;
+
public FileLockNodeManager(final String directory)
{
this.directory = directory;
@@ -137,7 +139,14 @@
return false;
}
}
+
@Override
+ public void interrupt()
+ {
+ interrupted = true;
+ }
+
+ @Override
public void releaseBackup() throws Exception
{
releaseBackupLock();
@@ -332,8 +341,9 @@
//
}
}
- if (Thread.currentThread().isInterrupted())
+ if (interrupted)
{
+ interrupted = false;
throw new IOException(new InterruptedException());
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-01-26 03:42:42 UTC (rev 10148)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-01-26 15:01:24 UTC (rev 10149)
@@ -455,6 +455,8 @@
while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout)
{
+ nodeManager.interrupt();
+
backupActivationThread.interrupt();
Thread.sleep(1000);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2011-01-26 03:42:42 UTC (rev 10148)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2011-01-26 15:01:24 UTC (rev 10149)
@@ -128,6 +128,12 @@
return liveLock.availablePermits() == 0;
}
+ @Override
+ public void interrupt()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
private void releaseBackupNode()
{
if(backupLock != null)
13 years, 11 months
JBoss hornetq SVN: r10148 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-25 22:42:42 -0500 (Tue, 25 Jan 2011)
New Revision: 10148
Modified:
branches/Branch_2_2_EAP/build-maven.xml
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
Log:
uploading new release
Modified: branches/Branch_2_2_EAP/build-maven.xml
===================================================================
--- branches/Branch_2_2_EAP/build-maven.xml 2011-01-26 03:28:49 UTC (rev 10147)
+++ branches/Branch_2_2_EAP/build-maven.xml 2011-01-26 03:42:42 UTC (rev 10148)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.2.0.EAP-QA-10136"/>
+ <property name="hornetq.version" value="2.2.0.EAP-QA-10148"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-01-26 03:28:49 UTC (rev 10147)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-01-26 03:42:42 UTC (rev 10148)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.0.EAP-QA-10136</hornetq.version>
+ <hornetq.version>2.2.0.EAP-QA-10148</hornetq.version>
</properties>
<licenses>
13 years, 11 months
JBoss hornetq SVN: r10147 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-25 22:28:49 -0500 (Tue, 25 Jan 2011)
New Revision: 10147
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
Log:
removing System.out
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-01-26 03:21:12 UTC (rev 10146)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-01-26 03:28:49 UTC (rev 10147)
@@ -272,7 +272,7 @@
if (complete)
{
- System.out.println("Disabling depage!");
+ log.debug("Address " + pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
pagingStore.forceAnotherPage();
Page currentPage = pagingStore.getCurrentPage();
13 years, 11 months
JBoss hornetq SVN: r10146 - in branches/Branch_2_2_EAP: src/main/org/hornetq/jms/client and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-01-25 22:21:12 -0500 (Tue, 25 Jan 2011)
New Revision: 10146
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
Log:
HORNETQ-616 / HORNETQ-617 - settings of connection factories through management
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -161,7 +161,82 @@
@Parameter(name = "cfType", desc = "RegularCF=0, QueueCF=1, TopicCF=2, XACF=3, QueueXACF=4, TopicXACF=5") int cfType,
@Parameter(name = "connectorNames", desc = "comma-separated list of connectorNames or the discovery group name") String connectors,
@Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings (use ',' if u need to use commas in your jndi name)") String jndiBindings) throws Exception;
+
+ @Operation(desc = "Create a JMS ConnectionFactory", impact = MBeanOperationInfo.ACTION)
+ void createConnectionFactory(@Parameter(name = "name") String name,
+ @Parameter(name = "ha") boolean ha,
+ @Parameter(name = "useDiscovery", desc = "should we use discovery or a connector configuration") boolean useDiscovery,
+ @Parameter(name = "cfType", desc = "RegularCF=0, QueueCF=1, TopicCF=2, XACF=3, QueueXACF=4, TopicXACF=5") int cfType,
+ @Parameter(name = "connectorNames", desc = "An array of connector or the binding address") String[] connectors,
+ @Parameter(name = "jndiBindings", desc = "array JNDI bindings (use ',' if u need to use commas in your jndi name)") String[] jndiBindings,
+ @Parameter(name = "clientID", desc = "The clientID configured for the connectionFactory") String clientID,
+ @Parameter(name = "clientFailureCheckPeriod", desc = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
+ @Parameter(name = "connectionTTL", desc = "connectionTTL") long connectionTTL,
+ @Parameter(name = "callTimeout", desc = "callTimeout") long callTimeout,
+ @Parameter(name = "minLargeMessageSize", desc = "minLargeMessageSize") int minLargeMessageSize,
+ @Parameter(name = "compressLargeMessages", desc = "compressLargeMessages") boolean compressLargeMessages,
+ @Parameter(name = "consumerWindowSize", desc = "consumerWindowSize") int consumerWindowSize,
+ @Parameter(name = "consumerMaxRate", desc = "consumerMaxRate") int consumerMaxRate,
+ @Parameter(name = "confirmationWindowSize", desc = "confirmationWindowSize") int confirmationWindowSize,
+ @Parameter(name = "producerWindowSize", desc = "producerWindowSize") int producerWindowSize,
+ @Parameter(name = "producerMaxRate", desc = "producerMaxRate") int producerMaxRate,
+ @Parameter(name = "blockOnAcknowledge", desc = "blockOnAcknowledge") boolean blockOnAcknowledge,
+ @Parameter(name = "blockOnDurableSend", desc = "blockOnDurableSend") boolean blockOnDurableSend,
+ @Parameter(name = "blockOnNonDurableSend", desc = "blockOnNonDurableSend") boolean blockOnNonDurableSend,
+ @Parameter(name = "autoGroup", desc = "autoGroup") boolean autoGroup,
+ @Parameter(name = "preAcknowledge", desc = "preAcknowledge") boolean preAcknowledge,
+ @Parameter(name = "loadBalancingPolicyClassName", desc = "loadBalancingPolicyClassName (null or blank mean use the default value)") String loadBalancingPolicyClassName,
+ @Parameter(name = "transactionBatchSize", desc = "transactionBatchSize") int transactionBatchSize,
+ @Parameter(name = "dupsOKBatchSize", desc = "dupsOKBatchSize") int dupsOKBatchSize,
+ @Parameter(name = "useGlobalPools", desc = "useGlobalPools") boolean useGlobalPools,
+ @Parameter(name = "scheduledThreadPoolMaxSize", desc = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize", desc = "threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "retryInterval", desc = "retryInterval") long retryInterval,
+ @Parameter(name = "retryIntervalMultiplier", desc = "retryIntervalMultiplier") double retryIntervalMultiplier,
+ @Parameter(name = "maxRetryInterval", desc = "maxRetryInterval") long maxRetryInterval,
+ @Parameter(name = "reconnectAttempts", desc = "reconnectAttempts") int reconnectAttempts,
+ @Parameter(name = "failoverOnInitialConnection", desc = "failoverOnInitialConnection") boolean failoverOnInitialConnection,
+ @Parameter(name = "groupId", desc = "groupId") String groupId) throws Exception;
+
+ @Operation(desc = "Create a JMS ConnectionFactory", impact = MBeanOperationInfo.ACTION)
+ void createConnectionFactory(@Parameter(name = "name") String name,
+ @Parameter(name = "ha") boolean ha,
+ @Parameter(name = "useDiscovery", desc = "should we use discovery or a connector configuration") boolean useDiscovery,
+ @Parameter(name = "cfType", desc = "RegularCF=0, QueueCF=1, TopicCF=2, XACF=3, QueueXACF=4, TopicXACF=5") int cfType,
+ @Parameter(name = "connectorNames", desc = "comma-separated list of connectorNames or the discovery group name") String connectors,
+ @Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings (use ',' if u need to use commas in your jndi name)") String jndiBindings,
+ @Parameter(name = "clientID", desc = "The clientID configured for the connectionFactory") String clientID,
+ @Parameter(name = "clientFailureCheckPeriod", desc = "clientFailureCheckPeriod") long clientFailureCheckPeriod,
+ @Parameter(name = "connectionTTL", desc = "connectionTTL") long connectionTTL,
+ @Parameter(name = "callTimeout", desc = "callTimeout") long callTimeout,
+ @Parameter(name = "minLargeMessageSize", desc = "minLargeMessageSize") int minLargeMessageSize,
+ @Parameter(name = "compressLargeMessages", desc = "compressLargeMessages") boolean compressLargeMessages,
+ @Parameter(name = "consumerWindowSize", desc = "consumerWindowSize") int consumerWindowSize,
+ @Parameter(name = "consumerMaxRate", desc = "consumerMaxRate") int consumerMaxRate,
+ @Parameter(name = "confirmationWindowSize", desc = "confirmationWindowSize") int confirmationWindowSize,
+ @Parameter(name = "producerWindowSize", desc = "producerWindowSize") int producerWindowSize,
+ @Parameter(name = "producerMaxRate", desc = "producerMaxRate") int producerMaxRate,
+ @Parameter(name = "blockOnAcknowledge", desc = "blockOnAcknowledge") boolean blockOnAcknowledge,
+ @Parameter(name = "blockOnDurableSend", desc = "blockOnDurableSend") boolean blockOnDurableSend,
+ @Parameter(name = "blockOnNonDurableSend", desc = "blockOnNonDurableSend") boolean blockOnNonDurableSend,
+ @Parameter(name = "autoGroup", desc = "autoGroup") boolean autoGroup,
+ @Parameter(name = "preAcknowledge", desc = "preAcknowledge") boolean preAcknowledge,
+ @Parameter(name = "loadBalancingPolicyClassName", desc = "loadBalancingPolicyClassName (null or blank mean use the default value)") String loadBalancingPolicyClassName,
+ @Parameter(name = "transactionBatchSize", desc = "transactionBatchSize") int transactionBatchSize,
+ @Parameter(name = "dupsOKBatchSize", desc = "dupsOKBatchSize") int dupsOKBatchSize,
+ @Parameter(name = "useGlobalPools", desc = "useGlobalPools") boolean useGlobalPools,
+ @Parameter(name = "scheduledThreadPoolMaxSize", desc = "scheduledThreadPoolMaxSize") int scheduledThreadPoolMaxSize,
+ @Parameter(name = "threadPoolMaxSize", desc = "threadPoolMaxSize") int threadPoolMaxSize,
+ @Parameter(name = "retryInterval", desc = "retryInterval") long retryInterval,
+ @Parameter(name = "retryIntervalMultiplier", desc = "retryIntervalMultiplier") double retryIntervalMultiplier,
+ @Parameter(name = "maxRetryInterval", desc = "maxRetryInterval") long maxRetryInterval,
+ @Parameter(name = "reconnectAttempts", desc = "reconnectAttempts") int reconnectAttempts,
+ @Parameter(name = "failoverOnInitialConnection", desc = "failoverOnInitialConnection") boolean failoverOnInitialConnection,
+ @Parameter(name = "groupId", desc = "groupId") String groupId) throws Exception;
+
+
+
@Operation(desc = "Destroy a JMS ConnectionFactory", impact = MBeanOperationInfo.ACTION)
void destroyConnectionFactory(@Parameter(name = "name", desc = "Name of the ConnectionFactory to destroy") String name) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -604,6 +604,7 @@
{
JMSException jmse = new JMSException("Failed to create session factory");
+ jmse.initCause(e);
jmse.setLinkedException(e);
throw jmse;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -29,6 +29,7 @@
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
@@ -44,6 +45,8 @@
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.json.JSONArray;
import org.hornetq.utils.json.JSONObject;
@@ -206,6 +209,186 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.api.jms.management.JMSServerControl#createConnectionFactory(java.lang.String, boolean, boolean, int, java.lang.String, java.lang.String, java.lang.String, long, long, long, int, boolean, int, int, int, int, int, boolean, boolean, boolean, boolean, boolean, java.lang.String, int, int, boolean, int, int, long, double, long, int, boolean, java.lang.String)
+ */
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String connectors,
+ String jndiBindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ createConnectionFactory(name,
+ ha,
+ useDiscovery,
+ cfType,
+ toArray(connectors),
+ toArray(jndiBindings),
+ clientID,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ callTimeout,
+ minLargeMessageSize,
+ compressLargeMessages,
+ consumerWindowSize,
+ consumerMaxRate,
+ confirmationWindowSize,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnDurableSend,
+ blockOnNonDurableSend,
+ autoGroup,
+ preAcknowledge,
+ loadBalancingPolicyClassName,
+ transactionBatchSize,
+ dupsOKBatchSize,
+ useGlobalPools,
+ scheduledThreadPoolMaxSize,
+ threadPoolMaxSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ groupId);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.jms.management.JMSServerControl#createConnectionFactory(java.lang.String, boolean, boolean, int, java.lang.String[], java.lang.String[], java.lang.String, long, long, long, int, boolean, int, int, int, int, int, boolean, boolean, boolean, boolean, boolean, java.lang.String, int, int, boolean, int, int, long, double, long, int, boolean, java.lang.String)
+ */
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String[] connectorNames,
+ String[] bindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, bindings);
+
+ if (useDiscovery)
+ {
+ configuration.setDiscoveryGroupName(connectorNames[0]);
+ }
+ else
+ {
+ ArrayList<String> connectorNamesList = new ArrayList<String>();
+ for (String nameC : connectorNames)
+ {
+ connectorNamesList.add(nameC);
+ }
+ configuration.setConnectorNames(connectorNamesList);
+ }
+
+ configuration.setFactoryType(JMSFactoryType.valueOf(cfType));
+ configuration.setClientID(clientID);
+ configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
+ configuration.setConnectionTTL(connectionTTL);
+ configuration.setCallTimeout(callTimeout);
+ configuration.setMinLargeMessageSize(minLargeMessageSize);
+ configuration.setCompressLargeMessages(compressLargeMessages);
+ configuration.setConsumerWindowSize(consumerWindowSize);
+ configuration.setConsumerMaxRate(consumerMaxRate);
+ configuration.setConfirmationWindowSize(confirmationWindowSize);
+ configuration.setProducerWindowSize(producerWindowSize);
+ configuration.setProducerMaxRate(producerMaxRate);
+ configuration.setBlockOnAcknowledge(blockOnAcknowledge);
+ configuration.setBlockOnDurableSend(blockOnDurableSend);
+ configuration.setBlockOnNonDurableSend(blockOnNonDurableSend);
+ configuration.setAutoGroup(autoGroup);
+ configuration.setPreAcknowledge(preAcknowledge);
+
+ if (loadBalancingPolicyClassName == null || loadBalancingPolicyClassName.trim().equals(""))
+ {
+ loadBalancingPolicyClassName = HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+ }
+
+ configuration.setLoadBalancingPolicyClassName(loadBalancingPolicyClassName);
+ configuration.setTransactionBatchSize(transactionBatchSize);
+ configuration.setDupsOKBatchSize(dupsOKBatchSize);
+ configuration.setUseGlobalPools(useGlobalPools);
+ configuration.setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize);
+ configuration.setThreadPoolMaxSize(threadPoolMaxSize);
+ configuration.setRetryInterval(retryInterval);
+ configuration.setRetryIntervalMultiplier(retryIntervalMultiplier);
+ configuration.setMaxRetryInterval(maxRetryInterval);
+ configuration.setReconnectAttempts(reconnectAttempts);
+ configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
+ configuration.setGroupID(groupId);
+
+ server.createConnectionFactory(true, configuration, bindings);
+
+ sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
/**
* Create a JMS ConnectionFactory with the specified name connected to a single live-backup pair of servers.
* <br>
@@ -700,7 +883,6 @@
return MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class);
}
-
// Private -------------------------------------------------------
private void sendNotification(final NotificationType type, final String message)
@@ -716,6 +898,7 @@
throw new IllegalStateException("HornetQ JMS Server is not started. it can not be managed yet");
}
}
+
// Inner classes -------------------------------------------------
public static enum NotificationType
@@ -860,5 +1043,4 @@
return obj;
}
-
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -516,6 +516,8 @@
connectorNames.add(str.toString());
}
}
+
+ ha = buffer.readBoolean();
clientID = BufferHelper.readNullableSimpleStringAsString(buffer);
@@ -602,6 +604,8 @@
BufferHelper.writeAsSimpleString(buffer, tc);
}
}
+
+ buffer.writeBoolean(ha);
BufferHelper.writeAsNullableSimpleString(buffer, clientID);
@@ -686,6 +690,9 @@
}
size += BufferHelper.sizeOfNullableSimpleString(clientID) +
+
+ DataConstants.SIZE_BOOLEAN +
+ // ha
DataConstants.SIZE_LONG +
// clientFailureCheckPeriod
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -912,6 +912,17 @@
storage.addJNDI(PersistedType.ConnectionFactory, cfConfig.getName(), usedJNDI);
}
}
+
+ public JMSStorageManager getJMSStorageManager()
+ {
+ return storage;
+ }
+
+ // used on tests only
+ public void replaceStorageManager(JMSStorageManager newStorage)
+ {
+ this.storage = newStorage;
+ }
private String[] getJNDIList(final Map<String, List<String>> map, final String name)
{
@@ -1085,6 +1096,7 @@
cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
+ cf.setGroupID(cfConfig.getGroupID());
}
connectionFactories.put(cfConfig.getName(), cf);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -14,7 +14,6 @@
package org.hornetq.tests.integration.jms.server.management;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,31 +34,31 @@
import junit.framework.Assert;
-import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.ObjectNameBuilder;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.client.HornetQConnection;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQQueueConnectionFactory;
import org.hornetq.jms.persistence.JMSStorageManager;
import org.hornetq.jms.persistence.config.PersistedConnectionFactory;
import org.hornetq.jms.persistence.config.PersistedDestination;
import org.hornetq.jms.persistence.config.PersistedJNDI;
import org.hornetq.jms.persistence.config.PersistedType;
-import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -112,7 +111,7 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
/** Number of consumers used by the test itself */
protected int getNumberOfConsumers()
{
@@ -395,7 +394,6 @@
Assert.assertNull(fakeJMSStorageManager.destinationMap.get(topicName));
}
-
public void testListAllConsumers() throws Exception
{
String topicJNDIBinding = RandomUtil.randomString();
@@ -416,15 +414,15 @@
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// create a consumer will create a Core queue bound to the topic address
MessageConsumer cons = session.createConsumer(topic);
-
+
JSONArray jsonArray = new JSONArray(control.listAllConsumersAsJSON());
-
+
assertEquals(1 + getNumberOfConsumers(), jsonArray.length());
-
+
cons.close();
-
+
jsonArray = new JSONArray(control.listAllConsumersAsJSON());
-
+
assertEquals(getNumberOfConsumers(), jsonArray.length());
String topicAddress = HornetQDestination.createTopicAddressFromName(topicName).toString();
@@ -465,8 +463,10 @@
public void testCreateConnectionFactory_3b() throws Exception
{
- server.getConfiguration().getConnectorConfigurations().put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-
+ server.getConfiguration()
+ .getConnectorConfigurations()
+ .put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
doCreateConnectionFactory(new ConnectionFactoryCreator()
{
public void createConnectionFactory(final JMSServerControl control,
@@ -475,16 +475,119 @@
{
String jndiBindings = JMSServerControlTest.toCSV(bindings);
- control.createConnectionFactory(cfName,
- false,
- false,
- 0,
- "tst",
- jndiBindings);
+ control.createConnectionFactory(cfName, false, false, 0, "tst", jndiBindings);
}
});
}
+ public void testCreateConnectionFactory_CopmleteList() throws Exception
+ {
+ JMSServerControl control = createManagementControl();
+ control.createConnectionFactory("test", //name
+ true, // ha
+ false, // useDiscovery
+ 1, // cfType
+ "invm", // connectorNames
+ "tst", // jndiBindins
+ "tst", // clientID
+ 1, // clientFailureCheckPeriod
+ 1, // connectionTTL
+ 1, // callTimeout
+ 1, // minLargeMessageSize
+ true, // compressLargeMessages
+ 1, // consumerWindowSize
+ 1, // consumerMaxRate
+ 1, // confirmationWindowSize
+ 1, // ProducerWindowSize
+ 1, // producerMaxRate
+ true, // blockOnACK
+ true, // blockOnDurableSend
+ true, // blockOnNonDurableSend
+ true, // autoGroup
+ true, // preACK
+ HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, // loadBalancingClassName
+ 1, // transactionBatchSize
+ 1, // dupsOKBatchSize
+ true, // useGlobalPools
+ 1, // scheduleThreadPoolSize
+ 1, // threadPoolMaxSize
+ 1, // retryInterval
+ 1, // retryIntervalMultiplier
+ 1, // maxRetryInterval
+ 1, // reconnectAttempts
+ true, // failoverOnInitialConnection
+ "tst"); // groupID
+
+
+ HornetQQueueConnectionFactory cf = (HornetQQueueConnectionFactory)context.lookup("tst");
+
+ assertEquals(true, cf.isHA());
+ assertEquals("tst", cf.getClientID());
+ assertEquals(1, cf.getClientFailureCheckPeriod());
+ assertEquals(1, cf.getConnectionTTL());
+ assertEquals(1, cf.getCallTimeout());
+ assertEquals(1, cf.getMinLargeMessageSize());
+ assertEquals(true, cf.isCompressLargeMessage());
+ assertEquals(1, cf.getConsumerWindowSize());
+ assertEquals(1, cf.getConfirmationWindowSize());
+ assertEquals(1, cf.getProducerWindowSize());
+ assertEquals(1, cf.getProducerMaxRate());
+ assertEquals(true, cf.isBlockOnAcknowledge());
+ assertEquals(true, cf.isBlockOnDurableSend());
+ assertEquals(true, cf.isBlockOnNonDurableSend());
+ assertEquals(true, cf.isAutoGroup());
+ assertEquals(true, cf.isPreAcknowledge());
+ assertEquals(HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, cf.getConnectionLoadBalancingPolicyClassName());
+ assertEquals(1, cf.getTransactionBatchSize());
+ assertEquals(1, cf.getDupsOKBatchSize());
+ assertEquals(true, cf.isUseGlobalPools());
+ assertEquals(1, cf.getScheduledThreadPoolMaxSize());
+ assertEquals(1, cf.getThreadPoolMaxSize());
+ assertEquals(1, cf.getRetryInterval());
+ assertEquals(1.0, cf.getRetryIntervalMultiplier());
+ assertEquals(1, cf.getMaxRetryInterval());
+ assertEquals(1, cf.getReconnectAttempts());
+ assertEquals(true, cf.isFailoverOnInitialConnection());
+ assertEquals("tst", cf.getGroupID());
+
+ stopServer();
+
+ startServer();
+
+ cf = (HornetQQueueConnectionFactory)context.lookup("tst");
+
+ assertEquals(true, cf.isHA());
+ assertEquals("tst", cf.getClientID());
+ assertEquals(1, cf.getClientFailureCheckPeriod());
+ assertEquals(1, cf.getConnectionTTL());
+ assertEquals(1, cf.getCallTimeout());
+ assertEquals(1, cf.getMinLargeMessageSize());
+ assertEquals(true, cf.isCompressLargeMessage());
+ assertEquals(1, cf.getConsumerWindowSize());
+ assertEquals(1, cf.getConfirmationWindowSize());
+ assertEquals(1, cf.getProducerWindowSize());
+ assertEquals(1, cf.getProducerMaxRate());
+ assertEquals(true, cf.isBlockOnAcknowledge());
+ assertEquals(true, cf.isBlockOnDurableSend());
+ assertEquals(true, cf.isBlockOnNonDurableSend());
+ assertEquals(true, cf.isAutoGroup());
+ assertEquals(true, cf.isPreAcknowledge());
+ assertEquals(HornetQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, cf.getConnectionLoadBalancingPolicyClassName());
+ assertEquals(1, cf.getTransactionBatchSize());
+ assertEquals(1, cf.getDupsOKBatchSize());
+ assertEquals(true, cf.isUseGlobalPools());
+ assertEquals(1, cf.getScheduledThreadPoolMaxSize());
+ assertEquals(1, cf.getThreadPoolMaxSize());
+ assertEquals(1, cf.getRetryInterval());
+ assertEquals(1.0, cf.getRetryIntervalMultiplier());
+ assertEquals(1, cf.getMaxRetryInterval());
+ assertEquals(1, cf.getReconnectAttempts());
+ assertEquals(true, cf.isFailoverOnInitialConnection());
+ assertEquals("tst", cf.getGroupID());
+
+
+ }
+
public void testListPreparedTransactionDetails() throws Exception
{
Xid xid = newXID();
@@ -492,9 +595,11 @@
JMSServerControl control = createManagementControl();
String cfJNDIBinding = "/cf";
String cfName = "cf";
-
- server.getConfiguration().getConnectorConfigurations().put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ server.getConfiguration()
+ .getConnectorConfigurations()
+ .put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
control.createConnectionFactory(cfName, false, false, 0, "tst", cfJNDIBinding);
control.createQueue("q", "/q");
@@ -535,11 +640,13 @@
TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getName());
String cfJNDIBinding = "/cf";
String cfName = "cf";
-
- server.getConfiguration().getConnectorConfigurations().put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- control.createConnectionFactory(cfName, false, false, 0, "tst", cfJNDIBinding);
+ server.getConfiguration()
+ .getConnectorConfigurations()
+ .put("tst", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ control.createConnectionFactory(cfName, false, false, 0, "tst", cfJNDIBinding);
+
control.createQueue("q", "/q");
ConnectionFactory cf = (ConnectionFactory)context.lookup("/cf");
@@ -580,23 +687,50 @@
{
super.setUp();
+ startServer();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void startServer() throws Exception
+ {
Configuration conf = createBasicConfig();
conf.setSecurityEnabled(false);
conf.setJMXManagementEnabled(true);
- conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ conf.setPersistenceEnabled(true);
+ conf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));
+ conf.getAcceptorConfigurations().add(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
+ conf.getConnectorConfigurations().put("netty", new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ conf.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+
+ server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
+
+ serverManager = new JMSServerManagerImpl(server);
context = new InVMContext();
- fakeJMSStorageManager = new FakeJMSStorageManager();
- serverManager = new JMSServerManagerImpl(server, null, fakeJMSStorageManager);
serverManager.setContext(context);
serverManager.start();
serverManager.activated();
+
+ this.fakeJMSStorageManager = new FakeJMSStorageManager(serverManager.getJMSStorageManager());
+
+ serverManager.replaceStorageManager(fakeJMSStorageManager);
}
@Override
protected void tearDown() throws Exception
{
+ stopServer();
+
+ super.tearDown();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void stopServer() throws Exception
+ {
serverManager.stop();
server.stop();
@@ -604,8 +738,6 @@
serverManager = null;
server = null;
-
- super.tearDown();
}
protected JMSServerControl createManagementControl() throws Exception
@@ -615,9 +747,7 @@
// Private -------------------------------------------------------
- private void
-
- doCreateConnectionFactory(final ConnectionFactoryCreator creator) throws Exception
+ private void doCreateConnectionFactory(final ConnectionFactoryCreator creator) throws Exception
{
Object[] cfJNDIBindings = new Object[] { RandomUtil.randomString(),
RandomUtil.randomString(),
@@ -650,30 +780,6 @@
Assert.assertTrue(fakeJMSStorageManager.persistedJNDIMap.get(cfName).contains(cfJNDIBindings[2]));
}
- private JMSServerManager startHornetQServer(final int discoveryPort) throws Exception
- {
- Configuration conf = createBasicConfig();
- conf.setSecurityEnabled(false);
- conf.setJMXManagementEnabled(true);
- conf.getDiscoveryGroupConfigurations()
- .put("discovery",
- new DiscoveryGroupConfiguration("discovery",
- null,
- "231.7.7.7",
- discoveryPort,
- ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT,
- ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT));
- HornetQServer server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
-
- context = new InVMContext();
- JMSServerManagerImpl serverManager = new JMSServerManagerImpl(server);
- serverManager.setContext(context);
- serverManager.start();
- serverManager.activated();
-
- return serverManager;
- }
-
// Inner classes -------------------------------------------------
interface ConnectionFactoryCreator
@@ -688,35 +794,46 @@
Map<String, PersistedConnectionFactory> connectionFactoryMap = new HashMap<String, PersistedConnectionFactory>();
ConcurrentHashMap<String, List<String>> persistedJNDIMap = new ConcurrentHashMap<String, List<String>>();
+
+ JMSStorageManager delegate;
+
+ public FakeJMSStorageManager(JMSStorageManager delegate)
+ {
+ this.delegate = delegate;
+ }
public void storeDestination(PersistedDestination destination) throws Exception
{
destinationMap.put(destination.getName(), destination);
+ delegate.storeDestination(destination);
}
public void deleteDestination(PersistedType type, String name) throws Exception
{
destinationMap.remove(name);
+ delegate.deleteDestination(type, name);
}
public List<PersistedDestination> recoverDestinations()
{
- return Collections.EMPTY_LIST;
+ return delegate.recoverDestinations();
}
public void deleteConnectionFactory(String connectionFactory) throws Exception
{
connectionFactoryMap.remove(connectionFactory);
+ delegate.deleteConnectionFactory(connectionFactory);
}
public void storeConnectionFactory(PersistedConnectionFactory connectionFactory) throws Exception
{
connectionFactoryMap.put(connectionFactory.getName(), connectionFactory);
+ delegate.storeConnectionFactory(connectionFactory);
}
public List<PersistedConnectionFactory> recoverConnectionFactories()
{
- return Collections.EMPTY_LIST;
+ return delegate.recoverConnectionFactories();
}
public void addJNDI(PersistedType type, String name, String... address) throws Exception
@@ -726,36 +843,39 @@
{
persistedJNDIMap.get(name).add(ad);
}
+ delegate.addJNDI(type, name, address);
}
public List<PersistedJNDI> recoverPersistedJNDI() throws Exception
{
- return Collections.EMPTY_LIST;
+ return delegate.recoverPersistedJNDI();
}
public void deleteJNDI(PersistedType type, String name, String address) throws Exception
{
persistedJNDIMap.get(name).remove(address);
+ delegate.deleteJNDI(type, name, address);
}
public void deleteJNDI(PersistedType type, String name) throws Exception
{
persistedJNDIMap.get(name).clear();
+ delegate.deleteJNDI(type, name);
}
public void start() throws Exception
{
- // To change body of implemented methods use File | Settings | File Templates.
+ delegate.start();
}
public void stop() throws Exception
{
- // To change body of implemented methods use File | Settings | File Templates.
+ delegate.stop();
}
public boolean isStarted()
{
- return false; // To change body of implemented methods use File | Settings | File Templates.
+ return delegate.isStarted();
}
/* (non-Javadoc)
@@ -763,6 +883,7 @@
*/
public void installReplication(ReplicationEndpoint replicationEndpoint) throws Exception
{
+ delegate.installReplication(replicationEndpoint);
}
/* (non-Javadoc)
@@ -770,6 +891,7 @@
*/
public void load() throws Exception
{
+ delegate.load();
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2011-01-25 21:58:43 UTC (rev 10145)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2011-01-26 03:21:12 UTC (rev 10146)
@@ -66,13 +66,13 @@
return 1;
}
-
@Override
protected void setUp() throws Exception
{
super.setUp();
- HornetQConnectionFactory cf = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ HornetQConnectionFactory cf = (HornetQConnectionFactory)HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(InVMConnectorFactory.class.getName()));
connection = cf.createQueueConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
@@ -93,13 +93,12 @@
@Override
protected JMSServerControl createManagementControl() throws Exception
{
- HornetQQueue managementQueue = (HornetQQueue) HornetQJMSClient.createQueue("hornetq.management");
+ HornetQQueue managementQueue = (HornetQQueue)HornetQJMSClient.createQueue("hornetq.management");
final JMSMessagingProxy proxy = new JMSMessagingProxy(session, managementQueue, ResourceNames.JMS_SERVER);
return new JMSServerControl()
{
-
public boolean closeConnectionsForAddress(final String ipAddress) throws Exception
{
return (Boolean)proxy.invokeOperation("closeConnectionsForAddress", ipAddress);
@@ -119,7 +118,7 @@
{
return (Boolean)proxy.invokeOperation("createQueue", name, jndiBindings, selector, durable);
}
-
+
public boolean createTopic(final String name) throws Exception
{
return (Boolean)proxy.invokeOperation("createTopic", name);
@@ -169,12 +168,12 @@
{
return (String[])proxy.invokeOperation("listConnectionIDs");
}
-
+
public String listConnectionsAsJSON() throws Exception
{
return (String)proxy.invokeOperation("listConnectionsAsJSON");
}
-
+
public String listConsumersAsJSON(String connectionID) throws Exception
{
return (String)proxy.invokeOperation("listConsumersAsJSON", connectionID);
@@ -199,13 +198,13 @@
{
proxy.invokeOperation("removeSecuritySettings", addressMatch);
}
-
+
@SuppressWarnings("unchecked")
public Set<Role> getSecuritySettings(String addressMatch) throws Exception
{
return (Set<Role>)proxy.invokeOperation("getSecuritySettings", addressMatch);
}
-
+
public String getSecuritySettingsAsJSON(String addressMatch) throws Exception
{
return (String)proxy.invokeOperation("getSecuritySettingsAsJSON", addressMatch);
@@ -259,10 +258,15 @@
Object[] bindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory", name, ha, useDiscovery, cfType, connectorNames, bindings);
-
+
}
- public void createConnectionFactory(String name, boolean ha, boolean useDiscovery, int cfType, String connectors, String jndiBindings) throws Exception
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String connectors,
+ String jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory", name, ha, useDiscovery, cfType, connectors, jndiBindings);
}
@@ -272,7 +276,150 @@
return (String)proxy.invokeOperation("listAllConsumersAsJSON");
}
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String[] connectors,
+ String[] jndiBindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ proxy.invokeOperation("createConnectionFactory",
+ name,
+ ha,
+ useDiscovery,
+ cfType,
+ connectors,
+ jndiBindings,
+ clientID,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ callTimeout,
+ minLargeMessageSize,
+ compressLargeMessages,
+ consumerWindowSize,
+ consumerMaxRate,
+ confirmationWindowSize,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnDurableSend,
+ blockOnNonDurableSend,
+ autoGroup,
+ preAcknowledge,
+ loadBalancingPolicyClassName,
+ transactionBatchSize,
+ dupsOKBatchSize,
+ useGlobalPools,
+ scheduledThreadPoolMaxSize,
+ threadPoolMaxSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ groupId);
+ }
+ public void createConnectionFactory(String name,
+ boolean ha,
+ boolean useDiscovery,
+ int cfType,
+ String connectors,
+ String jndiBindings,
+ String clientID,
+ long clientFailureCheckPeriod,
+ long connectionTTL,
+ long callTimeout,
+ int minLargeMessageSize,
+ boolean compressLargeMessages,
+ int consumerWindowSize,
+ int consumerMaxRate,
+ int confirmationWindowSize,
+ int producerWindowSize,
+ int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean blockOnDurableSend,
+ boolean blockOnNonDurableSend,
+ boolean autoGroup,
+ boolean preAcknowledge,
+ String loadBalancingPolicyClassName,
+ int transactionBatchSize,
+ int dupsOKBatchSize,
+ boolean useGlobalPools,
+ int scheduledThreadPoolMaxSize,
+ int threadPoolMaxSize,
+ long retryInterval,
+ double retryIntervalMultiplier,
+ long maxRetryInterval,
+ int reconnectAttempts,
+ boolean failoverOnInitialConnection,
+ String groupId) throws Exception
+ {
+ proxy.invokeOperation("createConnectionFactory",
+ name,
+ ha,
+ useDiscovery,
+ cfType,
+ connectors,
+ jndiBindings,
+ clientID,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ callTimeout,
+ minLargeMessageSize,
+ compressLargeMessages,
+ consumerWindowSize,
+ consumerMaxRate,
+ confirmationWindowSize,
+ producerWindowSize,
+ producerMaxRate,
+ blockOnAcknowledge,
+ blockOnDurableSend,
+ blockOnNonDurableSend,
+ autoGroup,
+ preAcknowledge,
+ loadBalancingPolicyClassName,
+ transactionBatchSize,
+ dupsOKBatchSize,
+ useGlobalPools,
+ scheduledThreadPoolMaxSize,
+ threadPoolMaxSize,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ groupId);
+ }
+
};
}
// Public --------------------------------------------------------
13 years, 11 months