[jboss-cvs] JBoss Messaging SVN: r5971 - in trunk/src/main/org/jboss/messaging: core/management/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Mar 3 11:23:54 EST 2009


Author: timfox
Date: 2009-03-03 11:23:54 -0500 (Tue, 03 Mar 2009)
New Revision: 5971

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
Log:
made some management classes thread safe

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-03-03 16:23:54 UTC (rev 5971)
@@ -710,7 +710,9 @@
       if (consumer != null)
       {
          ClientMessageInternal clMessage = message.getClientMessage();
+         
          clMessage.setFlowControlSize(clMessage.getEncodeSize());
+         
          consumer.handleMessage(message.getClientMessage());
       }
    }
@@ -742,14 +744,14 @@
       {
          return;
       }
-
+   
       try
       {
          closeChildren();
          
          closedSent = true;
          
-         channel.sendBlocking(new SessionCloseMessage());   
+         channel.sendBlocking(new SessionCloseMessage());           
       }
       catch (Throwable ignore)
       {

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-03-03 16:23:54 UTC (rev 5971)
@@ -602,7 +602,6 @@
 
    private boolean reconnect(final int retries)
    {
-      log.info("reconnecting");
       // We fail over sessions per connection to ensure there is the same mapping of channel id
       // on live and backup connections
       
@@ -748,7 +747,7 @@
       if (refCount == 0)
       {
          // Close connections
-
+         
          Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
 
          connections.clear();
@@ -756,13 +755,13 @@
          for (ConnectionEntry entry : copy)
          {
             try
-            {
+            {               
                entry.connection.destroy();
-
+               
                entry.connector.close();
             }
             catch (Throwable ignore)
-            {
+            {               
             }
          }
 

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-03-03 16:23:54 UTC (rev 5971)
@@ -274,9 +274,7 @@
       return configuration.isSecurityEnabled();
    }
 
-   // TODO - do we really need this method?
-
-   public void createQueue(final String address, final String name) throws Exception
+   public synchronized void createQueue(final String address, final String name) throws Exception
    {
       SimpleString sAddress = new SimpleString(address);
       SimpleString sName = new SimpleString(name);
@@ -289,7 +287,7 @@
       }
    }
 
-   public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
+   public synchronized void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
    {
       SimpleString sAddress = new SimpleString(address);
       SimpleString sName = new SimpleString(name);
@@ -311,7 +309,7 @@
       }
    }
 
-   public void destroyQueue(final String name) throws Exception
+   public synchronized void destroyQueue(final String name) throws Exception
    {
       SimpleString sName = new SimpleString(name);
       Binding binding = postOffice.getBinding(sName);
@@ -419,7 +417,7 @@
       return s;
    }
 
-   public boolean commitPreparedTransaction(String transactionAsBase64) throws Exception
+   public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
    {
       List<Xid> xids = resourceManager.getPreparedTransactions();
 
@@ -435,7 +433,7 @@
       return false;
    }
 
-   public boolean rollbackPreparedTransaction(String transactionAsBase64) throws Exception
+   public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception
    {
       List<Xid> xids = resourceManager.getPreparedTransactions();
 
@@ -479,7 +477,7 @@
       return (String[])remoteConnections.toArray(new String[remoteConnections.size()]);
    }
 
-   public boolean closeConnectionsForAddress(final String ipAddress)
+   public synchronized boolean closeConnectionsForAddress(final String ipAddress)
    {
       boolean closed = false;
       Set<RemotingConnection> connections = remotingService.getConnections();

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2009-03-03 16:23:54 UTC (rev 5971)
@@ -148,6 +148,7 @@
    public String getDeadLetterAddress()
    {
       AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
+      
       if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
       {
          return addressSettings.getDeadLetterAddress().toString();
@@ -158,7 +159,7 @@
       }
    }
 
-   public void setDeadLetterAddress(String deadLetterAddress) throws Exception
+   public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
    {
       AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
 
@@ -171,6 +172,7 @@
    public String getExpiryAddress()
    {
       AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
+      
       if (addressSettings != null && addressSettings.getExpiryAddress() != null)
       {
          return addressSettings.getExpiryAddress().toString();
@@ -181,7 +183,7 @@
       }
    }
 
-   public void setExpiryAddress(String expiryAddres) throws Exception
+   public void setExpiryAddress(final String expiryAddres) throws Exception
    {
       AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
 
@@ -291,9 +293,10 @@
       }
    }
 
-   public int removeMatchingMessages(String filterStr) throws Exception
+   public int removeMatchingMessages(final String filterStr) throws Exception
    {
       Filter filter = FilterImpl.createFilter(filterStr);
+      
       return queue.deleteMatchingReferences(filter);
    }
 
@@ -307,6 +310,7 @@
       try
       {
          Filter filter = FilterImpl.createFilter(filterStr);
+         
          return queue.expireMessages(filter);
       }
       catch (MessagingException e)
@@ -318,6 +322,7 @@
    public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
    {
       Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+      
       if (binding == null)
       {
          throw new IllegalArgumentException("No queue found for " + otherQueueName);
@@ -326,10 +331,12 @@
       return queue.moveMessage(messageID, binding.getAddress());
    }
 
-   public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
+   public int moveMatchingMessages(final String filterStr, final String otherQueueName) throws Exception
    {
       Filter filter = FilterImpl.createFilter(filterStr);
+      
       Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+      
       if (binding == null)
       {
          throw new IllegalArgumentException("No queue found for " + otherQueueName);

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-03 16:23:54 UTC (rev 5971)
@@ -21,6 +21,25 @@
  */
 package org.jboss.messaging.integration.transports.netty;
 
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
@@ -41,8 +60,6 @@
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
-import static org.jboss.netty.channel.Channels.pipeline;
-import static org.jboss.netty.channel.Channels.write;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.UpstreamMessageEvent;
@@ -64,21 +81,6 @@
 import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.jboss.netty.handler.ssl.SslHandler;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A NettyConnector
  *
@@ -89,7 +91,7 @@
 {
    // Constants -----------------------------------------------------
 
-   private static final Logger log = Logger.getLogger(NettyConnection.class);
+   private static final Logger log = Logger.getLogger(NettyConnector.class);
 
    // Attributes ----------------------------------------------------
 
@@ -325,7 +327,7 @@
       {
          return;
       }
-
+      
       bootstrap = null;
       channelFactory = null;
       if (bossExecutor != null)
@@ -333,6 +335,7 @@
          bossExecutor.shutdown();
       }
       workerExecutor.shutdown();
+       
       if (bossExecutor != null)
       {
          for (; ;)
@@ -350,7 +353,7 @@
             }
          }
       }
-
+      
       for (Connection connection : connections.values())
       {
          listener.connectionDestroyed(connection.getID());

Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java	2009-03-03 16:23:54 UTC (rev 5971)
@@ -144,7 +144,7 @@
       return messagingServer.getVersion();
    }
 
-   public boolean createQueue(final String queueName, final String jndiBinding) throws Exception
+   public synchronized boolean createQueue(final String queueName, final String jndiBinding) throws Exception
    {
       JBossQueue jBossQueue = new JBossQueue(queueName);      
       messagingServer.createQueue(jBossQueue.getAddress(), jBossQueue.getAddress());
@@ -163,7 +163,7 @@
       return added;
    }
 
-   public boolean createTopic(final String topicName, final String jndiBinding) throws Exception
+   public synchronized boolean createTopic(final String topicName, final String jndiBinding) throws Exception
    {
       JBossTopic jBossTopic = new JBossTopic(topicName);
       //We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS checks when routing messages to a topic that
@@ -178,7 +178,7 @@
       return added;
    }
 
-   public boolean undeployDestination(final String name) throws Exception
+   public synchronized boolean undeployDestination(final String name) throws Exception
    {
       List<String> jndiBindings = destinations.get(name);
       if (jndiBindings == null || jndiBindings.size() == 0)
@@ -192,7 +192,7 @@
       return true;
    }
 
-   public boolean destroyQueue(final String name) throws Exception
+   public synchronized boolean destroyQueue(final String name) throws Exception
    {
       undeployDestination(name);
 
@@ -203,7 +203,7 @@
       return true;
    }
 
-   public boolean destroyTopic(final String name) throws Exception
+   public synchronized boolean destroyTopic(final String name) throws Exception
    {
       undeployDestination(name);
 
@@ -214,7 +214,7 @@
       return true;
    }
 
-   public boolean createConnectionFactory(final String name,
+   public synchronized boolean createConnectionFactory(final String name,
                                           final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
                                           final String connectionLoadBalancingPolicyClassName,
                                           final long pingPeriod,
@@ -273,7 +273,7 @@
       return true;
    }
 
-   public boolean createConnectionFactory(final String name,
+   public synchronized boolean createConnectionFactory(final String name,
                                           final DiscoveryGroupConfiguration discoveryGroupConfig,
                                           final long discoveryInitialWait,
                                           final String connectionLoadBalancingPolicyClassName,
@@ -336,7 +336,7 @@
       return true;
    }
 
-   public boolean destroyConnectionFactory(final String name) throws Exception
+   public synchronized boolean destroyConnectionFactory(final String name) throws Exception
    {
       List<String> jndiBindings = connectionFactoryBindings.get(name);
       if (jndiBindings == null || jndiBindings.size() == 0)




More information about the jboss-cvs-commits mailing list