[hornetq-commits] JBoss hornetq SVN: r8524 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 3 09:05:00 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-03 09:05:00 -0500 (Thu, 03 Dec 2009)
New Revision: 8524

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
Removing waitOnCompletion at QueueImpl + reverting commit on SessionFactory.

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2009-12-03 14:02:04 UTC (rev 8523)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2009-12-03 14:05:00 UTC (rev 8524)
@@ -264,54 +264,58 @@
       }
    }
 
-   private void initialise() throws Exception
+   private synchronized void initialise() throws Exception
    {
-      setThreadPools();
+      if (!readOnly)
+      {
+         readOnly = true;
+         setThreadPools();
 
-      instantiateLoadBalancingPolicy();
+         instantiateLoadBalancingPolicy();
 
-      if (discoveryAddress != null)
-      {
-         InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
+         if (discoveryAddress != null)
+         {
+            InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
 
-         discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
-                                                 discoveryAddress,
-                                                 groupAddress,
-                                                 discoveryPort,
-                                                 discoveryRefreshTimeout);
+            discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+                                                    discoveryAddress,
+                                                    groupAddress,
+                                                    discoveryPort,
+                                                    discoveryRefreshTimeout);
 
-         discoveryGroup.registerListener(this);
+            discoveryGroup.registerListener(this);
 
-         discoveryGroup.start();
-      }
-      else if (staticConnectors != null)
-      {
-         for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
+            discoveryGroup.start();
+         }
+         else if (staticConnectors != null)
          {
-            FailoverManager cm = new FailoverManagerImpl(this,
-                                                         pair.a,
-                                                         pair.b,
-                                                         failoverOnServerShutdown,
-                                                         callTimeout,
-                                                         clientFailureCheckPeriod,
-                                                         connectionTTL,
-                                                         retryInterval,
-                                                         retryIntervalMultiplier,
-                                                         maxRetryInterval,
-                                                         reconnectAttempts,
-                                                         threadPool,
-                                                         scheduledThreadPool,
-                                                         interceptors);
+            for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
+            {
+               FailoverManager cm = new FailoverManagerImpl(this,
+                                                            pair.a,
+                                                            pair.b,
+                                                            failoverOnServerShutdown,
+                                                            callTimeout,
+                                                            clientFailureCheckPeriod,
+                                                            connectionTTL,
+                                                            retryInterval,
+                                                            retryIntervalMultiplier,
+                                                            maxRetryInterval,
+                                                            reconnectAttempts,
+                                                            threadPool,
+                                                            scheduledThreadPool,
+                                                            interceptors);
 
-            failoverManagerMap.put(pair, cm);
+               failoverManagerMap.put(pair, cm);
+            }
+
+            updatefailoverManagerArray();
          }
-
-         updatefailoverManagerArray();
+         else
+         {
+            throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
+         }
       }
-      else
-      {
-         throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
-      }
    }
 
    // Static
@@ -1077,7 +1081,7 @@
       }
    }
 
-   private synchronized ClientSession createSessionInternal(final String username,
+   private ClientSession createSessionInternal(final String username,
                                                final String password,
                                                final boolean xa,
                                                final boolean autoCommitSends,
@@ -1100,8 +1104,6 @@
          {
             throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
          }
-
-         readOnly = true;
       }
 
       if (discoveryGroup != null && !receivedBroadcast)
@@ -1115,32 +1117,35 @@
          }
       }
 
-      int pos = loadBalancingPolicy.select(failoverManagerArray.length);
+      synchronized (this)
+      {
+         int pos = loadBalancingPolicy.select(failoverManagerArray.length);
 
-      FailoverManager failoverManager = failoverManagerArray[pos];
+         FailoverManager failoverManager = failoverManagerArray[pos];
 
-      ClientSession session = failoverManager.createSession(username,
-                                                            password,
-                                                            xa,
-                                                            autoCommitSends,
-                                                            autoCommitAcks,
-                                                            preAcknowledge,
-                                                            ackBatchSize,
-                                                            cacheLargeMessagesClient,
-                                                            minLargeMessageSize,
-                                                            blockOnAcknowledge,
-                                                            autoGroup,
-                                                            confirmationWindowSize,
-                                                            producerWindowSize,
-                                                            consumerWindowSize,
-                                                            producerMaxRate,
-                                                            consumerMaxRate,
-                                                            blockOnNonPersistentSend,
-                                                            blockOnPersistentSend,
-                                                            initialMessagePacketSize,
-                                                            groupID);
+         ClientSession session = failoverManager.createSession(username,
+                                                               password,
+                                                               xa,
+                                                               autoCommitSends,
+                                                               autoCommitAcks,
+                                                               preAcknowledge,
+                                                               ackBatchSize,
+                                                               cacheLargeMessagesClient,
+                                                               minLargeMessageSize,
+                                                               blockOnAcknowledge,
+                                                               autoGroup,
+                                                               confirmationWindowSize,
+                                                               producerWindowSize,
+                                                               consumerWindowSize,
+                                                               producerMaxRate,
+                                                               consumerMaxRate,
+                                                               blockOnNonPersistentSend,
+                                                               blockOnPersistentSend,
+                                                               initialMessagePacketSize,
+                                                               groupID);
 
-      return session;
+         return session;
+      }
    }
 
    private void instantiateLoadBalancingPolicy()

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-12-03 14:02:04 UTC (rev 8523)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-12-03 14:05:00 UTC (rev 8524)
@@ -922,7 +922,6 @@
       if (message.isDurable() && durable)
       {
          storageManager.updateDeliveryCount(reference);
-         storageManager.waitOnOperations();
       }
 
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@@ -932,7 +931,6 @@
       if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
       {
          sendToDeadLetterAddress(reference);
-         storageManager.waitOnOperations();
 
          return false;
       }



More information about the hornetq-commits mailing list