[jboss-cvs] JBoss Messaging SVN: r5367 - in trunk: src/main/org/jboss/messaging/core/paging and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Nov 15 04:37:29 EST 2008


Author: timfox
Date: 2008-11-15 04:37:29 -0500 (Sat, 15 Nov 2008)
New Revision: 5367

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
   trunk/src/main/org/jboss/messaging/core/paging/Page.java
   trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
   trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java
Log:
More on outflow


Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -75,7 +75,7 @@
 
    public static final int DEFAULT_JOURNAL_REUSE_BUFFER_SIZE = -1;
 
-   public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = false;
+   public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
 
    public static final boolean DEFAULT_MESSAGE_COUNTER_ENABLED = false;
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/LastPageRecord.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -34,17 +34,6 @@
  */
 public interface LastPageRecord extends EncodingSupport
 {
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
    /** Internal field with the primary key, used on the journal/database */
    long getRecordId();
 
@@ -60,13 +49,4 @@
 
    /** Last Page ID*/
    void setLastId(long lastId);
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/Page.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/Page.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -32,7 +32,6 @@
  */
 public interface Page
 {
-
    int getPageId();
 
    void write(PageMessage message) throws Exception;
@@ -50,5 +49,4 @@
    void close() throws Exception;
 
    void delete() throws Exception;
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -33,7 +33,6 @@
  */
 public interface PageTransactionInfo extends EncodingSupport
 {
-
    boolean waitCompletion() throws Exception;
 
    void complete();
@@ -53,5 +52,4 @@
    int getNumberOfMessages();
 
    void markIncomplete();
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -57,7 +57,6 @@
  */
 public interface PagingManager extends MessagingComponent
 {
-
    /** The system is paging because of global-page-mode */
    boolean isGlobalPageMode();
    

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingImpl.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -71,6 +71,7 @@
       return fanout;
    }
    
+   //TODO use a better method for round-robining, since the routings count may wrap
    public long getRoutings()
    {
       return routings;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -248,7 +248,7 @@
          //I suspect it is being called after the managementservice has been shut down.
          log.warn("Failed to unregister queue", e);
       }
-      
+
       return binding;
    }
 
@@ -300,7 +300,7 @@
          {
             Binding theBinding = null;
             
-            long lowestRoutings = 0;
+            long lowestRoutings = -1;
             
             for (Binding binding : bindings)
             {
@@ -322,7 +322,7 @@
                      //We choose the queue with the lowest routings value  
                      long routings = binding.getRoutings();
                      
-                     if (routings <= lowestRoutings)
+                     if (routings <= lowestRoutings || lowestRoutings == -1)
                      {                        
                         //TODO - take num consumers into account
                         lowestRoutings = routings;
@@ -334,13 +334,14 @@
             }
             
             if (theBinding != null)
-            {
+            {             
                MessageReference reference = message.createReference(theBinding.getQueue());
 
                refs.add(reference);
                
                theBinding.incrementRoutings();
             }
+
          }
 
          return refs;

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -22,10 +22,11 @@
 package org.jboss.messaging.core.postoffice.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Map;
 
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.postoffice.Address;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.util.SimpleString;
@@ -37,6 +38,8 @@
  */
 public class WildcardAddressManager extends SimpleAddressManager
 {
+   private static final Logger log = Logger.getLogger(WildcardAddressManager.class);
+   
    static final char SINGLE_WORD = '*';
 
    static final char ANY_WORDS = '#';
@@ -62,7 +65,7 @@
     * @return true if the address was a new mapping
     */
    public boolean addMapping(final SimpleString address, final Binding binding)
-   {
+   {      
       Address add = addAndUpdateAddressMap(address);
       if (!add.containsWildCard())
       {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/Forwarder.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -152,7 +152,7 @@
    public HandleStatus handle(final MessageReference reference) throws Exception
    {
       if (busy)
-      {
+      {         
          return HandleStatus.BUSY;
       }
 
@@ -190,10 +190,7 @@
       {
          synchronized (this)
          {
-            // /TODO initially we just send batch in one tx and then acknowledge in another tx locally
-            // In event of failure this could result in duplicates on restart.
-            // To remedy that we will implement duplicate detection on the sendee by adding a unique header
-            // in the first message in the tx, and storing it on the server side.
+            //TODO - duplicate detection on the sendee; also, if batch size = 1 then a tx is not needed
    
             while (true)
             {
@@ -218,6 +215,8 @@
             createTx();
    
             busy = false;
+            
+            count = 0;
          }
          
          queue.deliverAsync(executor);
@@ -238,7 +237,7 @@
    }
 
    private void createTx()
-   {
+   {      
       tx = new TransactionImpl(storageManager, postOffice);
    }
 

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowBatchSizeTest.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -0,0 +1,223 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.OutflowConfiguration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * An OutflowBatchSizeTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 09:06:55
+ *
+ *
+ */
+public class OutflowBatchSizeTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(OutflowBatchSizeTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService service0;
+
+   private MessagingService service1;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testBatchSize() throws Exception
+   {
+      Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setSecurityEnabled(false);
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+      service0Conf.getAcceptorConfigurations()
+                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                  service0Params));
+
+      Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setSecurityEnabled(false);
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                                              service1Params));
+      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+      service1.start();
+
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+                       
+      final int batchSize = 10;
+ 
+      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), null, true, batchSize, 0, connectors);
+      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0Conf.setOutFlowConfigurations(ofconfigs);
+
+      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+      service0.start();
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session0.createQueue(address1, address1, null, false, false, true);
+
+      session1.createQueue(address1, address1, null, false, false, true);  
+      
+      ClientProducer prod0_1 = session0.createProducer(address1);
+         
+      ClientConsumer cons0_1 = session0.createConsumer(address1);
+      
+      ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+      session0.start();
+      
+      session1.start();
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      for (int j = 0; j < 10; j++)
+      {
+          
+         for (int i = 0; i < batchSize - 1; i++)
+         {      
+            ClientMessage message = session0.createClientMessage(false);
+            message.putIntProperty(propKey, i);        
+            message.getBody().flip();
+                 
+            prod0_1.send(message);
+         }
+              
+         for (int i = 0; i < batchSize - 1; i++)
+         {
+            ClientMessage rmessage1 = cons0_1.receive(1000);
+            
+            assertNotNull(rmessage1);
+            
+            assertEquals(i, rmessage1.getProperty(propKey));         
+         }
+         
+         ClientMessage rmessage1 = cons1_1.receive(250);
+         
+         assertNull(rmessage1);
+         
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, batchSize - 1);        
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+         
+         rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(batchSize - 1, rmessage1.getProperty(propKey));  
+         
+         for (int i = 0; i < batchSize; i++)
+         {
+            rmessage1 = cons1_1.receive(1000);
+            
+            assertNotNull(rmessage1);
+            
+            assertEquals(i, rmessage1.getProperty(propKey));         
+         }
+      }
+            
+      session0.close();
+      
+      session1.close();
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service0.stop();
+      
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+
+      service1.stop();
+
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+
+

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithFilterTest.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.OutflowConfiguration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * An OutflowWithFilterTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 08:58:49
+ *
+ *
+ */
+public class OutflowWithFilterTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(OutflowWithFilterTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService service0;
+
+   private MessagingService service1;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testWithWildcard() throws Exception
+   {
+      Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setSecurityEnabled(false);
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+      service0Conf.getAcceptorConfigurations()
+                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                  service0Params));
+
+      Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setSecurityEnabled(false);
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                                              service1Params));
+      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+      service1.start();
+
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      
+      final SimpleString address1 = new SimpleString("testaddress");
+                 
+      final String filter = "selectorkey='ORANGES'";
+      
+      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", address1.toString(), filter, true, 1, 0, connectors);
+      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0Conf.setOutFlowConfigurations(ofconfigs);
+
+      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+      service0.start();
+      
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      
+      session0.createQueue(address1, address1, null, false, false, true);
+
+      session1.createQueue(address1, address1, null, false, false, true);  
+      
+      ClientProducer prod0_1 = session0.createProducer(address1);
+         
+      ClientConsumer cons0_1 = session0.createConsumer(address1);
+      
+      ClientConsumer cons1_1 = session1.createConsumer(address1);
+
+      session0.start();
+      
+      session1.start();
+      
+      final int numMessages = 100;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      final SimpleString propKey2 = new SimpleString("selectorkey");
+      
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.putStringProperty(propKey2, new SimpleString("ORANGES"));
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.putStringProperty(propKey2, new SimpleString("APPLES"));
+         message.getBody().flip();
+              
+         prod0_1.send(message);
+      }
+   
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));
+         
+         ClientMessage rmessage2 = cons1_1.receive(1000);
+         
+         assertNotNull(rmessage2);
+         
+         assertEquals(i, rmessage2.getProperty(propKey));         
+      }
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));                      
+      }
+      
+      ClientMessage rmessage1 = cons0_1.receiveImmediate();
+      
+      assertNull(rmessage1);
+      
+      ClientMessage rmessage2 = cons1_1.receiveImmediate();
+      
+      assertNull(rmessage2);
+            
+      session0.close();
+      
+      session1.close();
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service0.stop();
+      
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+
+      service1.stop();
+
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OutflowWithWildcardTest.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -0,0 +1,306 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.OutflowConfiguration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * Integration test for an outflow whose address is the wildcard "cheese.#".
+ * Two in-VM servers are started and server 0 is configured with an outflow to server 1.
+ * Messages sent on server 0 to cheese.stilton / cheese.wensleydale are expected on both
+ * servers; messages sent to wine.shiraz / wine.cabernet are expected on server 0 only.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 15 Nov 2008 08:19:07
+ */
+public class OutflowWithWildcardTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(OutflowWithWildcardTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   // Server with id 0; carries the outflow and is created/started inside the test method.
+   private MessagingService service0;
+   // Server with id 1; the outflow's connector points at it. Started before service0.
+   private MessagingService service1;
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   // Sends to four addresses on server 0; only the two "cheese." ones must also reach server 1.
+   public void testWithWildcard() throws Exception
+   {
+      Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setSecurityEnabled(false);
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+      service0Conf.getAcceptorConfigurations()
+                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                  service0Params));
+      // Server 1: same in-VM acceptor setup with server id 1; it is started right away.
+      Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setSecurityEnabled(false);
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                                              service1Params));
+      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+      service1.start();
+      // In-VM connector to server 1; used by the outflow and later by this test's own client.
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
+      // Two "cheese." and two "wine." addresses; the outflow below is configured for match1 only.
+      final SimpleString address1 = new SimpleString("cheese.stilton");
+      
+      final SimpleString address2 = new SimpleString("cheese.wensleydale");
+      
+      final SimpleString address3 = new SimpleString("wine.shiraz");
+      
+      final SimpleString address4 = new SimpleString("wine.cabernet");
+      
+      final SimpleString match1 = new SimpleString("cheese.#");
+      // Outflow on server 0 for match1 with the connector list above as its target.
+      // NOTE(review): the meaning of the (null, true, 1, 0) arguments is not visible here - confirm.
+      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", match1.toString(), null, true, 1, 0, connectors);
+      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0Conf.setOutFlowConfigurations(ofconfigs);
+
+      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+      service0.start();
+      // Clients: csf0/session0 connect to server 0, csf1/session1 to server 1.
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
+      
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
+      
+      ClientSession session0 = csf0.createSession(false, true, true);
+      
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      // One queue per address, named after the address, on each of the two servers.
+      session0.createQueue(address1, address1, null, false, false, true);
+      session0.createQueue(address2, address2, null, false, false, true);
+      session0.createQueue(address3, address3, null, false, false, true);
+      session0.createQueue(address4, address4, null, false, false, true);     
+      
+      session1.createQueue(address1, address1, null, false, false, true);
+      session1.createQueue(address2, address2, null, false, false, true);
+      session1.createQueue(address3, address3, null, false, false, true);
+      session1.createQueue(address4, address4, null, false, false, true);     
+      // Producers are created on server 0 only; consumers on both servers.
+      ClientProducer prod0_1 = session0.createProducer(address1);
+      ClientProducer prod0_2 = session0.createProducer(address2);
+      ClientProducer prod0_3 = session0.createProducer(address3);
+      ClientProducer prod0_4 = session0.createProducer(address4);      
+      
+      ClientConsumer cons0_1 = session0.createConsumer(address1);
+      ClientConsumer cons0_2 = session0.createConsumer(address2);
+      ClientConsumer cons0_3 = session0.createConsumer(address3);
+      ClientConsumer cons0_4 = session0.createConsumer(address4);
+      
+      ClientConsumer cons1_1 = session1.createConsumer(address1);
+      ClientConsumer cons1_2 = session1.createConsumer(address2);
+      ClientConsumer cons1_3 = session1.createConsumer(address3);
+      ClientConsumer cons1_4 = session1.createConsumer(address4);
+      
+      session0.start();
+      
+      session1.start();
+      
+      final int numMessages = 100;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      // Send numMessages messages to each address on server 0, each carrying its index under propKey.
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+         // NOTE(review): flip() on a body nothing was written to - presumably needed before send; confirm.
+         prod0_1.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0_2.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0_3.send(message);
+      }
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0_4.send(message);
+      }
+      // All four local consumers on server 0 must see every index, in send order.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage1 = cons0_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));
+         
+         ClientMessage rmessage2 = cons0_2.receive(1000);
+         
+         assertNotNull(rmessage2);
+         
+         assertEquals(i, rmessage2.getProperty(propKey));
+         
+         ClientMessage rmessage3 = cons0_3.receive(1000);
+         
+         assertNotNull(rmessage3);
+         
+         assertEquals(i, rmessage3.getProperty(propKey));  
+         
+         ClientMessage rmessage4 = cons0_4.receive(1000);
+         
+         assertNotNull(rmessage4);
+         
+         assertEquals(i, rmessage4.getProperty(propKey));  
+      }
+      // ...and no further messages on any of server 0's queues.
+      ClientMessage rmessage1 = cons0_1.receiveImmediate();
+      
+      assertNull(rmessage1);
+      
+      ClientMessage rmessage2 = cons0_2.receiveImmediate();
+      
+      assertNull(rmessage2);
+            
+      ClientMessage rmessage3 = cons0_3.receiveImmediate();
+      
+      assertNull(rmessage3);
+      
+      ClientMessage rmessage4 = cons0_4.receiveImmediate();
+      
+      assertNull(rmessage4);
+      // Server 1 must have received the same indices, in order, on the two cheese queues.
+      for (int i = 0; i < numMessages; i++)
+      {
+         rmessage1 = cons1_1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));
+         
+         rmessage2 = cons1_2.receive(1000);
+         
+         assertNotNull(rmessage2);
+         
+         assertEquals(i, rmessage2.getProperty(propKey));         
+      }
+      // No further messages on server 1's cheese queues, and none at all on its wine queues.
+      rmessage1 = cons1_1.receiveImmediate();
+      
+      assertNull(rmessage1);
+      
+      rmessage2 = cons1_2.receiveImmediate();
+      
+      assertNull(rmessage2);
+            
+      rmessage3 = cons1_3.receiveImmediate();
+      
+      assertNull(rmessage3);
+      
+      rmessage4 = cons1_4.receiveImmediate();
+      
+      assertNull(rmessage4);
+      // NOTE(review): not reached if an assertion above fails, so the sessions are then left open.
+      session0.close();
+      
+      session1.close();
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   // Empty override: the servers are created inside the test method, not here.
+   @Override
+   protected void setUp() throws Exception
+   {
+   }
+   // Stops both servers and asserts that no connections and no InVMRegistry entries remain.
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service0.stop();
+      // After stop(), server 0's remoting service must report zero connections.
+      assertEquals(0, service0.getServer().getRemotingService().getConnections().size());
+      // NOTE(review): if anything above throws (e.g. service0 still null), service1 is never stopped.
+      service1.stop();
+      // Same zero-connection check for server 1.
+      assertEquals(0, service1.getServer().getRemotingService().getConnections().size());
+      // InVMRegistry.instance must be empty as well.
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SimpleOutflowTest.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -69,14 +69,14 @@
    private MessagingService service0;
 
    private MessagingService service1;
