[jboss-cvs] JBoss Messaging SVN: r5971 - in trunk/src/main/org/jboss/messaging: core/management/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Mar 3 11:23:54 EST 2009
Author: timfox
Date: 2009-03-03 11:23:54 -0500 (Tue, 03 Mar 2009)
New Revision: 5971
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
Log:
made some management classes thread safe
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-03 16:23:54 UTC (rev 5971)
@@ -710,7 +710,9 @@
if (consumer != null)
{
ClientMessageInternal clMessage = message.getClientMessage();
+
clMessage.setFlowControlSize(clMessage.getEncodeSize());
+
consumer.handleMessage(message.getClientMessage());
}
}
@@ -742,14 +744,14 @@
{
return;
}
-
+
try
{
closeChildren();
closedSent = true;
- channel.sendBlocking(new SessionCloseMessage());
+ channel.sendBlocking(new SessionCloseMessage());
}
catch (Throwable ignore)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-03-03 16:23:54 UTC (rev 5971)
@@ -602,7 +602,6 @@
private boolean reconnect(final int retries)
{
- log.info("reconnecting");
// We fail over sessions per connection to ensure there is the same mapping of channel id
// on live and backup connections
@@ -748,7 +747,7 @@
if (refCount == 0)
{
// Close connections
-
+
Set<ConnectionEntry> copy = new HashSet<ConnectionEntry>(connections.values());
connections.clear();
@@ -756,13 +755,13 @@
for (ConnectionEntry entry : copy)
{
try
- {
+ {
entry.connection.destroy();
-
+
entry.connector.close();
}
catch (Throwable ignore)
- {
+ {
}
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-03-03 16:23:54 UTC (rev 5971)
@@ -274,9 +274,7 @@
return configuration.isSecurityEnabled();
}
- // TODO - do we really need this method?
-
- public void createQueue(final String address, final String name) throws Exception
+ public synchronized void createQueue(final String address, final String name) throws Exception
{
SimpleString sAddress = new SimpleString(address);
SimpleString sName = new SimpleString(name);
@@ -289,7 +287,7 @@
}
}
- public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
+ public synchronized void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
{
SimpleString sAddress = new SimpleString(address);
SimpleString sName = new SimpleString(name);
@@ -311,7 +309,7 @@
}
}
- public void destroyQueue(final String name) throws Exception
+ public synchronized void destroyQueue(final String name) throws Exception
{
SimpleString sName = new SimpleString(name);
Binding binding = postOffice.getBinding(sName);
@@ -419,7 +417,7 @@
return s;
}
- public boolean commitPreparedTransaction(String transactionAsBase64) throws Exception
+ public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
{
List<Xid> xids = resourceManager.getPreparedTransactions();
@@ -435,7 +433,7 @@
return false;
}
- public boolean rollbackPreparedTransaction(String transactionAsBase64) throws Exception
+ public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception
{
List<Xid> xids = resourceManager.getPreparedTransactions();
@@ -479,7 +477,7 @@
return (String[])remoteConnections.toArray(new String[remoteConnections.size()]);
}
- public boolean closeConnectionsForAddress(final String ipAddress)
+ public synchronized boolean closeConnectionsForAddress(final String ipAddress)
{
boolean closed = false;
Set<RemotingConnection> connections = remotingService.getConnections();
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2009-03-03 16:23:54 UTC (rev 5971)
@@ -148,6 +148,7 @@
public String getDeadLetterAddress()
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
+
if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
{
return addressSettings.getDeadLetterAddress().toString();
@@ -158,7 +159,7 @@
}
}
- public void setDeadLetterAddress(String deadLetterAddress) throws Exception
+ public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
@@ -171,6 +172,7 @@
public String getExpiryAddress()
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
+
if (addressSettings != null && addressSettings.getExpiryAddress() != null)
{
return addressSettings.getExpiryAddress().toString();
@@ -181,7 +183,7 @@
}
}
- public void setExpiryAddress(String expiryAddres) throws Exception
+ public void setExpiryAddress(final String expiryAddres) throws Exception
{
AddressSettings addressSettings = addressSettingsRepository.getMatch(getName());
@@ -291,9 +293,10 @@
}
}
- public int removeMatchingMessages(String filterStr) throws Exception
+ public int removeMatchingMessages(final String filterStr) throws Exception
{
Filter filter = FilterImpl.createFilter(filterStr);
+
return queue.deleteMatchingReferences(filter);
}
@@ -307,6 +310,7 @@
try
{
Filter filter = FilterImpl.createFilter(filterStr);
+
return queue.expireMessages(filter);
}
catch (MessagingException e)
@@ -318,6 +322,7 @@
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
{
Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+
if (binding == null)
{
throw new IllegalArgumentException("No queue found for " + otherQueueName);
@@ -326,10 +331,12 @@
return queue.moveMessage(messageID, binding.getAddress());
}
- public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
+ public int moveMatchingMessages(final String filterStr, final String otherQueueName) throws Exception
{
Filter filter = FilterImpl.createFilter(filterStr);
+
Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+
if (binding == null)
{
throw new IllegalArgumentException("No queue found for " + otherQueueName);
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-03 16:23:54 UTC (rev 5971)
@@ -21,6 +21,25 @@
*/
package org.jboss.messaging.integration.transports.netty;
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
@@ -41,8 +60,6 @@
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
-import static org.jboss.netty.channel.Channels.pipeline;
-import static org.jboss.netty.channel.Channels.write;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;
@@ -64,21 +81,6 @@
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
/**
* A NettyConnector
*
@@ -89,7 +91,7 @@
{
// Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(NettyConnection.class);
+ private static final Logger log = Logger.getLogger(NettyConnector.class);
// Attributes ----------------------------------------------------
@@ -325,7 +327,7 @@
{
return;
}
-
+
bootstrap = null;
channelFactory = null;
if (bossExecutor != null)
@@ -333,6 +335,7 @@
bossExecutor.shutdown();
}
workerExecutor.shutdown();
+
if (bossExecutor != null)
{
for (; ;)
@@ -350,7 +353,7 @@
}
}
}
-
+
for (Connection connection : connections.values())
{
listener.connectionDestroyed(connection.getID());
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2009-03-03 16:02:17 UTC (rev 5970)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2009-03-03 16:23:54 UTC (rev 5971)
@@ -144,7 +144,7 @@
return messagingServer.getVersion();
}
- public boolean createQueue(final String queueName, final String jndiBinding) throws Exception
+ public synchronized boolean createQueue(final String queueName, final String jndiBinding) throws Exception
{
JBossQueue jBossQueue = new JBossQueue(queueName);
messagingServer.createQueue(jBossQueue.getAddress(), jBossQueue.getAddress());
@@ -163,7 +163,7 @@
return added;
}
- public boolean createTopic(final String topicName, final String jndiBinding) throws Exception
+ public synchronized boolean createTopic(final String topicName, final String jndiBinding) throws Exception
{
JBossTopic jBossTopic = new JBossTopic(topicName);
//We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS checks when routing messages to a topic that
@@ -178,7 +178,7 @@
return added;
}
- public boolean undeployDestination(final String name) throws Exception
+ public synchronized boolean undeployDestination(final String name) throws Exception
{
List<String> jndiBindings = destinations.get(name);
if (jndiBindings == null || jndiBindings.size() == 0)
@@ -192,7 +192,7 @@
return true;
}
- public boolean destroyQueue(final String name) throws Exception
+ public synchronized boolean destroyQueue(final String name) throws Exception
{
undeployDestination(name);
@@ -203,7 +203,7 @@
return true;
}
- public boolean destroyTopic(final String name) throws Exception
+ public synchronized boolean destroyTopic(final String name) throws Exception
{
undeployDestination(name);
@@ -214,7 +214,7 @@
return true;
}
- public boolean createConnectionFactory(final String name,
+ public synchronized boolean createConnectionFactory(final String name,
final List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
final String connectionLoadBalancingPolicyClassName,
final long pingPeriod,
@@ -273,7 +273,7 @@
return true;
}
- public boolean createConnectionFactory(final String name,
+ public synchronized boolean createConnectionFactory(final String name,
final DiscoveryGroupConfiguration discoveryGroupConfig,
final long discoveryInitialWait,
final String connectionLoadBalancingPolicyClassName,
@@ -336,7 +336,7 @@
return true;
}
- public boolean destroyConnectionFactory(final String name) throws Exception
+ public synchronized boolean destroyConnectionFactory(final String name) throws Exception
{
List<String> jndiBindings = connectionFactoryBindings.get(name);
if (jndiBindings == null || jndiBindings.size() == 0)
More information about the jboss-cvs-commits
mailing list