[jboss-cvs] JBoss Messaging SVN: r3647 - in trunk: src/main/org/jboss/jms/server/endpoint and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 30 12:45:21 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-01-30 12:45:21 -0500 (Wed, 30 Jan 2008)
New Revision: 3647
Added:
trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/PacketFilter.java
Modified:
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryAdvisedPacketHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/Configuration.java
trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
trunk/tests/etc/ConfigurationTest-config.xml
trunk/tests/src/org/jboss/jms/server/test/unit/ConfigurationTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ReversePacketHandler.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/TestPacketHandler.java
trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptor.java
trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptorB.java
trunk/tests/src/org/jboss/test/messaging/jms/interception/PacketFilterTest.java
Log:
JBMESSAGING-1225 - Interceptors cleanup
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerPacketHandler.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -6,7 +6,9 @@
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.util.Logger;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -16,6 +18,8 @@
*/
public class ClientConsumerPacketHandler implements PacketHandler
{
+ private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
+
private final ClientConsumer clientConsumer;
private final String consumerID;
@@ -37,7 +41,7 @@
return consumerID;
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
try
{
@@ -51,8 +55,8 @@
}
catch (Exception e)
{
- // TODO Auto-generated catch block
- e.printStackTrace();
+ // TODO: Is there anything else to be done here if an exception happens?
+ log.error(e.getMessage(), e);
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryAdvisedPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryAdvisedPacketHandler.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryAdvisedPacketHandler.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionRequest;
import org.jboss.messaging.core.remoting.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.ExceptionUtil;
@@ -68,7 +69,7 @@
return ClientConnectionFactoryImpl.id;
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
try
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -47,6 +47,7 @@
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.Logger;
@@ -288,11 +289,11 @@
return ServerBrowserEndpoint.this.id;
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
try
{
- AbstractPacket response = null;
+ Packet response = null;
PacketType type = packet.getType();
if (type == REQ_BROWSER_HASNEXTMESSAGE)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -64,6 +64,7 @@
import org.jboss.messaging.core.remoting.wireformat.GetClientIDResponse;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.SetClientIDMessage;
import org.jboss.messaging.core.tx.MessagingXid;
@@ -564,11 +565,11 @@
return ServerConnectionEndpoint.this.id;
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
try
{
- AbstractPacket response = null;
+ Packet response = null;
PacketType type = packet.getType();
if (type == REQ_CREATESESSION)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -46,6 +46,7 @@
import org.jboss.messaging.core.remoting.wireformat.ConsumerChangeRateMessage;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.Logger;
@@ -552,11 +553,11 @@
return ServerConsumerEndpoint.this.id;
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
try
{
- AbstractPacket response = null;
+ Packet response = null;
PacketType type = packet.getType();
if (type == MSG_CHANGERATE)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -77,6 +77,7 @@
import org.jboss.messaging.core.remoting.wireformat.DeleteTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.NullPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
import org.jboss.messaging.core.remoting.wireformat.SessionAcknowledgeMessage;
import org.jboss.messaging.core.remoting.wireformat.SessionCancelMessage;
@@ -1493,11 +1494,11 @@
return ServerSessionEndpoint.this.id;
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
try
{
- AbstractPacket response = null;
+ Packet response = null;
PacketType type = packet.getType();
if (type == MSG_SENDMESSAGE)
Modified: trunk/src/main/org/jboss/messaging/core/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Configuration.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/Configuration.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -34,7 +34,9 @@
import java.beans.PropertyChangeSupport;
import java.io.Serializable;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
/**
* This is the JBM configuration. It is used to configure the MessagingServer.
@@ -65,6 +67,7 @@
private String _defaultTopicJNDIContext = "";
private String _securityDomain;
private HashSet<Role> _securityConfig;
+ private List<String> defaultInterceptors;
private String _defaultDLQ;
// The default maximum number of delivery attempts before sending to DLQ - can be overridden on
// the destination
@@ -160,6 +163,25 @@
}
_securityConfig = securityConfig;
}
+
+ NodeList defaultInterceptors = e.getElementsByTagName("default-interceptors-config");
+
+ ArrayList<String> interceptorList = new ArrayList<String>();
+ if (defaultInterceptors.getLength() > 0)
+ {
+
+ NodeList interceptors = defaultInterceptors.item(0).getChildNodes();
+ for (int k = 0; k < interceptors.getLength(); k++)
+ {
+ if ("interceptor".equalsIgnoreCase(interceptors.item(k).getNodeName()))
+ {
+ String clazz = interceptors.item(k).getAttributes().getNamedItem("class").getNodeValue();
+ interceptorList.add(clazz);
+ }
+ }
+ }
+ this.defaultInterceptors = interceptorList;
+
}
private Boolean getBoolean(Element e, String name, Boolean def)
@@ -256,6 +278,11 @@
{
return _securityConfig;
}
+
+ public List<String> getDefaultInterceptors()
+ {
+ return defaultInterceptors;
+ }
public void setSecurityConfig(HashSet<Role> securityConfig)
{
Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -57,6 +57,8 @@
import org.jboss.messaging.core.impl.memory.SimpleMemoryManager;
import org.jboss.messaging.core.impl.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.impl.postoffice.PostOfficeImpl;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.deployers.queue.QueueSettingsDeployer;
import org.jboss.messaging.deployers.security.SecurityDeployer;
@@ -189,6 +191,21 @@
ConnectionFactoryAdvisedPacketHandler connectionFactoryAdvisedPacketHandler =
new ConnectionFactoryAdvisedPacketHandler(this);
getRemotingService().getDispatcher().register(connectionFactoryAdvisedPacketHandler);
+
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ for (String interceptorClass: configuration.getDefaultInterceptors())
+ {
+ try
+ {
+ Class clazz = loader.loadClass(interceptorClass);
+ getRemotingService().addInterceptor((Interceptor)clazz.newInstance());
+ }
+ catch (Exception e)
+ {
+ log.warn("Error instantiating interceptor \"" + interceptorClass + "\"", e);
+ }
+ }
+
started = true;
log.info("JBoss Messaging " + getVersion().getProviderVersion() + " server [" +
configuration.getMessagingServerID() + "] started");
Copied: trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java (from rev 3641, trunk/src/main/org/jboss/messaging/core/remoting/PacketFilter.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/Interceptor.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.jms.exception.MessagingJMSException;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+
+/**
+ *
+ * This interface is a simple way to intercept server calls on JBoss Messaging.
+ * Deploying it to the POJO Container (Micro Container) is then all you need to do.
+ *
+ * This gives you the option of deploying it in any package you like, as long as you define your Interceptor.
+ *
+ * Example of configuration file:
+ *
+ * <code>
+ <?xml version="1.0" encoding="UTF-8"?>
+ <deployment xmlns="urn:jboss:bean-deployer:2.0">
+ <bean name="MyInterceptor" class="a.b.MyClassImplementingInterceptor"/>
+ </deployment>
+ </code>
+ *
+ * Note: An interceptor sees the packets dispatched to the server's handlers and also the responses sent back through the PacketSender.
+ *
+ * @author clebert.suconic at jboss.com
+ */
+public interface Interceptor
+{
+ /**
+ * Called for each packet before it is handled or sent. Throw a MessagingJMSException to interrupt the packet.
+ *
+ * @param packet the packet being intercepted
+ * @throws MessagingJMSException to interrupt the packet: for a dispatched packet the
+ *         dispatcher sends the exception back to the sender as a JMSExceptionMessage;
+ *         for a response packet the packet is ignored and a warning is logged
+ */
+ void intercept(Packet packet) throws MessagingJMSException;
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -9,12 +9,14 @@
import static org.jboss.messaging.core.remoting.Assert.assertValidID;
import static org.jboss.messaging.core.remoting.wireformat.AbstractPacket.NO_ID_SET;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.SetSessionIDMessage;
import org.jboss.messaging.util.Logger;
@@ -32,7 +34,7 @@
// Attributes ----------------------------------------------------
private Map<String, PacketHandler> handlers;
- private List<PacketFilter> filters;
+ private List<Interceptor> filters;
// Static --------------------------------------------------------
@@ -46,7 +48,7 @@
handlers = new ConcurrentHashMap<String, PacketHandler>();
}
- public PacketDispatcher(List<PacketFilter> filters)
+ public PacketDispatcher(List<Interceptor> filters)
{
this();
this.filters = filters;
@@ -88,7 +90,7 @@
return handlers.get(handlerID);
}
- public void dispatch(AbstractPacket packet, PacketSender sender)
+ public void dispatch(Packet packet, PacketSender sender)
{
//FIXME better separation between client and server PacketDispatchers
if (this != client)
@@ -114,47 +116,38 @@
if (log.isTraceEnabled())
log.trace(handler + " handles " + packet);
- if (fireFilter(packet, handler, sender))
+ try
{
+ callFilters(packet);
handler.handle(packet, sender);
}
+ catch (MessagingJMSException e)
+ {
+ sender.send(new JMSExceptionMessage(e));
+ }
+
} else
{
log.error("Unhandled packet " + packet);
}
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected boolean fireFilter(AbstractPacket packet, PacketHandler handler, PacketSender sender)
+ /** Calls every registered interceptor on a packet. */
+ public void callFilters(Packet packet) throws MessagingJMSException
{
- if (filters == null)
+ if (filters != null)
{
- return true;
- }
- else
- {
- for (PacketFilter filter: filters)
+ for (Interceptor filter: filters)
{
- try
- {
- if (!filter.filterMessage(packet, handler, sender))
- {
- log.info("Filter " + filter.getClass().getName() + " Cancelled packet " + packet);
- return false;
- }
- }
- catch (Exception ignored)
- {
- }
+ filter.intercept(packet);
}
-
- return true;
}
}
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/PacketFilter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketFilter.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketFilter.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -1,43 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.messaging.core.remoting;
-
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
-
-/**
- *
- * This is class is a simple way to intercepting server calls on JBoss Messaging.
- * Deploying it to the POJO Container (Micro Container) is then all you need to do.
- *
- * This gives you the option of deploying it on any package you like, as long as you define your Filter.
- *
- * Example of configuration file:
- *
- * <code>
- <?xml version="1.0" encoding="UTF-8"?>
- <deployment xmlns="urn:jboss:bean-deployer:2.0">
- <bean name="MyInterceptor" class="a.b.MyClassImplementingPacketFilter"/>
- </deployment>
- </code>
- *
- * Note: This interface only captures messages from client2Server. If you need to capture server at client calls you should substitute the sender parameter by an inner class.
- *
- * @author clebert.suconic at jboss.com
- */
-public interface PacketFilter
-{
- /**
- * If you need to intercept a return value, you could create your own implementation of PacketSender and recover the return value.
- *
- * @param packet
- * @param handler
- * @param sender
- * @return false if the Packet transmission should be interrupted after this call
- */
- boolean filterMessage(AbstractPacket packet, PacketHandler handler, PacketSender sender);
-}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketHandler.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -6,7 +6,7 @@
*/
package org.jboss.messaging.core.remoting;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
/**
* A PacketHandler handles packets (as defined by {@link AbstractPacket} and its
@@ -34,5 +34,5 @@
*/
String getID();
- void handle(AbstractPacket packet, PacketSender sender);
+ void handle(Packet packet, PacketSender sender);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketSender.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -6,7 +6,7 @@
*/
package org.jboss.messaging.core.remoting;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -16,7 +16,7 @@
*/
public interface PacketSender
{
- void send(AbstractPacket packet);
+ void send(Packet packet);
String getSessionID();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -19,4 +19,9 @@
PacketDispatcher getDispatcher();
RemotingConfiguration getRemotingConfiguration();
+
+ void addInterceptor(Interceptor interceptor);
+
+ void removeInterceptor(Interceptor interceptor);
+
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/invm/INVMSession.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -10,10 +10,13 @@
import java.util.concurrent.TimeUnit;
+import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
+import org.jboss.messaging.util.Logger;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -32,6 +35,7 @@
private PacketDispatcher serverDispatcher;
// Static --------------------------------------------------------
+ private static final Logger log = Logger.getLogger(INVMSession.class);
// Constructors --------------------------------------------------
@@ -70,9 +74,17 @@
serverDispatcher.dispatch((AbstractPacket) object,
new PacketSender()
{
- public void send(AbstractPacket response)
+ public void send(Packet response)
{
- PacketDispatcher.client.dispatch(response, null);
+ try
+ {
+ serverDispatcher.callFilters(response);
+ PacketDispatcher.client.dispatch(response, null);
+ }
+ catch (MessagingJMSException e)
+ {
+ log.warn("An interceptor throwed an exception what caused the packet " + response + " to be ignored", e);
+ }
}
public String getSessionID()
@@ -86,14 +98,23 @@
long timeout, TimeUnit timeUnit) throws Throwable
{
request.setCorrelationID(correlationCounter++);
- final AbstractPacket[] responses = new AbstractPacket[1];
+ final Packet[] responses = new Packet[1];
serverDispatcher.dispatch(request,
new PacketSender()
{
- public void send(AbstractPacket response)
+ public void send(Packet response)
{
- responses[0] = response;
+ try
+ {
+ serverDispatcher.callFilters(response);
+ responses[0] = response;
+ }
+ catch (MessagingJMSException e)
+ {
+ log.warn("An interceptor throwed an exception what caused the packet " + response + " to be ignored", e);
+ responses[0] = null;
+ }
}
public String getSessionID()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -12,9 +12,11 @@
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveTimeoutException;
import org.apache.mina.filter.reqres.Response;
+import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.Ping;
import org.jboss.messaging.util.Logger;
@@ -97,9 +99,18 @@
AbstractPacket packet = (AbstractPacket) message;
PacketSender sender = new PacketSender()
{
- public void send(AbstractPacket p)
+ public void send(Packet p)
{
- session.write(p);
+ try
+ {
+ dispatcher.callFilters(p);
+ session.write(p);
+ }
+ catch (MessagingJMSException e)
+ {
+ log.warn("An interceptor throwed an exception what caused the packet " + p + " to be ignored", e);
+ }
+
}
public String getSessionID()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -25,7 +25,7 @@
import org.jboss.messaging.core.remoting.ConnectionExceptionListener;
import org.jboss.messaging.core.remoting.KeepAliveFactory;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketFilter;
+import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.RemotingConfiguration;
import org.jboss.messaging.core.remoting.RemotingService;
import org.jboss.messaging.util.Logger;
@@ -56,7 +56,7 @@
private KeepAliveFactory factory;
- private List<PacketFilter> filters = new CopyOnWriteArrayList<PacketFilter>();
+ private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
// Static --------------------------------------------------------
@@ -78,13 +78,13 @@
}
@Install
- public void addFilter(PacketFilter filter)
+ public void addInterceptor(Interceptor filter)
{
this.filters.add(filter);
}
@Uninstall
- public void removeFilter(PacketFilter filter)
+ public void removeInterceptor(Interceptor filter)
{
this.filters.remove(filter);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/AbstractPacket.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -15,7 +15,7 @@
*
* @version <tt>$Revision$</tt>
*/
-public class AbstractPacket
+public class AbstractPacket implements Packet
{
// Constants -----------------------------------------------------
@@ -103,7 +103,7 @@
return oneWay;
}
- public void normalize(AbstractPacket other)
+ public void normalize(Packet other)
{
assert other != null;
Modified: trunk/tests/etc/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/etc/ConfigurationTest-config.xml 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/etc/ConfigurationTest-config.xml 2008-01-30 17:45:21 UTC (rev 3647)
@@ -4,6 +4,10 @@
<default-queue-jndi-context>/queuetest</default-queue-jndi-context>
<default-topic-jndi-context>/topictest</default-topic-jndi-context>
<security-domain>java:/jaas/messagingtest</security-domain>
+ <default-interceptors-config>
+ <interceptor class="org.jboss.tst"/>
+ <interceptor class="org.jboss.tst2"/>
+ </default-interceptors-config>
<default-security-config>
<role name="guest1" read="true" write="true" create="true"/>
<role name="guest2" read="true" write="true" create="false"/>
Modified: trunk/tests/src/org/jboss/jms/server/test/unit/ConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/jms/server/test/unit/ConfigurationTest.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/src/org/jboss/jms/server/test/unit/ConfigurationTest.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -186,6 +186,13 @@
assertEquals(10000, remotingConfig.getPort());
assertEquals(100, remotingConfig.getTimeout());
}
+ public void testSetInterceptorsList() throws Exception
+ {
+ assertEquals("Didn't get the correct number of elements on interceptors", 2, configuration.getDefaultInterceptors().size());
+ assertEquals("org.jboss.tst", configuration.getDefaultInterceptors().get(0));
+ assertEquals("org.jboss.tst2", configuration.getDefaultInterceptors().get(1));
+ }
+
public void testPropertyChangeListener() throws Exception
{
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ReversePacketHandler.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ReversePacketHandler.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/ReversePacketHandler.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -18,6 +18,7 @@
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.test.unit.TestPacketHandler;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
/**
@@ -56,7 +57,7 @@
// TestPacketHandler overrides -----------------------------------
@Override
- protected void doHandle(AbstractPacket packet, PacketSender sender)
+ protected void doHandle(Packet packet, PacketSender sender)
{
Assert.assertTrue(packet instanceof TextPacket);
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -25,6 +25,7 @@
import org.jboss.messaging.core.remoting.impl.mina.MinaService;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -152,7 +153,7 @@
return handlerID;
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
messagesReceived++;
if (messagesReceived % spinner == 0)
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/TestPacketHandler.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/TestPacketHandler.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/TestPacketHandler.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -15,6 +15,7 @@
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
public class TestPacketHandler implements PacketHandler
@@ -46,7 +47,7 @@
this.latch = new CountDownLatch(count);
}
- public void handle(AbstractPacket packet, PacketSender sender)
+ public void handle(Packet packet, PacketSender sender)
{
packets.add((TextPacket) packet);
@@ -56,7 +57,7 @@
latch.countDown();
}
- protected void doHandle(AbstractPacket packet, PacketSender sender)
+ protected void doHandle(Packet packet, PacketSender sender)
{
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptor.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptor.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptor.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -7,38 +7,49 @@
package org.jboss.test.messaging.jms.interception;
-import org.jboss.messaging.core.remoting.PacketFilter;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.jms.exception.MessagingJMSException;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.util.Logger;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-public class DummyInterceptor implements PacketFilter
+public class DummyInterceptor implements Interceptor
{
protected Logger log = Logger.getLogger(DummyInterceptor.class);
- static boolean status = true;
- static SynchronizedInt syncCounter = new SynchronizedInt(0);
+ boolean sendException = false;
+ boolean changeMessage = false;
+ SynchronizedInt syncCounter = new SynchronizedInt(0);
- public static int getCounter()
+ public int getCounter()
{
return syncCounter.get();
}
- public static void clearCounter()
+ public void clearCounter()
{
syncCounter.set(0);
}
- public boolean filterMessage(AbstractPacket packet, PacketHandler handler,
- PacketSender sender)
+ public void intercept(Packet packet) throws MessagingJMSException
{
+ log.info("DummyFilter packet = " + packet.getClass().getName());
syncCounter.add(1);
- log.info("DummyFilter packet = " + packet + " handler = " + handler + " sender = " + sender);
-
- return status;
+ if (sendException)
+ {
+ throw new MessagingJMSException("Test");
+ }
+ if (changeMessage)
+ {
+ if (packet instanceof DeliverMessage)
+ {
+ DeliverMessage deliver = (DeliverMessage)packet;
+ log.info("msg = " + deliver.getMessage().getClass().getName());
+ deliver.getMessage().getHeaders().put("DummyInterceptor", "was here");
+ }
+ }
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptorB.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptorB.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/src/org/jboss/test/messaging/jms/interception/DummyInterceptorB.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -7,15 +7,14 @@
package org.jboss.test.messaging.jms.interception;
-import org.jboss.messaging.core.remoting.PacketFilter;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketSender;
-import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.jms.exception.MessagingJMSException;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.wireformat.Packet;
import org.jboss.messaging.util.Logger;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-public class DummyInterceptorB implements PacketFilter
+public class DummyInterceptorB implements Interceptor
{
protected Logger log = Logger.getLogger(DummyInterceptorB.class);
@@ -31,13 +30,11 @@
{
syncCounter.set(0);
}
-
- public boolean filterMessage(AbstractPacket packet, PacketHandler handler,
- PacketSender sender)
+
+ public void intercept(Packet packet) throws MessagingJMSException
{
- log.info("InterceptorB received " + packet);
syncCounter.add(1);
- return true;
+ log.info("DummyFilter packet = " + packet);
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/interception/PacketFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/interception/PacketFilterTest.java 2008-01-30 15:44:42 UTC (rev 3646)
+++ trunk/tests/src/org/jboss/test/messaging/jms/interception/PacketFilterTest.java 2008-01-30 17:45:21 UTC (rev 3647)
@@ -7,8 +7,13 @@
package org.jboss.test.messaging.jms.interception;
+import java.util.UUID;
+
import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import org.jboss.kernel.spi.deployment.KernelDeployment;
import org.jboss.test.messaging.jms.JMSTestCase;
@@ -26,66 +31,160 @@
public void testFilter() throws Throwable
{
- KernelDeployment packetFilterDeployment = servers.get(0).deployXML("packetFilterDeployment", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><deployment xmlns=\"urn:jboss:bean-deployer:2.0\"><bean name=\"DummyInterceptionTest\" class=\"org.jboss.test.messaging.jms.interception.DummyInterceptor\"/></deployment>");
-
-
- DummyInterceptor.status=false;
+ DummyInterceptor interceptorA = null;
+ KernelDeployment packetFilterDeploymentB = null;
+ Connection conn = null;
+
try
{
- Connection conn = cf.createConnection();
- fail("Exception expected");
+
+ // Deploy using the API
+ interceptorA = new DummyInterceptor();
+ servers.get(0).getMessagingServer().getRemotingService().addInterceptor(interceptorA);
+
+
+ interceptorA.sendException=true;
+ try
+ {
+ conn = cf.createConnection();
+ fail("Exception expected");
+ }
+ catch (Exception e)
+ {
+ conn = null;
+ }
+
+ interceptorA.sendException=false;
+
+ conn = cf.createConnection();
+ conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.close();
+ conn = null;
+
+
+ assertEquals(0, DummyInterceptorB.getCounter());
+ assertTrue(interceptorA.getCounter() > 0);
+
+ interceptorA.clearCounter();
+ DummyInterceptorB.clearCounter();
+
+
+ // deploy using MC
+ packetFilterDeploymentB = servers.get(0).deployXML("packetFilterDeploymentB", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><deployment xmlns=\"urn:jboss:bean-deployer:2.0\"><bean name=\"DummyInterceptionTestB\" class=\"org.jboss.test.messaging.jms.interception.DummyInterceptorB\"/></deployment>");
+
+ conn = cf.createConnection();
+ conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.close();
+ conn = null;
+
+ assertTrue(DummyInterceptorB.getCounter() > 0);
+ assertTrue(interceptorA.getCounter() > 0);
+
+ interceptorA.clearCounter();
+ DummyInterceptorB.clearCounter();
+
+ servers.get(0).getMessagingServer().getRemotingService().removeInterceptor(interceptorA);
+
+ conn = cf.createConnection();
+ conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.close();
+ conn = null;
+
+ assertTrue(DummyInterceptorB.getCounter() > 0);
+ assertTrue(interceptorA.getCounter() == 0);
+
+
+ log.info("Undeploying server");
+ servers.get(0).undeploy(packetFilterDeploymentB);
+ packetFilterDeploymentB = null;
+ interceptorA.clearCounter();
+ DummyInterceptorB.clearCounter();
+
+ conn = cf.createConnection();
+ conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.close();
+ conn = null;
+
+ assertEquals(0, interceptorA.getCounter());
+ assertEquals(0, DummyInterceptorB.getCounter());
+
+ interceptorA = null;
}
- catch (Exception e)
+ finally
{
+ if (conn != null)
+ {
+ try{conn.close();} catch (Exception ignored){}
+ }
+ if (interceptorA != null)
+ {
+ servers.get(0).getMessagingServer().getRemotingService().removeInterceptor(interceptorA);
+ }
+ if (packetFilterDeploymentB != null)
+ {
+ try{servers.get(0).undeploy(packetFilterDeploymentB);} catch (Exception ignored){}
+ }
}
+ }
+
+ public void testReceiveMessages() throws Throwable
+ {
- DummyInterceptor.status=true;
-
- Connection conn = cf.createConnection();
- conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- conn.close();
-
-
- assertEquals(0, DummyInterceptorB.getCounter());
- assertTrue(DummyInterceptor.getCounter() > 0);
-
- DummyInterceptor.clearCounter();
- DummyInterceptorB.clearCounter();
-
+ DummyInterceptor interceptor = null;
+ Connection conn = null;
+
+ try
+ {
+
+ interceptor = new DummyInterceptor();
+ servers.get(0).getMessagingServer().getRemotingService().addInterceptor(interceptor);
+
+
+ interceptor.sendException=false;
- KernelDeployment packetFilterDeploymentB = servers.get(0).deployXML("packetFilterDeploymentB", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><deployment xmlns=\"urn:jboss:bean-deployer:2.0\"><bean name=\"DummyInterceptionTestB\" class=\"org.jboss.test.messaging.jms.interception.DummyInterceptorB\"/></deployment>");
+ conn = cf.createConnection();
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue1);
+ String msg = "msg " + UUID.randomUUID().toString();
+
+ interceptor.changeMessage = true;
- conn = cf.createConnection();
- conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- conn.close();
-
- assertTrue(DummyInterceptorB.getCounter() > 0);
- assertTrue(DummyInterceptor.getCounter() > 0);
-
- DummyInterceptor.clearCounter();
- DummyInterceptorB.clearCounter();
+ producer.send(session.createTextMessage(msg));
+
+ MessageConsumer consumer = session.createConsumer(queue1);
+ TextMessage jmsMsg = (TextMessage)consumer.receive(100000);
+
+ // receive() returns null on timeout: check that first so the test fails cleanly instead of with an NPE
+ assertNotNull(jmsMsg);
+
+ assertEquals("was here", jmsMsg.getStringProperty("DummyInterceptor"));
+ assertEquals(msg, jmsMsg.getText());
+ }
+ finally
+ {
+ try
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
- servers.get(0).undeploy(packetFilterDeployment);
-
- conn = cf.createConnection();
- conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- conn.close();
-
- assertTrue(DummyInterceptorB.getCounter() > 0);
- assertTrue(DummyInterceptor.getCounter() == 0);
-
-
- log.info("Undeploying server");
- servers.get(0).undeploy(packetFilterDeploymentB);
- DummyInterceptor.clearCounter();
- DummyInterceptorB.clearCounter();
-
- conn = cf.createConnection();
- conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- conn.close();
-
- assertEquals(0, DummyInterceptor.getCounter());
- assertEquals(0, DummyInterceptorB.getCounter());
+ try
+ {
+ if (interceptor != null)
+ {
+ servers.get(0).getMessagingServer().getRemotingService().removeInterceptor(interceptor);
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
}
-
}
More information about the jboss-cvs-commits
mailing list