[hornetq-commits] JBoss hornetq SVN: r9024 - in trunk: src/main/org/hornetq/core/message/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Mar 31 07:31:38 EDT 2010


Author: timfox
Date: 2010-03-31 07:31:37 -0400 (Wed, 31 Mar 2010)
New Revision: 9024

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
fixed default address optimisation

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-03-31 11:31:37 UTC (rev 9024)
@@ -999,6 +999,9 @@
          }
 
          channel.setTransferring(false);         
+         
+         //Reset default address
+         defaultAddress = null;
       }
       catch (Throwable t)
       {

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-03-31 11:31:37 UTC (rev 9024)
@@ -76,7 +76,7 @@
 
    protected ResetLimitWrappedHornetQBuffer bodyBuffer;
 
-   protected boolean bufferValid;
+   protected volatile boolean bufferValid;
 
    private int endOfBodyPosition = -1;
 
@@ -244,7 +244,7 @@
       return address;
    }
 
-   public void setAddress(final SimpleString address)
+   public synchronized void setAddress(final SimpleString address)
    {
       if (this.address != address)
       {

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-03-31 11:31:37 UTC (rev 9024)
@@ -548,7 +548,7 @@
       }
 
       SimpleString address = message.getAddress();
-
+      
       setPagingStore(message);
 
       Object duplicateID = message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-03-31 11:31:37 UTC (rev 9024)
@@ -411,7 +411,7 @@
             // nodes could have same queue ids
             // Note we must copy since same message may get routed to other nodes which require different headers
             message = message.copy();
-
+            
             // TODO - we can optimise this
 
             Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-03-31 11:31:37 UTC (rev 9024)
@@ -911,10 +911,12 @@
    {
       long id = storageManager.generateUniqueID();
 
+      SimpleString address = message.getAddress();
+      
       message.setMessageID(id);
       message.encodeMessageIDToBuffer();
-      
-      if (message.getAddress() == null)
+          
+      if (address == null)
       {
          if (message.isDurable())
          {
@@ -942,7 +944,7 @@
       
       if (defaultAddress == null)
       {
-         defaultAddress = message.getAddress();
+         defaultAddress = address;
       }
    }
 

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-03-31 11:31:37 UTC (rev 9024)
@@ -95,7 +95,7 @@
    private boolean isDeliveryTransacted;
 
    private HornetQDestination destination;
-   
+
    /** The name of the temporary subscription name that all the sessions will share */
    private SimpleString topicTemporaryQueue;
 
@@ -220,7 +220,6 @@
       return isTopic;
    }
 
-   
    /**
     * Start the activation
     *
@@ -235,7 +234,7 @@
       deliveryActive.set(true);
       ra.getWorkManager().scheduleWork(new SetupActivation());
    }
-   
+
    /**
     * @return the topicTemporaryQueue
     */
@@ -287,12 +286,25 @@
       setupDestination();
       for (int i = 0; i < spec.getMaxSessionInt(); i++)
       {
-         ClientSession session = setupSession();
+         ClientSession session = null;
 
-         HornetQMessageHandler handler = new HornetQMessageHandler(this, session, i);
-         handler.setup();
-         session.start();
-         handlers.add(handler);
+         try
+         {
+            session = setupSession();
+            HornetQMessageHandler handler = new HornetQMessageHandler(this, session, i);
+            handler.setup();
+            session.start();
+            handlers.add(handler);
+         }
+         catch (Exception e)
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+            
+            throw e;
+         }
       }
 
       HornetQActivation.log.info("Setup complete " + this);
@@ -426,11 +438,11 @@
                // If there is no binding on naming, we will just create a new instance
                if (isTopic)
                {
-                  destination = (HornetQDestination) HornetQJMSClient.createTopic(destinationName.substring(destinationName.lastIndexOf('/') + 1));
+                  destination = (HornetQDestination)HornetQJMSClient.createTopic(destinationName.substring(destinationName.lastIndexOf('/') + 1));
                }
                else
                {
-                  destination = (HornetQDestination) HornetQJMSClient.createQueue(destinationName.substring(destinationName.lastIndexOf('/') + 1));
+                  destination = (HornetQDestination)HornetQJMSClient.createQueue(destinationName.substring(destinationName.lastIndexOf('/') + 1));
                }
             }
          }
@@ -452,11 +464,11 @@
       {
          if (Topic.class.getName().equals(spec.getDestinationType()))
          {
-            destination = (HornetQDestination) HornetQJMSClient.createTopic(spec.getDestination());
+            destination = (HornetQDestination)HornetQJMSClient.createTopic(spec.getDestination());
          }
          else
          {
-            destination = (HornetQDestination) HornetQJMSClient.createQueue(spec.getDestination());
+            destination = (HornetQDestination)HornetQJMSClient.createQueue(spec.getDestination());
          }
       }
 

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-03-31 11:31:37 UTC (rev 9024)
@@ -134,8 +134,6 @@
 
       fail(session, latch);
 
-      FailoverTest.log.info("got here 1");
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
@@ -158,7 +156,6 @@
          }
       }
 
-      FailoverTest.log.info("closing session");
       session.close();
 
       Assert.assertEquals(0, sf.numSessions());
@@ -228,7 +225,6 @@
          message.acknowledge();
       }
 
-      FailoverTest.log.info("closing session");
       session.close();
 
       Assert.assertEquals(0, sf.numSessions());
@@ -1166,7 +1162,6 @@
       {
          public void connectionFailed(final HornetQException me)
          {
-            FailoverTest.log.info("calling listener");
             latch.countDown();
          }
       }
@@ -1224,8 +1219,6 @@
 
       boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
 
-      FailoverTest.log.info("waited for latch");
-
       Assert.assertTrue(ok);
 
       try
@@ -1976,24 +1969,19 @@
             {
                sf.addInterceptor(interceptor);
 
-               FailoverTest.log.info("attempting commit");
                session.commit();
             }
             catch (HornetQException e)
             {
                if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
                {
-                  FailoverTest.log.info("got transaction rolled back");
-
                   // Ok - now we retry the commit after removing the interceptor
 
                   sf.removeInterceptor(interceptor);
 
                   try
                   {
-                     FailoverTest.log.info("trying to commit again");
                      session.commit();
-                     FailoverTest.log.info("committed again ok");
 
                      failed = false;
                   }
@@ -2017,8 +2005,6 @@
 
       fail(session, latch);
 
-      FailoverTest.log.info("connection has failed");
-
       committer.join();
 
       Assert.assertFalse(committer.failed);
@@ -2270,7 +2256,6 @@
     */
    protected void setBody(final int i, final ClientMessage message) throws Exception
    {
-      log.info("in failovertest:: setbody");
       message.getBodyBuffer().writeString("message" + i);
    }
 



More information about the hornetq-commits mailing list