-
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
 
-   public void testSimpleOutflow() throws Exception
+   public void testSimpleOutflowFanout() throws Exception
    {
       Configuration service0Conf = new ConfigurationImpl();
       service0Conf.setSecurityEnabled(false);
@@ -136,29 +136,146 @@
       
       session1.start();
       
-      SimpleString propKey = new SimpleString("hello");
-      SimpleString propVal = new SimpleString("world");
+      final int numMessages = 100;
       
-      ClientMessage message = session0.createClientMessage(false);
-      message.putStringProperty(propKey, propVal);
-      message.getBody().flip();
-           
-      prod0.send(message);
+      final SimpleString propKey = new SimpleString("testkey");
       
-      ClientMessage rmessage0 = cons0.receive(1000);
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0.send(message);
+      }
       
-      assertNotNull(rmessage0);
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage rmessage0 = cons0.receive(1000);
+         
+         assertNotNull(rmessage0);
+         
+         assertEquals(i, rmessage0.getProperty(propKey));
+         
+         ClientMessage rmessage1 = cons1.receive(1000);
+         
+         assertNotNull(rmessage1);
+         
+         assertEquals(i, rmessage1.getProperty(propKey));  
+      }
+   }
+   
+   public void testSimpleOutflowRoundRobin() throws Exception
+   { // Checks that messages sent on server 0 alternate between its local consumer and server 1's.
+      Configuration service0Conf = new ConfigurationImpl();
+      service0Conf.setSecurityEnabled(false);
+      Map<String, Object> service0Params = new HashMap<String, Object>();
+      service0Params.put(TransportConstants.SERVER_ID_PROP_NAME, 0);
+      service0Conf.getAcceptorConfigurations()
+                  .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                  service0Params));
+      // Server 1: in-VM acceptor with server id 1; it is started right away.
+      Configuration service1Conf = new ConfigurationImpl();
+      service1Conf.setSecurityEnabled(false);
+      Map<String, Object> service1Params = new HashMap<String, Object>();
+      service1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      service1Conf.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                                              service1Params));
+      service1 = MessagingServiceImpl.newNullStorageMessagingServer(service1Conf);
+      service1.start();
+      // In-VM connector to server 1; used by the outflow and later by this test's own client.
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      TransportConfiguration server1tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service1Params);
+      connectors.add(server1tc);
       
