[jboss-cvs] JBoss Messaging SVN: r5367 - in trunk: src/main/org/jboss/messaging/core/paging and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Nov 15 04:37:29 EST 2008
Author: timfox
Date: 2008-11-15 04:37:29 -0500 (Sat, 15 Nov 2008)
New Revision: 5367
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
trunk/src/main/org/jboss/messaging/core/paging/Page.java
trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java
Log:
More on outflow
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -75,7 +75,7 @@
public static final int DEFAULT_JOURNAL_REUSE_BUFFER_SIZE = -1;
- public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = false;
+ public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
public static final boolean DEFAULT_MESSAGE_COUNTER_ENABLED = false;
Modified: trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -34,17 +34,6 @@
*/
public interface LastPageRecord extends EncodingSupport
{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
/** Internal field with the primary key, used on the journal/database */
long getRecordId();
@@ -60,13 +49,4 @@
/** Last Page ID*/
void setLastId(long lastId);
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/Page.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/Page.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -32,7 +32,6 @@
*/
public interface Page
{
-
int getPageId();
void write(PageMessage message) throws Exception;
@@ -50,5 +49,4 @@
void close() throws Exception;
void delete() throws Exception;
-
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -33,7 +33,6 @@
*/
public interface PageTransactionInfo extends EncodingSupport
{
-
boolean waitCompletion() throws Exception;
void complete();
@@ -53,5 +52,4 @@
int getNumberOfMessages();
void markIncomplete();
-
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -57,7 +57,6 @@
*/
public interface PagingManager extends MessagingComponent
{
-
/** The system is paging because of global-page-mode */
boolean isGlobalPageMode();
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -71,6 +71,7 @@
return fanout;
}
+ //TODO use better method for round-robin'ing - since this may wrap
public long getRoutings()
{
return routings;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -248,7 +248,7 @@
//I suspect it is being called after the managementservice has been shut down.
log.warn("Failed to unregister queue", e);
}
-
+
return binding;
}
@@ -300,7 +300,7 @@
{
Binding theBinding = null;
- long lowestRoutings = 0;
+ long lowestRoutings = -1;
for (Binding binding : bindings)
{
@@ -322,7 +322,7 @@
//We choose the queue with the lowest routings value
long routings = binding.getRoutings();
- if (routings <= lowestRoutings)
+ if (routings <= lowestRoutings || lowestRoutings == -1)
{
//TODO - take num consumers into account
lowestRoutings = routings;
@@ -334,13 +334,14 @@
}
if (theBinding != null)
- {
+ {
MessageReference reference = message.createReference(theBinding.getQueue());
refs.add(reference);
theBinding.incrementRoutings();
}
+
}
return refs;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -22,10 +22,11 @@
package org.jboss.messaging.core.postoffice.impl;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.HashMap;
import java.util.Map;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.postoffice.Address;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.util.SimpleString;
@@ -37,6 +38,8 @@
*/
public class WildcardAddressManager extends SimpleAddressManager
{
+ private static final Logger log = Logger.getLogger(WildcardAddressManager.class);
+
static final char SINGLE_WORD = '*';
static final char ANY_WORDS = '#';
@@ -62,7 +65,7 @@
* @return true if the address was a new mapping
*/
public boolean addMapping(final SimpleString address, final Binding binding)
- {
+ {
Address add = addAndUpdateAddressMap(address);
if (!add.containsWildCard())
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -152,7 +152,7 @@
public HandleStatus handle(final MessageReference reference) throws Exception
{
if (busy)
- {
+ {
return HandleStatus.BUSY;
}
@@ -190,10 +190,7 @@
{
synchronized (this)
{
- // /TODO initially we just send batch in one tx and then acknowledge in another tx locally
- // In event of failure this could result in duplicates on restart.
- // To remedy that we will implement duplicate detection on the sendee by adding a unique header
- // in the first message in the tx, and storing it on the server side.
+ //TODO - duplicate detection on sendee and if batch size = 1 then don't need tx
while (true)
{
@@ -218,6 +215,8 @@
createTx();
busy = false;
+
+ count = 0;
}
queue.deliverAsync(executor);
@@ -238,7 +237,7 @@
}
private void createTx()
- {
+ {
tx = new TransactionImpl(storageManager, postOffice);
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -0,0 +1,223 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.OutflowConfiguration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A OutflowBatchSizeTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 09:06:55
+ *
+ *
+ */
+public class OutflowBatchSizeTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MessagingService service0;
+
+ private MessagingService service1;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testBatchSize() throws Exception
+ {
+ Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setSecurityEnabled(false);
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+ service0Conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service0Params));
+
+ Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setSecurityEnabled(false);
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+ service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service1Params));
+ service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+ service1.start();
+
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final int batchSize = 10;
+
+ OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), null, true, batchSize, 0, connectors);
+ Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0Conf.setOutFlowConfigurations(ofconfigs);
+
+ service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, true);
+
+ session1.createQueue(address1, address1, null, false, false, true);
+
+ ClientProducer prod0_1 = session0.createProducer(address1);
+
+ ClientConsumer cons0_1 = session0.createConsumer(address1);
+
+ ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+ session0.start();
+
+ session1.start();
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int j = 0; j < 10; j++)
+ {
+
+ for (int i = 0; i < batchSize - 1; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+
+ for (int i = 0; i < batchSize - 1; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+
+ ClientMessage rmessage1 = cons1_1.receive(250);
+
+ assertNull(rmessage1);
+
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, batchSize - 1);
+ message.getBody().flip();
+
+ prod0_1.send(message);
+
+ rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(batchSize - 1, rmessage1.getProperty(propKey));
+
+ for (int i = 0; i < batchSize; i++)
+ {
+ rmessage1 = cons1_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+ }
+
+ session0.close();
+
+ session1.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ service0.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+
+ service1.stop();
+
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
+
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.OutflowConfiguration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A OutflowWithFilterTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 08:58:49
+ *
+ *
+ */
+public class OutflowWithFilterTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(OutflowWithFilterTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MessagingService service0;
+
+ private MessagingService service1;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testWithWildcard() throws Exception
+ {
+ Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setSecurityEnabled(false);
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+ service0Conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service0Params));
+
+ Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setSecurityEnabled(false);
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+ service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service1Params));
+ service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+ service1.start();
+
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("testaddress");
+
+ final String filter = "selectorkey='ORANGES'";
+
+ OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, connectors);
+ Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0Conf.setOutFlowConfigurations(ofconfigs);
+
+ service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, true);
+
+ session1.createQueue(address1, address1, null, false, false, true);
+
+ ClientProducer prod0_1 = session0.createProducer(address1);
+
+ ClientConsumer cons0_1 = session0.createConsumer(address1);
+
+ ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+ session0.start();
+
+ session1.start();
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ final SimpleString propKey2 = new SimpleString("selectorkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.putStringProperty(propKey2, new SimpleString("ORANGES"));
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.putStringProperty(propKey2, new SimpleString("APPLES"));
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+
+ ClientMessage rmessage2 = cons1_1.receive(1000);
+
+ assertNotNull(rmessage2);
+
+ assertEquals(i, rmessage2.getProperty(propKey));
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+
+ ClientMessage rmessage1 = cons0_1.receiveImmediate();
+
+ assertNull(rmessage1);
+
+ ClientMessage rmessage2 = cons1_1.receiveImmediate();
+
+ assertNull(rmessage2);
+
+ session0.close();
+
+ session1.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ service0.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+
+ service1.stop();
+
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -0,0 +1,306 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.OutflowConfiguration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A OutflowWithWildcardTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 15 Nov 2008 08:19:07
+ *
+ *
+ */
+public class OutflowWithWildcardTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(OutflowWithWildcardTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MessagingService service0;
+
+ private MessagingService service1;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testWithWildcard() throws Exception
+ {
+ Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setSecurityEnabled(false);
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+ service0Conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service0Params));
+
+ Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setSecurityEnabled(false);
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+ service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service1Params));
+ service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+ service1.start();
+
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
+
+ final SimpleString address1 = new SimpleString("cheese.stilton");
+
+ final SimpleString address2 = new SimpleString("cheese.wensleydale");
+
+ final SimpleString address3 = new SimpleString("wine.shiraz");
+
+ final SimpleString address4 = new SimpleString("wine.cabernet");
+
+ final SimpleString match1 = new SimpleString("cheese.#");
+
+
+ OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", match1.toString(), null, true, 1, 0, connectors);
+ Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0Conf.setOutFlowConfigurations(ofconfigs);
+
+ service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+ service0.start();
+
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
+
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+
+ ClientSession session0 = csf0.createSession(false, true, true);
+
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(address1, address1, null, false, false, true);
+ session0.createQueue(address2, address2, null, false, false, true);
+ session0.createQueue(address3, address3, null, false, false, true);
+ session0.createQueue(address4, address4, null, false, false, true);
+
+ session1.createQueue(address1, address1, null, false, false, true);
+ session1.createQueue(address2, address2, null, false, false, true);
+ session1.createQueue(address3, address3, null, false, false, true);
+ session1.createQueue(address4, address4, null, false, false, true);
+
+ ClientProducer prod0_1 = session0.createProducer(address1);
+ ClientProducer prod0_2 = session0.createProducer(address2);
+ ClientProducer prod0_3 = session0.createProducer(address3);
+ ClientProducer prod0_4 = session0.createProducer(address4);
+
+ ClientConsumer cons0_1 = session0.createConsumer(address1);
+ ClientConsumer cons0_2 = session0.createConsumer(address2);
+ ClientConsumer cons0_3 = session0.createConsumer(address3);
+ ClientConsumer cons0_4 = session0.createConsumer(address4);
+
+ ClientConsumer cons1_1 = session1.createConsumer(address1);
+ ClientConsumer cons1_2 = session1.createConsumer(address2);
+ ClientConsumer cons1_3 = session1.createConsumer(address3);
+ ClientConsumer cons1_4 = session1.createConsumer(address4);
+
+ session0.start();
+
+ session1.start();
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_1.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_2.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_3.send(message);
+ }
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0_4.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage1 = cons0_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+
+ ClientMessage rmessage2 = cons0_2.receive(1000);
+
+ assertNotNull(rmessage2);
+
+ assertEquals(i, rmessage2.getProperty(propKey));
+
+ ClientMessage rmessage3 = cons0_3.receive(1000);
+
+ assertNotNull(rmessage3);
+
+ assertEquals(i, rmessage3.getProperty(propKey));
+
+ ClientMessage rmessage4 = cons0_4.receive(1000);
+
+ assertNotNull(rmessage4);
+
+ assertEquals(i, rmessage4.getProperty(propKey));
+ }
+
+ ClientMessage rmessage1 = cons0_1.receiveImmediate();
+
+ assertNull(rmessage1);
+
+ ClientMessage rmessage2 = cons0_2.receiveImmediate();
+
+ assertNull(rmessage2);
+
+ ClientMessage rmessage3 = cons0_3.receiveImmediate();
+
+ assertNull(rmessage3);
+
+ ClientMessage rmessage4 = cons0_4.receiveImmediate();
+
+ assertNull(rmessage4);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ rmessage1 = cons1_1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+
+ rmessage2 = cons1_2.receive(1000);
+
+ assertNotNull(rmessage2);
+
+ assertEquals(i, rmessage2.getProperty(propKey));
+ }
+
+ rmessage1 = cons1_1.receiveImmediate();
+
+ assertNull(rmessage1);
+
+ rmessage2 = cons1_2.receiveImmediate();
+
+ assertNull(rmessage2);
+
+ rmessage3 = cons1_3.receiveImmediate();
+
+ assertNull(rmessage3);
+
+ rmessage4 = cons1_4.receiveImmediate();
+
+ assertNull(rmessage4);
+
+ session0.close();
+
+ session1.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ service0.stop();
+
+ assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+
+ service1.stop();
+
+ assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
+
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -69,14 +69,14 @@
private MessagingService service0;
private MessagingService service1;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- public void testSimpleOutflow() throws Exception
+ public void testSimpleOutflowFanout() throws Exception
{
Configuration service0Conf = new ConfigurationImpl();
service0Conf.setSecurityEnabled(false);
@@ -136,29 +136,146 @@
session1.start();
- SimpleString propKey = new SimpleString("hello");
- SimpleString propVal = new SimpleString("world");
+ final int numMessages = 100;
- ClientMessage message = session0.createClientMessage(false);
- message.putStringProperty(propKey, propVal);
- message.getBody().flip();
-
- prod0.send(message);
+ final SimpleString propKey = new SimpleString("testkey");
- ClientMessage rmessage0 = cons0.receive(1000);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
- assertNotNull(rmessage0);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage rmessage0 = cons0.receive(1000);
+
+ assertNotNull(rmessage0);
+
+ assertEquals(i, rmessage0.getProperty(propKey));
+
+ ClientMessage rmessage1 = cons1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+ }
+
+ public void testSimpleOutflowRoundRobin() throws Exception
+ {
+ Configuration service0Conf = new ConfigurationImpl();
+ service0Conf.setSecurityEnabled(false);
+ Map<String, Object> service0Params = new HashMap<String, Object>();
+ service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+ service0Conf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service0Params));
+
+ Configuration service1Conf = new ConfigurationImpl();
+ service1Conf.setSecurityEnabled(false);
+ Map<String, Object> service1Params = new HashMap<String, Object>();
+ service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+ service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ service1Params));
+ service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+ service1.start();
+
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service1Params);
+ connectors.add(server1tc);
- assertEquals(propVal, rmessage0.getProperty(propKey));
+ final SimpleString testAddress = new SimpleString("testaddress");
+
+ OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, connectors);
+ Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+ ofconfigs.add(ofconfig);
+ service0Conf.setOutFlowConfigurations(ofconfigs);
+
+ service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+ service0.start();
- ClientMessage rmessage1 = cons1.receive(1000);
+ TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ service0Params);
- assertNotNull(rmessage1);
+ ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
- assertEquals(propVal, rmessage1.getProperty(propKey));
+ ClientSession session0 = csf0.createSession(false, true, true);
+ ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+
+ ClientSession session1 = csf1.createSession(false, true, true);
+
+ session0.createQueue(testAddress, testAddress, null, false, false, false);
+
+ session1.createQueue(testAddress, testAddress, null, false, false, false);
+
+ ClientProducer prod0 = session0.createProducer(testAddress);
+
+ ClientConsumer cons0 = session0.createConsumer(testAddress);
+
+ ClientConsumer cons1 = session1.createConsumer(testAddress);
+
+ session0.start();
+
+ session1.start();
+
+ final int numMessages = 100;
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
+ message.putIntProperty(propKey, i);
+ message.getBody().flip();
+
+ prod0.send(message);
+ }
+
+ ClientMessage msg = cons0.receive(1000);
+
+ boolean toggle = msg != null;
+
+ int i;
+ if (toggle)
+ {
+ assertEquals(0, msg.getProperty(propKey));
+
+ i = 1;
+ }
+ else
+ {
+ i = 0;
+ }
+
+ for (; i < numMessages; i++)
+ {
+ if (!toggle)
+ {
+ ClientMessage rmessage0 = cons0.receive(1000);
+
+ assertNotNull(rmessage0);
+
+ assertEquals(i, rmessage0.getProperty(propKey));
+ }
+ else
+ {
+ ClientMessage rmessage1 = cons1.receive(1000);
+
+ assertNotNull(rmessage1);
+
+ assertEquals(i, rmessage1.getProperty(propKey));
+ }
+
+ toggle = !toggle;
+ }
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java 2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java 2008-11-15 09:37:29 UTC (rev 5367)
@@ -22,7 +22,6 @@
package org.jboss.messaging.tests.unit.core.postoffice.impl;
import org.jboss.messaging.core.postoffice.impl.WildcardAddressManager;
-import org.jboss.messaging.util.SimpleString;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
More information about the jboss-cvs-commits
mailing list