[jboss-cvs] JBoss Messaging SVN: r5578 - in trunk: src/main/org/jboss/messaging/core/config/impl and 15 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 5 08:08:07 EST 2009
Author: timfox
Date: 2009-01-05 08:08:07 -0500 (Mon, 05 Jan 2009)
New Revision: 5578
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Message flow restart + various tweaks and fixes
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -13,6 +13,7 @@
package org.jboss.messaging.core.client.impl;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.LinkedList;
@@ -694,7 +695,12 @@
{
if (!directory.exists())
{
- directory.mkdirs();
+ boolean ok = directory.mkdirs();
+
+ if (!ok)
+ {
+ throw new IOException("Failed to create directory " + directory.getCanonicalPath());
+ }
}
ClientFileMessageImpl message = new ClientFileMessageImpl();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -324,7 +324,7 @@
private TransportConfiguration parseTransportConfiguration(final Node node)
{
Node nameNode = node.getAttributes().getNamedItem("name");
-
+
String name = nameNode != null ? nameNode.getNodeValue() : null;
NodeList children = node.getChildNodes();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -29,6 +29,7 @@
import java.util.List;
import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
/**
*
@@ -40,6 +41,8 @@
*/
public abstract class AbstractSequentialFactory implements SequentialFileFactory
{
+ private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
+
protected final String journalDir;
public AbstractSequentialFactory(final String journalDir)
@@ -51,9 +54,14 @@
* Create the directory if it doesn't exist yet
*/
public void createDirs() throws Exception
- {
+ {
File file = new File(journalDir);
- file.mkdirs();
+ boolean ok = file.mkdirs();
+//FIXME - uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1477 is complete
+// if (!ok)
+// {
+// throw new IOException("Failed to create directory " + journalDir);
+// }
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
/**
*
@@ -37,6 +38,8 @@
*/
public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
{
+ private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
+
public NIOSequentialFileFactory(final String journalDir)
{
super(journalDir);
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -69,7 +69,7 @@
* @return
* @throws Exception
*/
- PagingStore createPageStore(SimpleString destination) throws Exception;
+ PagingStore createPageStore(SimpleString destination, boolean createDir) throws Exception;
/** To return the PageStore associated with the address */
PagingStore getPageStore(SimpleString address) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -38,7 +38,7 @@
*/
public interface PagingStoreFactory
{
- PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings);
+ PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings, boolean createDir);
Executor getGlobalDepagerExecutor();
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -135,7 +135,7 @@
for (SimpleString dest : destinations)
{
- createPageStore(dest);
+ createPageStore(dest, false);
}
}
@@ -143,13 +143,13 @@
* @param destination
* @return
*/
- public synchronized PagingStore createPageStore(final SimpleString storeName) throws Exception
+ public synchronized PagingStore createPageStore(final SimpleString storeName, final boolean createDir) throws Exception
{
PagingStore store = stores.get(storeName);
if (store == null)
{
- store = newStore(storeName);
+ store = newStore(storeName, createDir);
PagingStore oldStore = stores.putIfAbsent(storeName, store);
@@ -172,7 +172,7 @@
if (store == null)
{
- store = createPageStore(storeName);
+ store = createPageStore(storeName, true);
}
return store;
@@ -334,9 +334,9 @@
// Private -------------------------------------------------------
- private PagingStore newStore(final SimpleString destinationName)
+ private PagingStore newStore(final SimpleString destinationName, final boolean createDir)
{
- return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
+ return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()), createDir);
}
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -106,17 +106,18 @@
parentExecutor.awaitTermination(30, TimeUnit.SECONDS);
}
- public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings)
+ public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings, final boolean createDir)
{
final String destinationDirectory = directory + "/" + Base64.encodeBytes(destinationName.getData(), Base64.URL_SAFE);
-
+
return new PagingStoreImpl(pagingManager,
storageManager,
postOffice,
newFileFactory(destinationDirectory),
destinationName,
settings,
- executorFactory.getExecutor());
+ executorFactory.getExecutor(),
+ createDir);
}
public void setPagingManager(final PagingManager pagingManager)
@@ -146,8 +147,7 @@
}
else
- {
-
+ {
ArrayList<SimpleString> filesReturn = new ArrayList<SimpleString>(files.length);
for (File file: files)
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -111,6 +111,8 @@
private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
private volatile boolean running = false;
+
+ private final boolean createDir;
// Static --------------------------------------------------------
@@ -135,7 +137,8 @@
final SequentialFileFactory fileFactory,
final SimpleString storeName,
final QueueSettings queueSettings,
- final Executor executor)
+ final Executor executor,
+ final boolean createDir)
{
if (pagingManager == null)
{
@@ -166,6 +169,8 @@
this.executor = executor;
this.pagingManager = pagingManager;
+
+ this.createDir = createDir;
}
// Public --------------------------------------------------------
@@ -531,7 +536,10 @@
{
currentPageLock.writeLock().lock();
- fileFactory.createDirs();
+ if (createDir)
+ {
+ fileFactory.createDirs();
+ }
firstPageId = Integer.MAX_VALUE;
currentPageId = 0;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -548,6 +548,8 @@
}
}
+ pagingManager.reloadStores();
+
Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
storageManager.loadMessageJournal(this, storageManager, queueSettingsRepository, queues, resourceManager, duplicateIDMap);
@@ -565,8 +567,7 @@
}
// This is necessary as if the server was previously stopped while a depage was being executed,
- // it needs to resume the depage process on those destinations
- pagingManager.reloadStores();
+ // it needs to resume the depage process on those destinations
pagingManager.startGlobalDepage();
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -138,7 +138,7 @@
}
private synchronized Address addAndUpdateAddressMap(SimpleString address)
- {
+ {
Address add = addresses.get(address);
if (add == null)
{
@@ -191,7 +191,7 @@
}
private List<SimpleString> getAddresses(final Address address)
- {
+ {
List<SimpleString> addresses = new ArrayList<SimpleString>();
SimpleString[] parts = address.getAddressParts();
@@ -199,18 +199,20 @@
addresses.add(SINGLE_WORD_SIMPLESTRING);
addresses.add(ANY_WORDS_SIMPLESTRING);
if (address.getAddressParts().length > 1)
- {
+ {
addresses = addPart(addresses, address, 1);
+
}
addresses.remove(address.getAddress());
+
return addresses;
}
private List<SimpleString> addPart(final List<SimpleString> addresses, final Address address, final int pos)
- {
+ {
List<SimpleString> newAddresses = new ArrayList<SimpleString>();
for (SimpleString add : addresses)
- {
+ {
newAddresses.add(add.concat(DELIM).concat(SINGLE_WORD));
newAddresses.add(add.concat(DELIM).concat(ANY_WORDS));
newAddresses.add(add.concat(DELIM).concat(address.getAddressParts()[pos]));
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -210,6 +210,10 @@
{
acceptor.stop();
}
+
+ acceptors.clear();
+
+ connections.clear();
started = false;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMAcceptor.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -116,7 +116,7 @@
throw new IllegalStateException("Acceptor is not started");
}
- new InVMConnection(connectionID, remoteHandler, new Listener(connector));
+ new InVMConnection(id, connectionID, remoteHandler, new Listener(connector));
}
public void disconnect(final String connectionID)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnection.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -53,19 +53,23 @@
private final String id;
private boolean closed;
+
+ private final int serverID;
private static final ExecutorFactory factory =
new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("JBM-InVM-Transport-Threads")));
private final Executor executor;
- public InVMConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
- {
- this(UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener);
+ public InVMConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener)
+ {
+ this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener);
}
- public InVMConnection(final String id, final BufferHandler handler, final ConnectionLifeCycleListener listener)
+ public InVMConnection(final int serverID, final String id, final BufferHandler handler, final ConnectionLifeCycleListener listener)
{
+ this.serverID = serverID;
+
this.handler = handler;
this.listener = listener;
@@ -134,6 +138,6 @@
public String getRemoteAddress()
{
- return "invm";
+ return "invm:" + serverID;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMConnector.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -62,7 +62,7 @@
}
}
- private final int id;
+ protected final int id;
private final BufferHandler handler;
@@ -156,11 +156,10 @@
}
}
-
// This may be an injection point for mocks on tests
protected Connection internalCreateConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
{
- return new InVMConnection(handler, listener);
+ return new InVMConnection(id, handler, listener);
}
private class Listener implements ConnectionLifeCycleListener
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/InVMRegistry.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -35,20 +35,19 @@
public class InVMRegistry
{
private static final Logger log = Logger.getLogger(InVMRegistry.class);
-
+
public static final InVMRegistry instance = new InVMRegistry();
-
- private ConcurrentMap<Integer, InVMAcceptor> acceptors =
- new ConcurrentHashMap<Integer, InVMAcceptor>();
-
+
+ private ConcurrentMap<Integer, InVMAcceptor> acceptors = new ConcurrentHashMap<Integer, InVMAcceptor>();
+
public void registerAcceptor(final int id, final InVMAcceptor acceptor)
- {
+ {
if (acceptors.putIfAbsent(id, acceptor) != null)
{
throw new IllegalArgumentException("Acceptor with id " + id + " already registered");
}
}
-
+
public void unregisterAcceptor(final int id)
{
if (acceptors.remove(id) == null)
@@ -56,17 +55,17 @@
throw new IllegalArgumentException("Acceptor with id " + id + " not registered");
}
}
-
+
public InVMAcceptor getAcceptor(final int id)
{
return acceptors.get(id);
}
-
+
public void clear()
{
this.acceptors.clear();
}
-
+
public int size()
{
return this.acceptors.size();
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -166,23 +166,26 @@
}
public synchronized void start() throws Exception
- {
+ {
if (started)
{
return;
}
+
+ queue.addConsumer(this);
createTx();
- createObjects();
-
- queue.addConsumer(this);
-
- started = true;
+ if (createObjects())
+ {
+ started = true;
+
+ queue.deliverAsync(executor);
+ }
}
public synchronized void stop() throws Exception
- {
+ {
started = false;
queue.removeConsumer(this);
@@ -239,7 +242,7 @@
{
return HandleStatus.BUSY;
}
-
+
reference.getQueue().referenceHandled();
refs.add(reference);
@@ -290,7 +293,7 @@
// Private -------------------------------------------------------
- private void createObjects() throws Exception
+ private boolean createObjects() throws Exception
{
try
{
@@ -302,12 +305,14 @@
stop();
- return;
+ return false;
}
session.addFailureListener(this);
producer = session.createProducer(null);
+
+ return true;
}
private synchronized void timeoutBatch()
@@ -359,13 +364,13 @@
SimpleString forwardingDestination = (SimpleString)message.getProperty(MessageImpl.HDR_ORIGIN_QUEUE);
- producer.send(forwardingDestination, message);
- }
+ producer.send(forwardingDestination, message);
+ }
session.commit();
tx.commit();
-
+
createTx();
busy = false;
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -39,7 +39,9 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.BindingType;
+import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.Link;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.cluster.Forwarder;
import org.jboss.messaging.core.server.cluster.MessageFlow;
@@ -101,7 +103,7 @@
private final int maxRetriesBeforeFailover;
private final int maxRetriesAfterFailover;
-
+
private final boolean useDuplicateDetection;
/*
@@ -159,7 +161,7 @@
this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
-
+
this.useDuplicateDetection = useDuplicateDetection;
this.updateConnectors(connectors);
@@ -220,7 +222,7 @@
this.maxRetriesBeforeFailover = maxRetriesBeforeFailover;
this.maxRetriesAfterFailover = maxRetriesAfterFailover;
-
+
this.useDuplicateDetection = useDuplicateDetection;
}
@@ -265,15 +267,15 @@
{
return started;
}
-
+
// MessageFlow implementation ------------------------------------
-
+
public SimpleString getName()
{
return name;
}
-
- //For testing only
+
+ // For testing only
public Set<Forwarder> getForwarders()
{
return new HashSet<Forwarder>(forwarders.values());
@@ -322,31 +324,66 @@
{
if (!forwarders.containsKey(connectorPair))
{
- SimpleString queueName = new SimpleString("outflow." + name +
- "." +
- UUIDGenerator.getInstance().generateSimpleStringUUID());
- SimpleString linkName = new SimpleString("link." + queueName.toString());
-
- Binding binding = postOffice.getBinding(queueName);
-
- // TODO need to delete store and forward queues that are no longer in the config
- // and also allow ability to change filterstring etc. while keeping the same name
- if (binding == null)
+ SimpleString linkName = new SimpleString("link." + name + "." +
+ generateConnectorString(connectorPair.a) + "-" +
+ (connectorPair.b == null ? "null" : generateConnectorString(connectorPair.b)));
+
+ Queue queue = null;
+
+ Bindings bindings = postOffice.getBindingsForAddress(address);
+
+ for (Binding binding: bindings.getBindings())
{
+ if (binding.getType() == BindingType.LINK)
+ {
+ Link link = (Link)binding.getBindable();
+
+ if (link.getName().equals(linkName))
+ {
+ //Found the link
+
+ SimpleString queueName = link.getLinkAddress();
+
+ Binding queueBinding = postOffice.getBinding(queueName);
+
+ if (queueBinding == null)
+ {
+ throw new IllegalStateException("Cannot find queue with name " + queueName);
+ }
+
+ queue = (Queue)queueBinding.getBindable();
+ }
+ }
+ }
+
+ if (queue == null)
+ {
+ SimpleString queueName = new SimpleString("outflow." + name +
+ "." +
+ UUIDGenerator.getInstance().generateSimpleStringUUID());
+
Filter filter = filterString == null ? null : new FilterImpl(filterString);
- //Create the queue
+ // Create the queue
+
+ Binding binding = postOffice.addQueueBinding(queueName, queueName, filter, true, false, exclusive);
- binding = postOffice.addQueueBinding(queueName, queueName, filter, true, false, exclusive);
-
- //Create the link
-
- postOffice.addLinkBinding(linkName, address, filter, true, false, exclusive, queueName, useDuplicateDetection);
+ queue = (Queue)binding.getBindable();
+
+ // Create the link
+
+ postOffice.addLinkBinding(linkName,
+ address,
+ filter,
+ true,
+ false,
+ exclusive,
+ queueName,
+ useDuplicateDetection);
}
-
- Queue queue = (Queue)binding.getBindable();
+
Forwarder forwarder = new ForwarderImpl(queue,
connectorPair,
executorFactory.getExecutor(),
@@ -370,5 +407,39 @@
}
}
}
+
+ private String replaceWildcardChars(final String str)
+ {
+ return str.replace('.', '-');
+ }
+
+ private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
+ {
+ StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
+
+ if (!config.getParams().isEmpty())
+ {
+ str.append("?");
+ }
+
+ boolean first = true;
+ for (Map.Entry<String, Object> entry: config.getParams().entrySet())
+ {
+ if (!first)
+ {
+ str.append("&");
+ }
+ String encodedKey = replaceWildcardChars(entry.getKey());
+
+ String val = entry.getValue().toString();
+ String encodedVal = replaceWildcardChars(val);
+
+ str.append(encodedKey).append('=').append(encodedVal);
+
+ first = false;
+ }
+ return new SimpleString(str.toString());
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -107,9 +107,9 @@
private PostOffice postOffice;
- private final ExecutorService asyncDeliveryPool = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads"));
+ private ExecutorService asyncDeliveryPool;
- private final ExecutorFactory executorFactory = new OrderedExecutorFactory(asyncDeliveryPool);
+ private ExecutorFactory executorFactory;
private HierarchicalRepository<Set<Role>> securityRepository;
@@ -156,7 +156,11 @@
{
return;
}
+
+ asyncDeliveryPool = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-async-session-delivery-threads"));
+ executorFactory = new OrderedExecutorFactory(asyncDeliveryPool);
+
/*
* The following components are pluggable on the messaging server: Configuration, StorageManager, RemotingService,
* SecurityManager and ManagementRegistration They must already be injected by the time the messaging server
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -159,7 +159,7 @@
DuplicateIDCache cache = null;
- if (duplicateID != null)
+ if (!message.isReload() && duplicateID != null)
{
cache = postOffice.getDuplicateIDCache(message.getDestination());
@@ -352,7 +352,7 @@
}
public void addLast(final MessageReference ref)
- {
+ {
add(ref, false);
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -32,7 +32,6 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
*
- * TODO - this should be refactored to use transaction operations for adding, acking paging stuff etc
*/
public class TransactionImpl implements Transaction
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -30,11 +30,11 @@
import junit.framework.AssertionFailedError;
import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.config.Configuration;
@@ -42,7 +42,6 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -74,7 +74,7 @@
@Override
protected Connection internalCreateConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
{
- return new MockConnection(handler, listener);
+ return new MockConnection(id, handler, listener);
}
// Private -------------------------------------------------------
@@ -93,9 +93,9 @@
* @param handler
* @param listener
*/
- public MockConnection(final BufferHandler handler, final ConnectionLifeCycleListener listener)
+ public MockConnection(final int serverID, final BufferHandler handler, final ConnectionLifeCycleListener listener)
{
- super(handler, listener);
+ super(serverID, handler, listener);
}
@Override
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowRestartTest.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -0,0 +1,631 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_USE_DUPLICATE_DETECTION;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.MessageFlowConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.Pair;
+import org.jboss.messaging.util.SimpleString;
+
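+/**
+ * Integration test checking that a message flow between two clustered servers still
+ * forwards its pending messages after the server hosting the flow is stopped and restarted.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 24 Nov 2008 14:26:45
+ */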
+public class MessageFlowRestartTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(MessageFlowRestartTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Checks that messages awaiting forwarding to server 1 are still delivered after server 0,
+ * which is configured with the message flow, is stopped and started again.
+ * <p>
+ * Sequence: server 0 is started with a message flow ("outflow1") targeting server 1, which is
+ * not yet running; ten messages are sent to and received locally on server 0; server 0 is
+ * stopped, server 1 is started, then server 0 is started again. All ten messages must then
+ * arrive, in order, at a consumer on server 1, and a new consumer on server 0 must receive
+ * nothing.
+ *
+ * @throws Exception if any server or client operation fails
+ */
+ public void testRestartOutflow() throws Exception
+ {
+ // Both services are created with realFiles == true.
+ // NOTE(review): presumably this means on-disk storage that survives the restart - confirm.
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ MessagingService service0 = createClusteredServiceWithParams(true, service0Params);
+
+ // Server 1 is given an explicit in-VM server id of 1; server 0 keeps an empty parameter map.
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ MessagingService service1 = createClusteredServiceWithParams(true, service1Params);
+
+ // Server 1 is deliberately not started at this point
+
+ // Register a connector to server 1 on server 0 so the message flow can refer to it by name
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params,
+ "connector1");
+ connectors.put(server1tc.getName(), server1tc);
+
+ service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+
+ List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+ connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+
+ final SimpleString testAddress = new SimpleString("testaddress");
+
+ // NOTE(review): the two 0 arguments appear to be the max-retries-before/after-failover
+ // settings - confirm against the MessageFlowConfiguration constructor.
+ MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+ testAddress.toString(),
+ null,
+ false,
+ 1,
+ -1,
+ null,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ 0,
+ 0,
+ DEFAULT_USE_DUPLICATE_DETECTION,
+ connectorNames);
+
+ Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+
+ log.info("starting service0");
+ service0.start();
+
+ log.info("started service");
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ session0.createQueue(testAddress, testAddress, null, false, false);
+
+ ClientProducer prod0 = session0.createProducer(testAddress);
+
+ ClientConsumer cons0 = session0.createConsumer(testAddress);
+
+ session0.start();
+
+ final int numMessages = 10;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ // Each message carries its index in an int property so ordering can be asserted later
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(true);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ log.info("sent messages");
+
+ // The local consumer on server 0 sees every message in order; none is acknowledged here
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage0 = cons0.receive(1000);
+ assertNotNull(rmessage0);
+ assertEquals(i, rmessage0.getProperty(propKey));
+ }
+
+ // At this point the messages should be in the store and forward queue for server 1
+
+ // Now shut down server 0, then start server 1 followed by server 0
+
+ service0.stop();
+
+ service1.start();
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ // The target queue on server 1 is created before server 0 comes back up
+ session1.createQueue(testAddress, testAddress, null, false, false);
+
+ service0.start();
+
+ // The session opened before the restart is not reused; a new factory and session are created
+ csf0 = new ClientSessionFactoryImpl(server0tc);
+ session0 = csf0.createSession(false, true, true);
+
+ session0.createQueue(testAddress, testAddress, null, false, false);
+
+ cons0 = session0.createConsumer(testAddress);
+
+ session0.start();
+
+ ClientConsumer cons1 = session1.createConsumer(testAddress);
+
+ session1.start();
+
+ // All messages must now arrive on server 1, in the order they were sent
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage0 = cons1.receive(1000);
+ assertNotNull(rmessage0);
+ assertEquals(i, rmessage0.getProperty(propKey));
+ }
+
+ // The new consumer on server 0 must not receive anything
+ ClientMessage rmessage = cons0.receive(1000);
+
+ assertNull(rmessage);
+
+ service0.stop();
+ service1.stop();
+
+ // Once both services are stopped, neither remoting service may still hold a connection
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+ }
+//
+// public void testStaticListRoundRobin() throws Exception
+// {
+// Map<String, Object> service0Params = new HashMap<String, Object>();
+// MessagingService service0 = createMessagingService(0, service0Params);
+//
+// Map<String, Object> service1Params = new HashMap<String, Object>();
+// MessagingService service1 = createMessagingService(1, service1Params);
+// service1.start();
+//
+// Map<String, Object> service2Params = new HashMap<String, Object>();
+// MessagingService service2 = createMessagingService(2, service2Params);
+// service2.start();
+//
+// Map<String, Object> service3Params = new HashMap<String, Object>();
+// MessagingService service3 = createMessagingService(3, service3Params);
+// service3.start();
+//
+// Map<String, Object> service4Params = new HashMap<String, Object>();
+// MessagingService service4 = createMessagingService(4, service4Params);
+// service4.start();
+//
+// Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+//
+// TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service1Params,
+// "connector1");
+// connectors.put(server1tc.getName(), server1tc);
+//
+// TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service2Params,
+// "connector2");
+// connectors.put(server2tc.getName(), server2tc);
+//
+// TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service3Params,
+// "connector3");
+// connectors.put(server3tc.getName(), server3tc);
+//
+// TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service4Params,
+// "connector4");
+// connectors.put(server4tc.getName(), server4tc);
+//
+// service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+//
+// List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();
+// connectorNames.add(new Pair<String, String>(server1tc.getName(), null));
+// connectorNames.add(new Pair<String, String>(server2tc.getName(), null));
+// connectorNames.add(new Pair<String, String>(server3tc.getName(), null));
+// connectorNames.add(new Pair<String, String>(server4tc.getName(), null));
+//
+// final SimpleString testAddress = new SimpleString("testaddress");
+//
+// MessageFlowConfiguration ofconfig = new MessageFlowConfiguration("outflow1",
+// testAddress.toString(),
+// null,
+// true,
+// 1,
+// -1,
+// null,
+// DEFAULT_RETRY_INTERVAL,
+// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+// DEFAULT_USE_DUPLICATE_DETECTION,
+// connectorNames);
+// Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+// ofconfigs.add(ofconfig);
+// service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+//
+// service0.start();
+//
+// TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service0Params);
+//
+// ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+// ClientSession session0 = csf0.createSession(false, true, true);
+//
+// ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+// ClientSession session1 = csf1.createSession(false, true, true);
+//
+// ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+// ClientSession session2 = csf2.createSession(false, true, true);
+//
+// ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+// ClientSession session3 = csf3.createSession(false, true, true);
+//
+// ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+// ClientSession session4 = csf4.createSession(false, true, true);
+//
+// session0.createQueue(testAddress, testAddress, null, false, false);
+// session1.createQueue(testAddress, testAddress, null, false, false);
+// session2.createQueue(testAddress, testAddress, null, false, false);
+// session3.createQueue(testAddress, testAddress, null, false, false);
+// session4.createQueue(testAddress, testAddress, null, false, false);
+//
+// ClientProducer prod0 = session0.createProducer(testAddress);
+//
+// ClientConsumer cons0 = session0.createConsumer(testAddress);
+// ClientConsumer cons1 = session1.createConsumer(testAddress);
+// ClientConsumer cons2 = session2.createConsumer(testAddress);
+// ClientConsumer cons3 = session3.createConsumer(testAddress);
+// ClientConsumer cons4 = session4.createConsumer(testAddress);
+//
+// session0.start();
+//
+// session1.start();
+// session2.start();
+// session3.start();
+// session4.start();
+//
+// final int numMessages = 10;
+//
+// final SimpleString propKey = new SimpleString("testkey");
+//
+// for (int i = 0; i < numMessages; i++)
+// {
+// ClientMessage message = session0.createClientMessage(false);
+// message.putIntProperty(propKey, i);
+// message.getBody().flip();
+//
+// prod0.send(message);
+// }
+//
+// // Refs should be round-robin'd in the same order the connectors are specified in the outflow
+// // With the local consumer being last since it was created last
+//
+// ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+//
+// consumers.add(cons1);
+// consumers.add(cons2);
+// consumers.add(cons3);
+// consumers.add(cons4);
+// consumers.add(cons0);
+//
+// int count = 0;
+// for (int i = 0; i < numMessages; i++)
+// {
+// ClientConsumer consumer = consumers.get(count);
+//
+// count++;
+// if (count == consumers.size())
+// {
+// count = 0;
+// }
+//
+// ClientMessage msg = consumer.receive(1000);
+//
+// assertNotNull(msg);
+//
+// assertEquals(i, msg.getProperty(propKey));
+//
+// msg.acknowledge();
+// }
+//
+// session0.close();
+// session1.close();
+// session2.close();
+// session3.close();
+// session4.close();
+//
+// service0.stop();
+// service1.stop();
+// service2.stop();
+// service3.stop();
+// service4.stop();
+//
+// assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+// }
+//
+//
+// public void testMultipleFlows() throws Exception
+// {
+// Map<String, Object> service0Params = new HashMap<String, Object>();
+// MessagingService service0 = createMessagingService(0, service0Params);
+//
+// Map<String, Object> service1Params = new HashMap<String, Object>();
+// MessagingService service1 = createMessagingService(1, service1Params);
+// service1.start();
+//
+// Map<String, Object> service2Params = new HashMap<String, Object>();
+// MessagingService service2 = createMessagingService(2, service2Params);
+// service2.start();
+//
+// Map<String, Object> service3Params = new HashMap<String, Object>();
+// MessagingService service3 = createMessagingService(3, service3Params);
+// service3.start();
+//
+// Map<String, Object> service4Params = new HashMap<String, Object>();
+// MessagingService service4 = createMessagingService(4, service4Params);
+// service4.start();
+//
+// Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+//
+// TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service1Params,
+// "connector1");
+// connectors.put(server1tc.getName(), server1tc);
+//
+// TransportConfiguration server2tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service2Params,
+// "connector2");
+// connectors.put(server2tc.getName(), server2tc);
+//
+// TransportConfiguration server3tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service3Params,
+// "connector3");
+// connectors.put(server3tc.getName(), server3tc);
+//
+// TransportConfiguration server4tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service4Params,
+// "connector4");
+// connectors.put(server4tc.getName(), server4tc);
+//
+// service0.getServer().getConfiguration().setConnectorConfigurations(connectors);
+//
+// List<Pair<String, String>> connectorNames1 = new ArrayList<Pair<String, String>>();
+// connectorNames1.add(new Pair<String, String>(server1tc.getName(), null));
+//
+// List<Pair<String, String>> connectorNames2 = new ArrayList<Pair<String, String>>();
+// connectorNames2.add(new Pair<String, String>(server2tc.getName(), null));
+//
+// List<Pair<String, String>> connectorNames3 = new ArrayList<Pair<String, String>>();
+// connectorNames3.add(new Pair<String, String>(server3tc.getName(), null));
+//
+// List<Pair<String, String>> connectorNames4 = new ArrayList<Pair<String, String>>();
+// connectorNames4.add(new Pair<String, String>(server4tc.getName(), null));
+//
+// final SimpleString testAddress = new SimpleString("testaddress");
+//
+// MessageFlowConfiguration ofconfig1 = new MessageFlowConfiguration("flow1",
+// testAddress.toString(),
+// "beatle='john'",
+// false,
+// 1,
+// -1,
+// null,
+// DEFAULT_RETRY_INTERVAL,
+// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+// DEFAULT_USE_DUPLICATE_DETECTION,
+// connectorNames1);
+// MessageFlowConfiguration ofconfig2 = new MessageFlowConfiguration("flow2",
+// testAddress.toString(),
+// "beatle='paul'",
+// false,
+// 1,
+// -1,
+// null,
+// DEFAULT_RETRY_INTERVAL,
+// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+// DEFAULT_USE_DUPLICATE_DETECTION,
+// connectorNames2);
+// MessageFlowConfiguration ofconfig3 = new MessageFlowConfiguration("flow3",
+// testAddress.toString(),
+// "beatle='george'",
+// false,
+// 1,
+// -1,
+// null,
+// DEFAULT_RETRY_INTERVAL,
+// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+// DEFAULT_USE_DUPLICATE_DETECTION,
+// connectorNames3);
+// MessageFlowConfiguration ofconfig4 = new MessageFlowConfiguration("flow4",
+// testAddress.toString(),
+// "beatle='ringo'",
+// false,
+// 1,
+// -1,
+// null,
+// DEFAULT_RETRY_INTERVAL,
+// DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+// DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+// DEFAULT_MAX_RETRIES_AFTER_FAILOVER,
+// DEFAULT_USE_DUPLICATE_DETECTION,
+// connectorNames4);
+//
+// Set<MessageFlowConfiguration> ofconfigs = new HashSet<MessageFlowConfiguration>();
+// ofconfigs.add(ofconfig1);
+// ofconfigs.add(ofconfig2);
+// ofconfigs.add(ofconfig3);
+// ofconfigs.add(ofconfig4);
+// service0.getServer().getConfiguration().setMessageFlowConfigurations(ofconfigs);
+//
+// service0.start();
+//
+// TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+// service0Params);
+//
+// ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+// ClientSession session0 = csf0.createSession(false, true, true);
+//
+// ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+// ClientSession session1 = csf1.createSession(false, true, true);
+//
+// ClientSessionFactory csf2 = new ClientSessionFactoryImpl(server2tc);
+// ClientSession session2 = csf2.createSession(false, true, true);
+//
+// ClientSessionFactory csf3 = new ClientSessionFactoryImpl(server3tc);
+// ClientSession session3 = csf3.createSession(false, true, true);
+//
+// ClientSessionFactory csf4 = new ClientSessionFactoryImpl(server4tc);
+// ClientSession session4 = csf4.createSession(false, true, true);
+//
+// session0.createQueue(testAddress, testAddress, null, false, false);
+// session1.createQueue(testAddress, testAddress, null, false, false);
+// session2.createQueue(testAddress, testAddress, null, false, false);
+// session3.createQueue(testAddress, testAddress, null, false, false);
+// session4.createQueue(testAddress, testAddress, null, false, false);
+//
+// ClientProducer prod0 = session0.createProducer(testAddress);
+//
+// ClientConsumer cons1 = session1.createConsumer(testAddress);
+// ClientConsumer cons2 = session2.createConsumer(testAddress);
+// ClientConsumer cons3 = session3.createConsumer(testAddress);
+// ClientConsumer cons4 = session4.createConsumer(testAddress);
+//
+// session1.start();
+// session2.start();
+// session3.start();
+// session4.start();
+//
+// SimpleString propKey = new SimpleString("beatle");
+//
+// ClientMessage messageJohn = session0.createClientMessage(false);
+// messageJohn.putStringProperty(propKey, new SimpleString("john"));
+// messageJohn.getBody().flip();
+//
+// ClientMessage messagePaul = session0.createClientMessage(false);
+// messagePaul.putStringProperty(propKey, new SimpleString("paul"));
+// messagePaul.getBody().flip();
+//
+// ClientMessage messageGeorge = session0.createClientMessage(false);
+// messageGeorge.putStringProperty(propKey, new SimpleString("george"));
+// messageGeorge.getBody().flip();
+//
+// ClientMessage messageRingo = session0.createClientMessage(false);
+// messageRingo.putStringProperty(propKey, new SimpleString("ringo"));
+// messageRingo.getBody().flip();
+//
+// ClientMessage messageOsama = session0.createClientMessage(false);
+// messageOsama.putStringProperty(propKey, new SimpleString("osama"));
+// messageOsama.getBody().flip();
+//
+// prod0.send(messageJohn);
+// prod0.send(messagePaul);
+// prod0.send(messageGeorge);
+// prod0.send(messageRingo);
+// prod0.send(messageOsama);
+//
+// ClientMessage r1 = cons1.receive(1000);
+// assertNotNull(r1);
+// assertEquals(new SimpleString("john"), r1.getProperty(propKey));
+// r1 = cons1.receiveImmediate();
+// assertNull(r1);
+//
+// ClientMessage r2 = cons2.receive(1000);
+// assertNotNull(r2);
+// assertEquals(new SimpleString("paul"), r2.getProperty(propKey));
+// r2 = cons2.receiveImmediate();
+// assertNull(r2);
+//
+// ClientMessage r3 = cons3.receive(1000);
+// assertNotNull(r3);
+// assertEquals(new SimpleString("george"), r3.getProperty(propKey));
+// r3 = cons3.receiveImmediate();
+// assertNull(r3);
+//
+// ClientMessage r4 = cons4.receive(1000);
+// assertNotNull(r4);
+// assertEquals(new SimpleString("ringo"), r4.getProperty(propKey));
+// r4 = cons4.receiveImmediate();
+// assertNull(r4);
+//
+// session0.close();
+// session1.close();
+// session2.close();
+// session3.close();
+// session4.close();
+//
+// service0.stop();
+// service1.stop();
+// service2.stop();
+// service3.stop();
+// service4.stop();
+//
+// assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service2.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service3.getServer().getRemotingService().getConnections().size());
+// assertEquals(0, service4.getServer().getRemotingService().getConnections().size());
+// }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ /**
+ * Prepares the fixture: runs the inherited set-up first, then calls {@code clearData()}
+ * before the test body executes.
+ *
+ * @throws Exception if the inherited set-up or the data clean-up fails
+ */
+ @Override
+ protected void setUp() throws Exception
+ {
+ // An overriding JUnit 3 setUp() must chain to the parent so base-class fixture code is not skipped
+ super.setUp();
+
+ super.clearData();
+ }
+
+ /**
+ * Asserts that the test left nothing registered in the in-VM registry, then runs the
+ * inherited tear-down.
+ *
+ * @throws Exception if the inherited tear-down fails
+ */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+ finally
+ {
+ // Always chain to the parent, even when the assertion above fails, so base-class cleanup still runs
+ super.tearDown();
+ }
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
+
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageFlowTestBase.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -45,6 +45,22 @@
*/
public abstract class MessageFlowTestBase extends TestCase
{
+
+ /**
+ * Builds, without starting, a clustered messaging service with security disabled, a journal
+ * of at least 2 files of 100 KiB each, and a single in-VM acceptor.
+ * <p>
+ * Note that {@code params} is modified: the server id is stored in it under
+ * {@link TransportConstants#SERVER_ID_PROP_NAME} before it is passed to the acceptor
+ * configuration.
+ *
+ * @param id the in-VM server id to register in {@code params}
+ * @param params the acceptor parameters; mutated by this call
+ * @return the newly created, not yet started, service
+ */
+ protected MessagingService createMessagingServiceNIO(final int id, final Map<String, Object> params)
+ {
+ final Configuration config = new ConfigurationImpl();
+
+ config.setClustered(true);
+ config.setSecurityEnabled(false);
+ config.setJournalMinFiles(2);
+ config.setJournalFileSize(100 * 1024);
+
+ params.put(TransportConstants.SERVER_ID_PROP_NAME, id);
+
+ final TransportConfiguration invmAcceptor = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ params);
+ config.getAcceptorConfigurations().add(invmAcceptor);
+
+ return MessagingServiceImpl.newMessagingService(config);
+ }
+
protected MessagingService createMessagingService(final int id, final Map<String, Object> params)
{
Configuration serviceConf = new ConfigurationImpl();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -74,7 +74,7 @@
managerImpl.start();
- TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"));
+ TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"), true);
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
@@ -123,7 +123,7 @@
false);
managerImpl.start();
- managerImpl.createPageStore(new SimpleString("simple-test"));
+ managerImpl.createPageStore(new SimpleString("simple-test"), true);
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -67,7 +67,7 @@
factory,
destinationTestName,
new QueueSettings(),
- executor);
+ executor, true);
storeImpl.start();
@@ -90,7 +90,7 @@
factory,
destinationTestName,
new QueueSettings(),
- executor);
+ executor, true);
storeImpl.start();
@@ -123,7 +123,7 @@
factory,
destinationTestName,
new QueueSettings(),
- executor);
+ executor, true);
storeImpl.start();
@@ -141,7 +141,7 @@
factory,
destinationTestName,
new QueueSettings(),
- executor);
+ executor, true);
storeImpl.start();
@@ -202,7 +202,7 @@
factory,
destinationTestName,
new QueueSettings(),
- executor);
+ executor, true);
storeImpl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -113,7 +113,7 @@
factory,
new SimpleString("test"),
settings,
- executor);
+ executor, true);
storeImpl.start();
@@ -268,7 +268,7 @@
factory,
new SimpleString("test"),
settings,
- executor);
+ executor, true);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-01-04 07:31:40 UTC (rev 5577)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-01-05 13:08:07 UTC (rev 5578)
@@ -135,7 +135,12 @@
{
return createService(realFiles, configuration, new HashMap<String, QueueSettings>());
}
-
+
+ /**
+ * Creates a service from the clustered default configuration, using a single in-VM acceptor
+ * built from the given parameters and an empty set of queue settings.
+ *
+ * @param realFiles passed through unchanged to {@code createService}
+ * @param params the parameters handed to the in-VM acceptor configuration
+ * @return the service returned by {@code createService}
+ */
+ protected MessagingService createClusteredServiceWithParams(final boolean realFiles, final Map<String, Object> params)
+ {
+ final Configuration clusteredConfig = createClusteredDefaultConfig(params, INVM_ACCEPTOR_FACTORY);
+ final HashMap<String, QueueSettings> noQueueSettings = new HashMap<String, QueueSettings>();
+
+ return createService(realFiles, clusteredConfig, noQueueSettings);
+ }
+
protected Configuration createDefaultConfig()
{
return createDefaultConfig(false);
@@ -145,16 +150,24 @@
{
if (netty)
{
- return createDefaultConfig(INVM_ACCEPTOR_FACTORY, NETTY_ACCEPTOR_FACTORY);
+ return createDefaultConfig(new HashMap<String, Object>(), INVM_ACCEPTOR_FACTORY, NETTY_ACCEPTOR_FACTORY);
}
else
{
- return createDefaultConfig(INVM_ACCEPTOR_FACTORY);
- }
+ return createDefaultConfig(new HashMap<String, Object>(), INVM_ACCEPTOR_FACTORY);
+ }
+ }
+
+ /**
+ * Builds the default configuration for the given acceptors and marks it as clustered.
+ *
+ * @param params the parameters passed to {@code createDefaultConfig}
+ * @param acceptors the acceptor factory class names passed to {@code createDefaultConfig}
+ * @return the default configuration with clustering switched on
+ */
+ protected Configuration createClusteredDefaultConfig(final Map<String, Object> params, final String... acceptors)
+ {
+ final Configuration clusteredConfig = createDefaultConfig(params, acceptors);
+
+ clusteredConfig.setClustered(true);
+ return clusteredConfig;
 }
- protected Configuration createDefaultConfig(final String... acceptors)
+ protected Configuration createDefaultConfig(final Map<String, Object> params, final String... acceptors)
{
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
@@ -169,7 +182,7 @@
for (String acceptor : acceptors)
{
- TransportConfiguration transportConfig = new TransportConfiguration(acceptor);
+ TransportConfiguration transportConfig = new TransportConfiguration(acceptor, params);
configuration.getAcceptorConfigurations().add(transportConfig);
}
More information about the jboss-cvs-commits
mailing list