-      assertEquals(propVal, rmessage0.getProperty(propKey));
+      final SimpleString testAddress = new SimpleString("testaddress");
+      // NOTE(review): the fourth argument (false) presumably selects round-robin over fan-out - confirm.
+      OutflowConfiguration ofconfig = new OutflowConfiguration("outflow1", testAddress.toString(), null, false, 1, 0, connectors);
+      Set<OutflowConfiguration> ofconfigs = new HashSet<OutflowConfiguration>();
+      ofconfigs.add(ofconfig);
+      service0Conf.setOutFlowConfigurations(ofconfigs);
+
+      service0 = MessagingServiceImpl.newNullStorageMessagingServer(service0Conf);
+      service0.start();
       
-      ClientMessage rmessage1 = cons1.receive(1000);
+      TransportConfiguration server0tc = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                    service0Params);
       
-      assertNotNull(rmessage1);
+      ClientSessionFactory csf0 = new ClientSessionFactoryImpl(server0tc);
       
-      assertEquals(propVal, rmessage1.getProperty(propKey));
+      ClientSession session0 = csf0.createSession(false, true, true);
       
+      ClientSessionFactory csf1 = new ClientSessionFactoryImpl(server1tc);
+      
+      ClientSession session1 = csf1.createSession(false, true, true);
+      // The same queue, named after testAddress, is created on both servers.
+      session0.createQueue(testAddress, testAddress, null, false, false, false);
+      
+      session1.createQueue(testAddress, testAddress, null, false, false, false);
+      
+      ClientProducer prod0 = session0.createProducer(testAddress);
+      
+      ClientConsumer cons0 = session0.createConsumer(testAddress);
+      
+      ClientConsumer cons1 = session1.createConsumer(testAddress);
+      
+      session0.start();
+      
+      session1.start();
+      
+      final int numMessages = 100;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      // Send numMessages messages to testAddress on server 0, each carrying its index under propKey.
+      for (int i = 0; i < numMessages; i++)
+      {      
+         ClientMessage message = session0.createClientMessage(false);
+         message.putIntProperty(propKey, i);
+         message.getBody().flip();
+              
+         prod0.send(message);
+      }
+      // Probe which side got message 0: null means it did not reach the local consumer in time.
+      ClientMessage msg = cons0.receive(1000);
+      // Either way the next expected message comes from cons1: message 1 if cons0 took
+      // message 0, otherwise message 0 itself. toggle == true means "read cons1 next".
+      boolean toggle = true;
+      int i;
+      if (msg != null)
+      {
+         assertEquals(0, msg.getProperty(propKey));
+         
+         i = 1;
+      }
+      else
+      {
+         i = 0;
+      }
+      // From here on the two consumers must alternate, one message each, in index order.
+      for (; i < numMessages; i++)
+      {
+         if (!toggle)
+         {
+            ClientMessage rmessage0 = cons0.receive(1000);
+            
+            assertNotNull(rmessage0);
+            
+            assertEquals(i, rmessage0.getProperty(propKey));
+         }
+         else
+         {
+            ClientMessage rmessage1 = cons1.receive(1000);
+            
+            assertNotNull(rmessage1);
+            
+            assertEquals(i, rmessage1.getProperty(propKey));  
+         }
+         
+         toggle = !toggle;
+      }
    }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java	2008-11-14 21:46:28 UTC (rev 5366)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/WildcardAddressManagerTest.java	2008-11-15 09:37:29 UTC (rev 5367)
@@ -22,7 +22,6 @@
 package org.jboss.messaging.tests.unit.core.postoffice.impl;
 
 import org.jboss.messaging.core.postoffice.impl.WildcardAddressManager;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>




More information about the jboss-cvs-commits mailing list