[jboss-cvs] JBoss Messaging SVN: r1629 - in trunk: . docs/examples/http/etc src/etc/server/default/deploy src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/jms/server/container src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/util tests tests/bin tests/etc tests/smoke tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/tools/jmx tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 22 23:42:33 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-11-22 23:42:17 -0500 (Wed, 22 Nov 2006)
New Revision: 1629
Added:
trunk/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java
trunk/tests/src/org/jboss/test/messaging/jms/RemotingConnectionConfigurationTest.java
Removed:
trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
trunk/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java
Modified:
trunk/build-messaging.xml
trunk/build-thirdparty.xml
trunk/docs/examples/http/etc/messaging-http-service.xml
trunk/messaging.iml
trunk/src/etc/server/default/deploy/remoting-service.xml
trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java
trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/src/main/org/jboss/jms/util/XMLUtil.java
trunk/tests/bin/.testrc.example
trunk/tests/bin/runtest
trunk/tests/bin/start-rmi-server
trunk/tests/build.xml
trunk/tests/etc/container.xml
trunk/tests/smoke/build.xml
trunk/tests/smoke/smoke.properties.example
trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
merging the HTTP branch; http://jira.jboss.org/jira/browse/JBMESSAGING-207
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/build-messaging.xml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -457,6 +457,9 @@
<copy todir="${build.scoped-sar}" file="${jboss.aop.lib}/jboss-aop.jar"/>
<copy todir="${build.scoped-sar}" file="${javassist.javassist.lib}/javassist.jar"/>
<copy todir="${build.scoped-sar}" file="${trove.trove.lib}/trove.jar"/>
+ <copy todir="${build.scoped-sar}" file="${apache.tomcat.lib}/tomcat-coyote.jar"/>
+ <copy todir="${build.scoped-sar}" file="${apache.tomcat.lib}/tomcat-http.jar"/>
+ <copy todir="${build.scoped-sar}" file="${apache.tomcat.lib}/tomcat-util.jar"/>
<mkdir dir="${build.scoped-sar}/tmp"/>
@@ -576,6 +579,10 @@
<unjar dest="${project.output}/jboss-messaging-client" src="${jboss.naming.lib}/jnp-client.jar"/>
<unjar dest="${project.output}/jboss-messaging-client" src="${oswego.concurrent.lib}/concurrent.jar"/>
<unjar dest="${project.output}/jboss-messaging-client" src="${apache.log4j.lib}/log4j.jar"/>
+ <unjar dest="${project.output}/jboss-messaging-client" src="${apache.logging.lib}/commons-logging.jar"/>
+ <unjar dest="${project.output}/jboss-messaging-client" src="${apache.tomcat.lib}/tomcat-coyote.jar"/>
+ <unjar dest="${project.output}/jboss-messaging-client" src="${apache.tomcat.lib}/tomcat-http.jar"/>
+ <unjar dest="${project.output}/jboss-messaging-client" src="${apache.tomcat.lib}/tomcat-util.jar"/>
<jar jarfile="${build.lib}/jboss-messaging-client.jar">
<fileset dir="${build.classes}">
<include name="org/jboss/**"/>
Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/build-thirdparty.xml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -90,10 +90,10 @@
<componentref name="jboss/common" version="snapshot"/>
<componentref name="jboss/aop" version="1.5.0.GA"/>
<componentref name="jboss/serialization" version="1.0.3.GA"/>
- <componentref name="jboss/remoting" version="2.2.0.Alpha2"/>
+ <componentref name="jboss/remoting" version="2.2.0.Alpha4"/>
<!-- Need this otherwise project doesn't build in Eclipse -->
- <componentref name="apache-logging" version="1.0.5.GA-jboss"/>
+ <componentref name="apache-logging" version="1.0.4.1jboss"/>
<!--
Dependencies required to test the project.
@@ -105,7 +105,6 @@
<componentref name="jboss/profiler/jvmti" version="1.0.0.CR5"/>
<componentref name="jboss/jbossxb" version="1.0.0.CR6"/>
-
</build>
<synchronizeinfo/>
Modified: trunk/docs/examples/http/etc/messaging-http-service.xml
===================================================================
--- trunk/docs/examples/http/etc/messaging-http-service.xml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/docs/examples/http/etc/messaging-http-service.xml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -16,30 +16,34 @@
This Connector will basically run as a standalone http server
-->
<mbean code="org.jboss.remoting.transport.Connector"
- name="jboss.messaging:service=Connector,transport=HTTP"
- display-name="Messaging HTTP transport Connector">
-
- <!--
- Since there are no special configuration properties for this invoker, will just use
- the following to declare the invoker and not within the 'invoker' element within the
- 'Configuration' attribute below. IMPORTANT to note can only be either 'InvokerLocator'
- attribute OR 'invoker' element, not both.
- -->
- <attribute name="InvokerLocator">http://${jboss.bind.address}:7488</attribute>
+ name="jboss.messaging:service=Connector,transport=http"
+ display-name="HTTP transport Connector">
<attribute name="Configuration">
<config>
+ <invoker transport="http">
+ <attribute name="marshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
+ <attribute name="unmarshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
+ <attribute name="serializationtype" isParam="true">jms</attribute>
+ <attribute name="dataType" isParam="true">jms</attribute>
+ <attribute name="serverBindAddress">${jboss.bind.address}</attribute>
+ <attribute name="serverBindPort">4458</attribute>
+ <attribute name="leasePeriod">20000</attribute>
+ <attribute name="callbackStore">org.jboss.remoting.callback.CallbackStore</attribute>
+ <attribute name="callbackPollPeriod" isParam="true">100</attribute>
+ </invoker>
<handlers>
<handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>
</handlers>
</config>
</attribute>
+ <depends>jboss.messaging:service=NetworkRegistry</depends>
</mbean>
-
+
<mbean code="org.jboss.jms.server.connectionfactory.ConnectionFactory"
name="jboss.messaging.destination:service=SecureConnectionFactory"
xmbean-dd="xmdesc/ConnectionFactory-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
- <depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=HTTP</depends>
+ <depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=http</depends>
<attribute name="JNDIBindings">
<bindings>
<binding>/HttpConnectionFactory</binding>
Modified: trunk/messaging.iml
===================================================================
--- trunk/messaging.iml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/messaging.iml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -169,21 +169,21 @@
<orderEntry type="module-library">
<library>
<CLASSES>
- <root url="jar://$MODULE_DIR$/thirdparty/jboss/remoting/lib/jboss-remoting.jar!/" />
+ <root url="jar://$MODULE_DIR$/thirdparty/jboss/serialization/lib/jboss-serialization.jar!/" />
</CLASSES>
<JAVADOC />
- <SOURCES>
- <root url="file://$MODULE_DIR$/../../cvs/JBossRemoting-2.2.0.Alpha1/src/main" />
- </SOURCES>
+ <SOURCES />
</library>
</orderEntry>
<orderEntry type="module-library">
<library>
<CLASSES>
- <root url="jar://$MODULE_DIR$/thirdparty/jboss/serialization/lib/jboss-serialization.jar!/" />
+ <root url="jar://$MODULE_DIR$/thirdparty/jboss/remoting/lib/jboss-remoting.jar!/" />
</CLASSES>
<JAVADOC />
- <SOURCES />
+ <SOURCES>
+ <root url="file://$MODULE_DIR$/../../cvs/JBossRemoting-2.2.0.Alpha4/src/main" />
+ </SOURCES>
</library>
</orderEntry>
<orderEntryProperties />
Modified: trunk/src/etc/server/default/deploy/remoting-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/remoting-service.xml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/etc/server/default/deploy/remoting-service.xml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -24,6 +24,7 @@
<attribute name="serverBindAddress">${jboss.bind.address}</attribute>
<attribute name="serverBindPort">4457</attribute>
<attribute name="leasePeriod">20000</attribute>
+ <attribute name="callbackStore">org.jboss.remoting.callback.CallbackStore</attribute>
</invoker>
<handlers>
<handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>
@@ -33,6 +34,37 @@
<depends>jboss.messaging:service=NetworkRegistry</depends>
</mbean>
+ <!--
+ HTTP Connector example
+ -->
+ <!--
+ <mbean code="org.jboss.remoting.transport.Connector"
+ name="jboss.messaging:service=Connector,transport=http"
+ display-name="HTTP transport Connector">
+ <attribute name="Configuration">
+ <config>
+ <invoker transport="http">
+ <attribute name="marshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
+ <attribute name="unmarshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
+ <!== Serialization type must be jms - do not change! ==>
+ <attribute name="serializationtype" isParam="true">jms</attribute>
+ <attribute name="dataType" isParam="true">jms</attribute>
+ <attribute name="serverBindAddress">${jboss.bind.address}</attribute>
+ <attribute name="serverBindPort">4458</attribute>
+ <attribute name="leasePeriod">20000</attribute>
+ <attribute name="callbackStore">org.jboss.remoting.callback.CallbackStore</attribute>
+ <attribute name="callbackPollPeriod" isParam="true">100</attribute>
+ </invoker>
+ <handlers>
+ <handler subsystem="JMS">org.jboss.jms.server.remoting.JMSServerInvocationHandler</handler>
+ </handlers>
+ </config>
+ </attribute>
+ <depends>jboss.messaging:service=NetworkRegistry</depends>
+ </mbean>
+ -->
+
+
<!-- TODO: Do I need this> -->
<mbean code="org.jboss.remoting.network.NetworkRegistry"
name="jboss.messaging:service=NetworkRegistry"/>
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -158,6 +158,16 @@
throw new IllegalStateException("This invocation should not be handled here!");
}
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
+ public void confirmDelivery(int count)
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+
// Public --------------------------------------------------------
public void init()
Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -24,13 +24,11 @@
import java.util.List;
import java.util.Map;
-import javax.management.MBeanServer;
-
import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.remoting.MessagingMarshallable;
-import org.jboss.remoting.InvocationRequest;
-import org.jboss.remoting.ServerInvocationHandler;
-import org.jboss.remoting.ServerInvoker;
+import org.jboss.logging.Logger;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.InvokerCallbackHandler;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
@@ -39,85 +37,91 @@
*
* A CallbackManager.
*
- * The CallbackManager is an InvocationHandler used for handling callbacks to message consumers
- * The callback is received and dispatched off to the relevant consumer
+ * The CallbackManager is an InvokerCallbackHandler used for handling callbacks to message consumers.
+ * The callback is received and dispatched off to the relevant consumer.
*
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version 1.1
*
* CallbackManager.java,v 1.1 2006/02/01 17:38:30 timfox Exp
*/
-public class CallbackManager implements ServerInvocationHandler
+public class CallbackManager implements InvokerCallbackHandler
{
+ // Constants -----------------------------------------------------
+
+ protected static final Logger log = Logger.getLogger(CallbackManager.class);
+
+ public static final String JMS_CALLBACK_SUBSYSTEM = "CALLBACK";
+
+ // Static --------------------------------------------------------
+
+ protected static CallbackManager theManager;
+
+ // Attributes ----------------------------------------------------
+
protected Map callbackHandlers;
+ // Constructors --------------------------------------------------
+
public CallbackManager()
{
callbackHandlers = new ConcurrentReaderHashMap();
}
-
- public void registerHandler(int serverId, int consumerId, MessageCallbackHandler handler)
- {
- Long lookup = calcLookup(serverId, consumerId);
-
- callbackHandlers.put(lookup, handler);
- }
-
- public void unregisterHandler(int serverId, int consumerId)
- {
- Long lookup = calcLookup(serverId, consumerId);
-
- callbackHandlers.remove(lookup);
- }
-
- private Long calcLookup(int serverId, int consumerId)
- {
- long id1 = serverId;
-
- id1 <<= 32;
-
- long id2 = consumerId;
-
- long lookup = id1 | id2;
-
- return new Long(lookup);
- }
-
- public void addListener(InvokerCallbackHandler arg0)
- {
- }
- public Object invoke(InvocationRequest ir) throws Throwable
+ // InvokerCallbackHandler implementation -------------------------
+
+ public void handleCallback(Callback callback) throws HandleCallbackException
{
- MessagingMarshallable mm = (MessagingMarshallable)ir.getParameter();
-
+ MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
ClientDelivery dr = (ClientDelivery)mm.getLoad();
-
- Long lookup = calcLookup(dr.getServerId(), dr.getConsumerId());
-
+ Long lookup = computeLookup(dr.getServerId(), dr.getConsumerId());
List msgs = dr.getMessages();
- MessageCallbackHandler handler =
- (MessageCallbackHandler)callbackHandlers.get(lookup);
-
+ MessageCallbackHandler handler = (MessageCallbackHandler)callbackHandlers.get(lookup);
+
if (handler == null)
{
- throw new IllegalStateException("Cannot find handler for consumer: " + dr.getConsumerId() + " and server " + dr.getServerId());
+ throw new IllegalStateException("Cannot find handler for consumer: " + dr.getConsumerId() +
+ " and server " + dr.getServerId());
}
-
- return new MessagingMarshallable(mm.getVersion(), handler.handleMessage(msgs));
+
+ handler.handleMessage(msgs);
}
- public void removeListener(InvokerCallbackHandler arg0)
+ // Public --------------------------------------------------------
+
+ public void registerHandler(int serverID, int consumerID, MessageCallbackHandler handler)
{
+ Long lookup = computeLookup(serverID, consumerID);
+
+ callbackHandlers.put(lookup, handler);
}
- public void setInvoker(ServerInvoker arg0)
- {
+ public void unregisterHandler(int serverID, int consumerID)
+ {
+ Long lookup = computeLookup(serverID, consumerID);
+
+ callbackHandlers.remove(lookup);
}
- public void setMBeanServer(MBeanServer arg0)
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private Long computeLookup(int serverID, int consumerID)
{
+ long id1 = serverID;
+
+ id1 <<= 32;
+
+ long lookup = id1 | consumerID;
+
+ return new Long(lookup);
}
+ // Inner classes -------------------------------------------------
+
}
Deleted: trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackServerFactory.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -1,256 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.remoting;
-
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.jboss.jms.util.MessagingJMSException;
-import org.jboss.logging.Logger;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.security.SSLSocketBuilder;
-import org.jboss.remoting.transport.Connector;
-import org.jboss.remoting.transport.PortUtil;
-
-/**
- *
- * A CallbackServerFactory.
- *
- * We maintain only one callbackserver per transport per client VM.
- * This is to avoid having too many resources e.g. server sockets in use on the client
- * E.g. in the case of a socket transport, if we had one callbackserver per connection then
- * we would have one server socket listening per connection, so we could run out of available
- * ports with a lot of connections.
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision$
- *
- * $Id$
- */
-public class CallbackServerFactory
-{
- private static final Logger log = Logger.getLogger(CallbackServerFactory.class);
-
- private static final String CALLBACK_SERVER_PARAMS =
- "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
- "unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
- "dataType=jms&" +
- "timeout=0&" +
- "socket.check_connection=false";
-
- public static final String JMS_CALLBACK_SUBSYSTEM = "CALLBACK";
-
- public static final String CLIENT_HOST =
- System.getProperty("jboss.messaging.callback.bind.address");
-
- public static final int CLIENT_PORT = getPort();
-
- private static int getPort()
- {
- String propertyPort = System.getProperty("jboss.messaging.callback.bind.port");
-
- try
- {
- if (propertyPort!=null)
- {
- return Integer.parseInt(propertyPort);
- }
- else
- {
- return -1;
- }
- }
- catch (Exception e)
- {
- log.warn("Error during parsing jboss.messaging.callback.bind.port", e);
- return -1;
- }
-
- }
-
- public static CallbackServerFactory instance = new CallbackServerFactory();
-
- private Map holders;
-
- private CallbackServerFactory()
- {
- holders = new HashMap();
- }
-
- public synchronized boolean containsCallbackServer(String protocol)
- {
- return holders.containsKey(protocol);
- }
-
- public synchronized Connector getCallbackServer(InvokerLocator serverLocator) throws Exception
- {
- String protocol = serverLocator.getProtocol();
-
- Holder h = (Holder)holders.get(protocol);
-
- if (h == null)
- {
- h = new Holder();
-
- h.server = startCallbackServer(serverLocator);
-
- holders.put(protocol, h);
- }
- else
- {
- h.refCount++;
- }
-
- return h.server;
- }
-
- public synchronized void stopCallbackServer(String protocol)
- {
- Holder h = (Holder)holders.get(protocol);
-
- if (h == null)
- {
- throw new IllegalArgumentException("Cannot find callback server for protocol: " + protocol);
- }
-
- h.refCount--;
-
- if (h.refCount == 0)
- {
- stopCallbackServer(h.server);
-
- holders.remove(protocol);
- }
- }
-
- protected Connector startCallbackServer(InvokerLocator serverLocator) throws Exception
- {
- if (log.isTraceEnabled()) { log.trace(this + " setting up connection to " + serverLocator); }
-
- final int MAX_RETRIES = 50;
- boolean completed = false;
- Connector server = null;
- String serializationType = null;
- int count = 0;
-
- String thisAddress = CLIENT_HOST;
-
- if (thisAddress==null)
- {
- thisAddress = InetAddress.getLocalHost().getHostAddress();
- }
-
- boolean isSSL = serverLocator.getProtocol().equals("sslsocket");
- Map params = serverLocator.getParameters();
-
- if (params != null)
- {
- //serializationType = (String)params.get("serializationtype");
-
- //Always use jms
- serializationType = "jms";
- }
-
- while (!completed && count < MAX_RETRIES)
- {
- try
- {
- int bindPort = CLIENT_PORT;
- if (bindPort<=0)
- {
- bindPort=PortUtil.findFreePort(thisAddress);
- }
-
- String callbackServerURI;
-
- if (isSSL)
- {
- // See http://jira.jboss.com/jira/browse/JBREM-470
- callbackServerURI =
- "sslsocket://" + thisAddress + ":" + bindPort + CALLBACK_SERVER_PARAMS +
- "&" + SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE + "=true";
- }
- else
- {
- callbackServerURI = serverLocator.getProtocol() + "://" + thisAddress +
- ":" + bindPort + CALLBACK_SERVER_PARAMS;
- }
-
- if (serializationType != null)
- {
- callbackServerURI += "&serializationType=" + serializationType;
- }
-
- InvokerLocator callbackServerLocator = new InvokerLocator(callbackServerURI);
-
- log.debug(this + " starting callback server " + callbackServerLocator.getLocatorURI());
-
- server = new Connector();
- server.setInvokerLocator(callbackServerLocator.getLocatorURI());
- server.create();
- server.addInvocationHandler(JMS_CALLBACK_SUBSYSTEM, new CallbackManager());
- server.start();
-
- if (log.isTraceEnabled()) { log.trace("callback server started"); }
-
- completed = true;
- }
- catch (Exception e)
- {
- log.warn("Failed to start connection. Will retry", e);
-
- // Intermittently we can fail to open a socket on the address since it's already in use
- // This is despite remoting having checked the port is free. This is probably because
- // of the small window between remoting checking the port is free and getting the
- // port number and actually opening the connection during which some one else can use
- // that port. Therefore we catch this and retry.
-
- count++;
-
- if (count == MAX_RETRIES)
- {
- final String msg = "Cannot start callbackserver after " + MAX_RETRIES + " retries";
- log.error(msg, e);
- throw new MessagingJMSException(msg, e);
- }
- }
- }
-
- return server;
- }
-
- protected void stopCallbackServer(Connector server)
- {
- log.debug("Stopping and destroying callback server " + server.getLocator().getLocatorURI());
- server.stop();
- server.destroy();
- }
-
- private class Holder
- {
- Connector server;
- int refCount = 1;
- }
-
-}
-
Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -29,8 +29,7 @@
import org.jboss.logging.Logger;
import org.jboss.remoting.Client;
import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.callback.InvokerCallbackHandler;
-import org.jboss.remoting.transport.Connector;
+import org.jboss.remoting.callback.CallbackPoller;
/**
@@ -51,6 +50,8 @@
{
// Constants -----------------------------------------------------
+ public static final String CALLBACK_POLL_PERIOD_DEFAULT = "100";
+
private static final Logger log = Logger.getLogger(JMSRemotingConnection.class);
// Static --------------------------------------------------------
@@ -59,19 +60,15 @@
protected Client client;
protected boolean clientPing;
- protected Connector callbackServer;
protected InvokerLocator serverLocator;
protected CallbackManager callbackManager;
- private InvokerCallbackHandler dummyCallbackHandler;
-
// Constructors --------------------------------------------------
public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Throwable
- {
+ {
serverLocator = new InvokerLocator(serverLocatorURI);
this.clientPing = clientPing;
- dummyCallbackHandler = new DummyCallbackHandler();
log.debug(this + " created");
}
@@ -91,11 +88,7 @@
if (log.isTraceEnabled()) { log.trace(this + " created client"); }
- // Get the callback server
-
- callbackServer = CallbackServerFactory.instance.getCallbackServer(serverLocator);
- callbackManager = (CallbackManager)callbackServer.getInvocationHandlers()[0];
-
+ callbackManager = new CallbackManager();
client.connect();
// We explicitly set the Marshaller since otherwise remoting tries to resolve the marshaller
@@ -106,12 +99,62 @@
client.setMarshaller(new JMSWireFormat());
client.setUnMarshaller(new JMSWireFormat());
- // We add a dummy callback handler only to trigger the addListener method on the
- // JMSServerInvocationHandler to be called, which allows the server to get hold of a reference
- // to the callback client so it can make callbacks
+ // For socket transport allow true push callbacks, with callback Connector.
+ // For http transport, simulate push callbacks.
+ boolean doPushCallbacks = "socket".equals(serverLocator.getProtocol());
+ if (doPushCallbacks)
+ {
+ if (log.isTraceEnabled()) log.trace("doing push callbacks");
+ HashMap metadata = new HashMap();
+ metadata.put(InvokerLocator.DATATYPE, "jms");
+ metadata.put(InvokerLocator.SERIALIZATIONTYPE, "jms");
- client.addListener(dummyCallbackHandler, callbackServer.getLocator());
+ String bindAddress = System.getProperty("jboss.messaging.callback.bind.address");
+ if (bindAddress != null)
+ {
+ metadata.put(Client.CALLBACK_SERVER_HOST, bindAddress);
+ }
+ String propertyPort = System.getProperty("jboss.messaging.callback.bind.port");
+ if (propertyPort != null)
+ {
+ metadata.put(Client.CALLBACK_SERVER_PORT, propertyPort);
+ }
+
+ client.addListener(callbackManager, metadata, null, true);
+ }
+ else
+ {
+ if (log.isTraceEnabled()) log.trace("simulating push callbacks");
+
+ HashMap metadata = new HashMap();
+
+ // "jboss.messaging.callback.pollPeriod" system property, if set, has the highest priority ...
+ String callbackPollPeriod = System.getProperty("jboss.messaging.callback.pollPeriod");
+ if (callbackPollPeriod == null)
+ {
+ // followed by the value configured on the HTTP connector ("callbackPollPeriod") ...
+ callbackPollPeriod = (String)serverLocator.getParameters().get("callbackPollPeriod");
+ if (callbackPollPeriod == null)
+ {
+ // followed by the hardcoded value.
+ callbackPollPeriod = CALLBACK_POLL_PERIOD_DEFAULT;
+ }
+ }
+
+ metadata.put(CallbackPoller.CALLBACK_POLL_PERIOD, callbackPollPeriod);
+
+ String reportPollingStatistics =
+ System.getProperty("jboss.messaging.callback.reportPollingStatistics");
+
+ if (reportPollingStatistics != null)
+ {
+ metadata.put(CallbackPoller.REPORT_STATISTICS, reportPollingStatistics);
+ }
+
+ client.addListener(callbackManager, metadata);
+ }
+
log.debug(this + " started");
}
@@ -122,14 +165,10 @@
// explicitly remove the callback listener, to avoid race conditions on server
// (http://jira.jboss.org/jira/browse/JBMESSAGING-535)
- client.removeListener(dummyCallbackHandler);
- dummyCallbackHandler = null;
-
- CallbackServerFactory.instance.stopCallbackServer(serverLocator.getProtocol());
-
+ client.removeListener(callbackManager);
client.disconnect();
-
- log.debug(this + " closed");
+
+ log.debug(this + " closed");
}
public Client getInvokingClient()
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -247,7 +247,18 @@
// Ignore
return new HandleMessageResponse(false, 0);
}
-
+
+ // Asynchronously confirm delivery on client
+
+ try
+ {
+ sessionExecutor.execute(new ConfirmDelivery(msgs.size()));
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted", e);
+ }
+
// Put the messages in the buffer and notify any waiting receive()
processMessages(msgs);
@@ -756,6 +767,26 @@
}
}
}
+
+ /*
+ * Used to asynchronously confirm to the server message arrival (delivery) on client.
+ */
+ private class ConfirmDelivery implements Runnable
+ {
+ int count;
+
+ ConfirmDelivery(int count)
+ {
+ this.count = count;
+ }
+
+ public void run()
+ {
+ if (trace) { log.trace("confirming delivery on client of " + count + " message(s)"); }
+ consumerDelegate.confirmDelivery(count);
+ }
+ }
+
}
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -84,10 +84,10 @@
int id = serverPeer.getNextObjectID();
ServerConnectionFactoryEndpoint endpoint =
- new ServerConnectionFactoryEndpoint(id, serverPeer, clientID, jndiBindings,
- prefetchSize, defaultTempQueueFullSize,
- defaultTempQueuePageSize,
- defaultTempQueueDownCacheSize);
+ new ServerConnectionFactoryEndpoint(id, serverPeer, clientID,
+ jndiBindings, prefetchSize,
+ defaultTempQueueFullSize,
+ defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
ClientConnectionFactoryDelegate delegate =
new ClientConnectionFactoryDelegate(id, locatorURI, serverPeer.getVersion(),
Modified: trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/server/container/InjectionAspect.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -55,7 +55,7 @@
{
MethodInvocation mi = (MethodInvocation)invocation;
- // First we inject the callback client for the connection
+ // First we inject the callback handler for the connection
ServerInvokerCallbackHandler handler =
(ServerInvokerCallbackHandler)mi.getMetaData(MetaDataConstants.JMS,
@@ -73,7 +73,7 @@
ServerConnectionEndpoint endpoint = (ServerConnectionEndpoint)advised.getEndpoint();
- endpoint.setCallbackClient(handler.getCallbackClient());
+ endpoint.setCallbackHandler(handler);
// Then we inject the remoting session id of the client
String sessionId =
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ConsumerEndpoint.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -30,6 +30,8 @@
* The rest of the methods are handled in the advice stack.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -37,11 +39,23 @@
public interface ConsumerEndpoint extends Closeable
{
/**
- * If the client buffer has previously become full because the server was sending at a faster rate than the
- * client could consume, then the server will stop sending messages.
- * When the client has emptied the buffer it then needs to inform the server that it can receive more messages
- * by calling this method
+ * If the client buffer has previously become full because the server was sending at a faster
+ * rate than the client could consume, then the server will stop sending messages. When the
+ * client has emptied the buffer it then needs to inform the server that it can receive more
+ * messages by calling this method.
+ *
* @throws JMSException
*/
- void more() throws JMSException;
+ void more() throws JMSException;
+
+ /**
+ * The server consumer endpoint needs to know at any time how many messages are in transit between
+ * server and client. That is why it needs to receive confirmations every time the client
+ * receives one (or more) messages. The confirmation is sent asynchronously from client to server.
+ * This is NOT a consumption acknowledgment.
+ *
+ * @param count - the number of messages received by the client in one batch.
+ */
+ void confirmDelivery(int count);
+
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -33,7 +33,7 @@
import javax.transaction.xa.Xid;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.client.remoting.CallbackServerFactory;
+import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
@@ -56,6 +56,7 @@
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.ConcurrentReaderHashSet;
import org.jboss.remoting.Client;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import org.jboss.util.id.GUID;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
@@ -118,7 +119,7 @@
private MessageStore ms;
- private Client callbackClient;
+ private ServerInvokerCallbackHandler callbackHandler;
private byte usingVersion;
@@ -427,21 +428,31 @@
}
//IOC
- public void setCallbackClient(Client client)
+ public void setCallbackHandler(ServerInvokerCallbackHandler handler)
{
- callbackClient = client;
-
- // TODO not sure if this is the best way to do this, but the callbackClient needs to have
- // its "subsystem" set, otherwise remoting cannot find the associated
- // ServerInvocationHandler on the callback server
- callbackClient.setSubsystem(CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM);
-
- // We explictly set the Marshaller since otherwise remoting tries to resolve the marshaller
- // every time which is very slow - see org.jboss.remoting.transport.socket.ProcessInvocation
- // This can make a massive difference on performance. We also do this in
- // JMSRemotingConnection.setupConnection
- callbackClient.setMarshaller(new JMSWireFormat());
- callbackClient.setUnMarshaller(new JMSWireFormat());
+ callbackHandler = handler;
+ Client callbackClient = callbackHandler.getCallbackClient();
+
+ if (callbackClient != null)
+ {
+ // TODO not sure if this is the best way to do this, but the callbackClient needs to have
+ // its "subsystem" set, otherwise remoting cannot find the associated
+ // ServerInvocationHandler on the callback server
+ callbackClient.setSubsystem(CallbackManager.JMS_CALLBACK_SUBSYSTEM);
+
+ // We explicitly set the Marshaller since otherwise remoting tries to resolve the marshaller
+ // every time which is very slow - see org.jboss.remoting.transport.socket.ProcessInvocation
+ // This can make a massive difference on performance. We also do this in
+ // JMSRemotingConnection.setupConnection
+
+ callbackClient.setMarshaller(new JMSWireFormat());
+ callbackClient.setUnMarshaller(new JMSWireFormat());
+ }
+ else
+ {
+ log.debug("ServerInvokerCallbackHandler callback Client is not available: " +
+ "must be using pull callbacks");
+ }
}
// IOC
@@ -494,9 +505,9 @@
// Protected -----------------------------------------------------
- protected Client getCallbackClient()
+ protected ServerInvokerCallbackHandler getCallbackHandler()
{
- return callbackClient;
+ return callbackHandler;
}
protected int getConnectionID()
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -32,7 +32,6 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-import org.jboss.jms.client.remoting.HandleMessageResponse;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
@@ -56,6 +55,7 @@
import org.jboss.messaging.core.tx.TransactionException;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
+import org.jboss.remoting.callback.Callback;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -79,6 +79,7 @@
// Static --------------------------------------------------------
private static final int MAX_DELIVERY_ATTEMPTS = 10;
+ private static final int MESSAGES_IN_TRANSIT_WAIT_COUNT = 100;
// Attributes ----------------------------------------------------
@@ -122,6 +123,9 @@
private Object lock;
private Map deliveries;
+
+ private Object messagesInTransitLock;
+ private int messagesInTransitCount; // access only from a region guarded by messagesInTransitLock
// Constructors --------------------------------------------------
@@ -193,6 +197,9 @@
// prompt delivery
messageQueue.deliver(false);
+
+ messagesInTransitLock = new Object();
+ messagesInTransitCount = 0;
log.debug(this + " constructed");
}
@@ -254,9 +261,9 @@
{
return delivery;
}
+
+ deliveries.put(new Long(ref.getMessageID()), delivery);
- deliveries.put(new Long(ref.getMessageID()), delivery);
-
// We don't send the message as-is, instead we create a MessageProxy instance. This allows
// local fields such as deliveryCount to be handled by the proxy but global data to be
// fielded by the same underlying Message instance. This allows us to avoid expensive
@@ -404,34 +411,39 @@
{
try
{
- /*
- Set clientConsumerFull to false
- NOTE! This must be done using a Runnable on the delivery executor - this is to
- prevent the following race condition:
- 1) Messages are delivered to the client, causing it to be full
- 2) The messages are consumed very quickly on the client causing more to be called()
- 3) more() hits the server BEFORE the deliverer thread has returned from delivering to the client
- causing clientConsumerFull to be set to false and adding a deliverer to the queue.
- 4) The deliverer thread returns and sets clientConsumerFull to true
- 5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
- though the client needs messages
- */
- this.executor.execute(new Runnable() { public void run() { clientConsumerFull = false; } });
+ // Set clientConsumerFull to false.
+ //
+ // NOTE! This must be done using a Runnable on the delivery executor - this is to prevent
+ // the following race condition:
+ // 1) Messages are delivered to the client, causing it to be full.
+ // 2) The messages are consumed very quickly on the client causing more() to be called.
+ // 3) more() hits the server BEFORE the deliverer thread has returned from delivering to
+ // the client causing clientConsumerFull to be set to false and adding a deliverer to
+ // the queue.
+ // 4) The deliverer thread returns and sets clientConsumerFull to true.
+ // 5) The next deliverer runs but doesn't do anything since clientConsumerFull = true even
+ // though the client needs messages.
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ if (trace) { log.trace(ServerConsumerEndpoint.this + " is notified that client wants more() messages"); }
+ clientConsumerFull = false;
+ }
+ });
- //Run a deliverer to deliver any existing ones
- this.executor.execute(new Deliverer());
+ // Run a deliverer to deliver any existing ones
+ executor.execute(new Deliverer());
- //TODO Why do we need to wait for it to execute??
- //Why not just return immediately?
+ // TODO Why do we need to wait for it to execute? Why not just return immediately?
- //Now wait for it to execute
+ // Now wait for it to execute
Future result = new Future();
-
this.executor.execute(new Waiter(result));
-
result.getResult();
- //Now we know the deliverer has delivered any outstanding messages to the client buffer
+ // Now we know the deliverer has delivered any outstanding messages to the client buffer
messageQueue.deliver(false);
}
@@ -444,8 +456,25 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " more");
}
}
-
-
+
+ public void confirmDelivery(int count)
+ {
+ synchronized(messagesInTransitLock)
+ {
+ messagesInTransitCount -= count;
+
+ if (trace) { log.trace("confirming delivery of " + count + " message(s), messages in transit " + messagesInTransitCount); }
+
+ if (messagesInTransitCount < 0)
+ {
+ log.error(this + " has an invalid messages in transit count (" +
+ messagesInTransitCount + ")");
+ }
+
+ messagesInTransitLock.notifyAll();
+ }
+ }
+
// Public --------------------------------------------------------
public String toString()
@@ -633,18 +662,37 @@
// Flush any messages waiting to be sent to the client.
this.executor.execute(new Deliverer());
- // Now wait for it to execute.
- Future result = new Future();
- this.executor.execute(new Waiter(result));
- result.getResult();
-
+ if (trace) { log.trace(this + " flushed all remaining messages (if any) to the client"); }
+
// Now we know any deliverer has delivered any outstanding messages to the client buffer.
}
catch (InterruptedException e)
{
log.warn("Thread interrupted", e);
}
-
+
+ // Make sure there are no messages in transit between server and client
+
+ synchronized(messagesInTransitLock)
+ {
+ int loopCount = 0;
+ while(messagesInTransitCount > 0 && loopCount < MESSAGES_IN_TRANSIT_WAIT_COUNT)
+ {
+ log.debug(this + " waiting for " + messagesInTransitCount + " message(s) in transit " +
+ "to reach the client, " + (loopCount + 1) + " lock grab attempts.");
+ messagesInTransitLock.wait(500);
+ loopCount ++;
+ }
+
+ if (loopCount >= MESSAGES_IN_TRANSIT_WAIT_COUNT)
+ {
+ throw new IllegalStateException("Maximum number of lock grab attempts exceeded, " +
+ "giving up to wait for messages in transit");
+ }
+
+ if (trace) { log.trace(this + " has no messages in transit"); }
+ }
+
// Now we know that there are no in flight messages on the way to the client consumer, but
// there may be messages still in the toDeliver list since the client consumer might be full,
// so we need to cancel these.
@@ -685,7 +733,7 @@
private void checkDeliveryCount(SimpleDelivery del)
{
- //TODO - We need to put the message in a DLQ
+ // TODO - We need to put the message in a DLQ
// For now we just ack it otherwise the message will keep being retried and we'll never get
// anywhere
if (del.getReference().getDeliveryCount() > MAX_DELIVERY_ATTEMPTS)
@@ -716,8 +764,7 @@
{
public void run()
{
- // Is there anything to deliver?
- // This is ok outside lock - is volatile.
+ // Is there anything to deliver? This is ok outside lock - is volatile.
if (clientConsumerFull)
{
if (trace) { log.trace(this + " client consumer full, do nothing"); }
@@ -749,30 +796,33 @@
int serverId = connection.getServerPeer().getServerPeerID();
+ // TODO How can we ensure that messages for the same consumer aren't delivered
+ // concurrently to the same consumer on different threads?
+
+ ClientDelivery del = new ClientDelivery(list, serverId, id);
+ MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
+ Callback callback = new Callback(mm);
+
try
{
if (trace) { log.trace(ServerConsumerEndpoint.this + " handing " + list.size() + " message(s) over to the remoting layer"); }
- ClientDelivery del = new ClientDelivery(list, serverId, id);
+ synchronized(messagesInTransitLock)
+ {
+ connection.getCallbackHandler().handleCallback(callback);
+ messagesInTransitCount += list.size();
+ }
- // TODO How can we ensure that messages for the same consumer aren't delivered
- // concurrently to the same consumer on different threads?
- MessagingMarshallable mm = new MessagingMarshallable(connection.getUsingVersion(), del);
-
- MessagingMarshallable resp = (MessagingMarshallable)connection.getCallbackClient().invoke(mm);
-
if (trace) { log.trace(ServerConsumerEndpoint.this + " handed messages over to the remoting layer"); }
- HandleMessageResponse result = (HandleMessageResponse)resp.getLoad();
+ // We are NOT using Remoting's facility of acknowledging callbacks. A callback is sent
+ // asynchronously, and there is no confirmation that the callback reached the client or
+ // not.
- // For now we don't look at how many messages are accepted since they all will be.
- // The field is a placeholder for the future.
- if (result.clientIsFull())
- {
- // Stop the server sending any more messages to the client.
- // This is ok outside lock.
- clientConsumerFull = true;
- }
+ // TODO Previously, synchronous server-to-client invocations were used by the client
+ // to report back whether it is full or not. This cannot be achieved with asynchronous
+ // callbacks, so the client must explicitly send this information to the server,
+ // with an invocation of its own.
}
catch(Throwable t)
{
@@ -868,5 +918,4 @@
delList.add(new Long(messageID));
}
}
-
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConsumerAdvised.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -73,6 +73,12 @@
{
endpoint.more();
}
+
+ public void confirmDelivery(int count)
+ {
+ endpoint.confirmDelivery(count);
+ }
+
// AdvisedSupport overrides --------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -37,8 +37,8 @@
import org.jboss.aop.Dispatcher;
import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.remoting.CallbackServerFactory;
import org.jboss.jms.client.remoting.HandleMessageResponse;
+import org.jboss.jms.client.remoting.CallbackManager;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.Version;
@@ -50,6 +50,8 @@
import org.jboss.messaging.core.plugin.IdBlock;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvocationResponse;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.invocation.InternalInvocation;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.serial.io.JBossObjectInputStream;
@@ -81,7 +83,7 @@
private static final Logger log = Logger.getLogger(JMSWireFormat.class);
- private static boolean usingJBossSerialization;
+ private static boolean usingJBossSerialization = true;
// The request codes - start from zero
@@ -103,6 +105,7 @@
protected static final byte HANDLE_MESSAGE_RESPONSE = 103;
protected static final byte BROWSE_MESSAGE_RESPONSE = 104;
protected static final byte BROWSE_MESSAGES_RESPONSE = 105;
+ protected static final byte CALLBACK_LIST = 106;
// Static --------------------------------------------------------
@@ -122,21 +125,27 @@
{
trace = log.isTraceEnabled();
}
-
-
// Marshaller implementation -------------------------------------
public void write(Object obj, OutputStream out) throws IOException
{
- //Sanity check
- if (!(out instanceof MessagingObjectOutputStream))
+ DataOutputStream dos = null;
+
+ // This won't be necessary: see JBREM-597.
+ if (out instanceof MessagingObjectOutputStream)
{
- throw new IllegalStateException("Must be MessagingObjectOutputStream");
+ dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
}
+ else if (out instanceof DataOutputStream)
+ {
+ dos = (DataOutputStream)out;
+ }
+ else
+ {
+ dos = new DataOutputStream(out);
+ }
- DataOutputStream dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
-
handleVersion(obj, dos);
try
@@ -148,9 +157,26 @@
InvocationRequest req = (InvocationRequest)obj;
Object param;
-
- if (req.getParameter() instanceof MessagingMarshallable)
+
+ // Unwrap Callback.
+ if (req.getParameter() instanceof InternalInvocation)
{
+ InternalInvocation ii = (InternalInvocation) req.getParameter();
+ Object[] params = ii.getParameters();
+
+ if (params != null && params.length > 0 && params[0] instanceof Callback)
+ {
+ Callback callback = (Callback) params[0];
+ MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+ param = mm.getLoad();
+ }
+ else
+ {
+ param = req.getParameter();
+ }
+ }
+ else if (req.getParameter() instanceof MessagingMarshallable)
+ {
param = ((MessagingMarshallable)req.getParameter()).getLoad();
}
else
@@ -299,6 +325,8 @@
ClientDelivery dr = (ClientDelivery)param;
dos.writeByte(CALLBACK);
+
+ dos.writeUTF(req.getSessionId());
dr.write(dos);
@@ -409,6 +437,37 @@
if (trace) { log.trace("wrote browse message response"); }
}
+ else if (res instanceof ArrayList &&
+ ((ArrayList) res).size() > 0 &&
+ ((ArrayList) res).get(0) instanceof Callback)
+ {
+ // Comes from polled Callbacks.
+ ArrayList callbackList = (ArrayList)res;
+ dos.write(CALLBACK_LIST);
+ dos.writeUTF(resp.getSessionId());
+ dos.writeInt(callbackList.size());
+
+ Iterator it = callbackList.iterator();
+ while (it.hasNext())
+ {
+ Callback callback = (Callback)it.next();
+
+ // We don't use acknowledgeable push callbacks
+
+// Map payload = callback.getReturnPayload();
+// String guid = (String)payload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
+// dos.writeUTF(guid);
+// String listenerId = (String)payload.get(Client.LISTENER_ID_KEY);
+// dos.writeUTF(listenerId);
+// String acks = (String)payload.get(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS);
+// dos.writeUTF(acks);
+
+ MessagingMarshallable mm = (MessagingMarshallable)callback.getParameter();
+ ClientDelivery delivery = (ClientDelivery)mm.getLoad();
+ delivery.write(dos);
+ dos.flush();
+ }
+ }
else
{
dos.write(SERIALIZED);
@@ -441,14 +500,22 @@
public Object read(InputStream in, Map map) throws IOException, ClassNotFoundException
{
- // Sanity check
- if (!(in instanceof MessagingObjectInputStream))
+ DataInputStream dis = null;
+
+ // This won't be necessary: see JBREM-597.
+ if (in instanceof MessagingObjectInputStream)
{
- throw new IllegalStateException("Must be MessagingObjectInputStream");
+ dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
}
+ else if (in instanceof DataInputStream)
+ {
+ dis = (DataInputStream) in;
+ }
+ else
+ {
+ dis = new DataInputStream(in);
+ }
- DataInputStream dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
-
// First byte read is always version
byte version = dis.readByte();
@@ -693,18 +760,51 @@
}
case CALLBACK:
{
+ String sessionId = dis.readUTF();
ClientDelivery dr = new ClientDelivery();
dr.read(dis);
+
+ // Recreate Callback.
+ MessagingMarshallable mm = new MessagingMarshallable(version, dr);
+ Callback callback = new Callback(mm);
+ InternalInvocation ii
+ = new InternalInvocation(InternalInvocation.HANDLECALLBACK, new Object[]{callback});
+ InvocationRequest request
+ = new InvocationRequest(sessionId, CallbackManager.JMS_CALLBACK_SUBSYSTEM,
+ ii, null, null, null);
- InvocationRequest request =
- new InvocationRequest(null, CallbackServerFactory.JMS_CALLBACK_SUBSYSTEM,
- new MessagingMarshallable(version, dr), null, null, null);
-
if (trace) { log.trace("read callback()"); }
return request;
}
+ case CALLBACK_LIST:
+ {
+ // Recreate ArrayList of Callbacks (for Callback polling).
+ String sessionId = dis.readUTF();
+ int size = dis.readInt();
+ ArrayList callbackList = new ArrayList(size);
+ for (int i = 0; i < size; i++)
+ {
+ // We don't use acknowledgeable push callbacks
+
+// String guid = dis.readUTF();
+// String listenerId = dis.readUTF();
+// String acks = dis.readUTF();
+ ClientDelivery delivery = new ClientDelivery();
+ delivery.read(dis);
+ MessagingMarshallable mm = new MessagingMarshallable(version, delivery);
+ Callback callback = new Callback(mm);
+// HashMap payload = new HashMap();
+// payload.put(ServerInvokerCallbackHandler.CALLBACK_ID, guid);
+// payload.put(Client.LISTENER_ID_KEY, listenerId);
+// payload.put(ServerInvokerCallbackHandler.REMOTING_ACKNOWLEDGES_PUSH_CALLBACKS, acks);
+// callback.setReturnPayload(payload);
+ callbackList.add(callback);
+ }
+
+ return new InvocationResponse(sessionId, callbackList, false, null);
+ }
default:
{
throw new IllegalStateException("Invalid format type " + formatType);
@@ -823,9 +923,7 @@
ois = new ObjectInputStream(is);
}
- Object obj = ois.readObject();
-
- return obj;
+ return ois.readObject();
}
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/util/XMLUtil.java
===================================================================
--- trunk/src/main/org/jboss/jms/util/XMLUtil.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/src/main/org/jboss/jms/util/XMLUtil.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -64,6 +64,7 @@
if (Node.CDATA_SECTION_NODE == type)
{
+ //return "<![CDATA[" + n.getNodeValue() + "]]>";
return n.getNodeValue();
}
Modified: trunk/tests/bin/.testrc.example
===================================================================
--- trunk/tests/bin/.testrc.example 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/bin/.testrc.example 2006-11-23 04:42:17 UTC (rev 1629)
@@ -14,7 +14,14 @@
#
TEST_DATABASE=hsqldb
#
+# The transport to run tests with. Valid values: "socket", "http". The value specified here
+# overrides the value specified in container.xml.
#
+TEST_REMOTING=socket
+
+
+#
+#
# util
#
#TARGET_CLASS=org.jboss.test.messaging.util.SelectiveIteratorTest
Modified: trunk/tests/bin/runtest
===================================================================
--- trunk/tests/bin/runtest 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/bin/runtest 2006-11-23 04:42:17 UTC (rev 1629)
@@ -21,6 +21,7 @@
ENV_TEST_DATABASE=$TEST_DATABASE
ENV_TEST_SERIALIZATION=$TEST_SERIALIZATION
+ENV_TEST_REMOTING=$TEST_REMOTING
if [ -z "$TARGET_CLASS" -a -f $reldir/.testrc ]; then
. $reldir/.testrc
@@ -34,10 +35,15 @@
if [ "$ENV_TEST_DATABASE" != "" ]; then
TEST_DATABASE=$ENV_TEST_DATABASE
fi
+
if [ "$ENV_TEST_SERIALIZATION" != "" ]; then
TEST_SERIALIZATION=$ENV_TEST_SERIALIZATION
fi
+if [ "$ENV_TEST_REMOTING" != "" ]; then
+ TEST_REMOTING=$ENV_TEST_REMOTING
+fi
+
#
# We should use the same test execution classpath as the ant <junit> task, so we run ant to get
# it from there.
@@ -94,14 +100,14 @@
shift
done
-JAVA_OPTS="-Xmx1024M $JAVA_OPTS -Dmodule.output=$reldir/../output $REMOTE_TEST -Dtest.database=$TEST_DATABASE -Dtest.serialization=$TEST_SERIALIZATION -Dbuild.lib=../../output/lib"
+JAVA_OPTS="-Xmx1024M $JAVA_OPTS -Dmodule.output=$reldir/../output $REMOTE_TEST -Dtest.database=$TEST_DATABASE -Dtest.serialization=$TEST_SERIALIZATION -Dtest.remoting=$TEST_REMOTING -Dbuild.lib=../../output/lib"
if [ "$TARGET_TEST" != "" ]; then
TARGET_TEST="-t $TARGET_TEST"
fi
if [ "$isRemote" = "true" ]; then
- export TEST_DATABASE TEST_SERIALIZATION
+ export TEST_DATABASE TEST_SERIALIZATION TEST_REMOTING
$reldir/start-rmi-server $REMOTE_DEBUG_FLAG -use-existent-test-classpath-file
fi
Modified: trunk/tests/bin/start-rmi-server
===================================================================
--- trunk/tests/bin/start-rmi-server 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/bin/start-rmi-server 2006-11-23 04:42:17 UTC (rev 1629)
@@ -45,6 +45,11 @@
JAVA_OPTS="$JAVA_OPTS -Dtest.serialization=$TEST_SERIALIZATION"
fi
+if [ "$TEST_REMOTING" != "" ]; then
+ JAVA_OPTS="$JAVA_OPTS -Dtest.remoting=$TEST_REMOTING"
+fi
+
+
if [ "$USE_EXISTENT_TEST_CLASSPATH_FILE" = "true" ]; then
CLASSPATH_FILE="$reldir/.test.execution.classpath"
else
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/build.xml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -71,6 +71,11 @@
<property name="clustering.tests.database" value="mysql"/>
<!--
+ Default remoting configuration (must be overridden by tasks that need it set otherwise).
+ -->
+ <property name="test.remoting" value="socket"/>
+
+ <!--
Project paths.
-->
@@ -188,6 +193,8 @@
<path refid="mysql.jdbc.driver.classpath"/>
<path refid="oracle.jdbc.driver.classpath"/>
<path refid="postgres.jdbc.driver.classpath"/>
+ <path refid="apache.tomcat.classpath"/>
+ <path refid="apache.logging.classpath"/>
</path>
<path id="stress.test.execution.classpath">
@@ -270,6 +277,7 @@
<sysproperty key="test.bind.address" value="${test.bind.address}"/>
<sysproperty key="test.database" value="${functional.tests.database}"/>
<sysproperty key="test.serialization" value="${functional.tests.serialization}"/>
+ <sysproperty key="test.remoting" value="${test.remoting}"/>
<!--
<jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=rmiserver"/>
-->
@@ -304,6 +312,7 @@
<sysproperty key="test.bind.address" value="${test.bind.address}"/>
<sysproperty key="test.database" value="${stress.tests.database}"/>
<sysproperty key="test.serialization" value="${stress.tests.serialization}"/>
+ <sysproperty key="test.remoting" value="${test.remoting}"/>
<!-- <jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=rmiserver"/> -->
<classpath refid="stress.test.execution.classpath"/>
</java>
@@ -330,12 +339,18 @@
<target name="tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
<antcall target="crash-tests"/>
<antcall target="invm-tests"/>
- <antcall target="remote-tests"/>
+ <antcall target="remote-tests"/> <!-- default remoting configuration (socket) -->
+ <antcall target="remote-tests">
+ <param name="test.remoting" value="http"/>
+ </antcall>
</target>
<target name="stress-tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
<antcall target="invm-stress-tests"/>
- <antcall target="remote-stress-tests"/>
+ <antcall target="remote-stress-tests"/> <!-- default remoting configuration (socket) -->
+ <antcall target="remote-stress-tests">
+ <param name="test.remoting" value="http"/>
+ </antcall>
</target>
<target name="invm-tests" depends="tests-jar, prepare-testdirs, clear-test-logs"
@@ -378,6 +393,7 @@
<exclude name="**/jms/MemLeakTest.class"/>
<exclude name="**/jms/manual/**/*Test.class"/>
<exclude name="**/jms/clustering/*Test.class"/>
+ <exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
</fileset>
</batchtest>
</junit>
@@ -505,7 +521,7 @@
<mkdir dir="${build.tests.reports}"/>
<echo message=""/>
- <echo message="Running remote tests, fork=${junit.fork}, junit.batchtest.fork=${junit.batchtest.fork}"/>
+ <echo message="Running remote tests, fork=${junit.fork}, junit.batchtest.fork=${junit.batchtest.fork}, remoting=${test.remoting}"/>
<echo message=""/>
<junit printsummary="${junit.printsummary}"
@@ -521,6 +537,7 @@
<sysproperty key="test.bind.address" value="${test.bind.address}"/>
<sysproperty key="test.database" value="${functional.tests.database}"/>
<sysproperty key="test.serialization" value="${functional.tests.serialization}"/>
+ <sysproperty key="test.remoting" value="${test.remoting}"/>
<jvmarg value="-Xmx512M"/>
<!--
<jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=antjunit"/>
@@ -541,9 +558,9 @@
usefile="${junit.formatter.usefile}" extension="-Remote.xml"/>
-->
- <sysproperty key="messaging-test-configuration" value="Remote"/>
+ <sysproperty key="messaging-test-configuration" value="Remote-${test.remoting}"/>
<formatter classname="org.jboss.test.messaging.tools.ant.XMLJUnitMultipleResultFormatter"
- usefile="${junit.formatter.usefile}" extension="-Remote.xml"/>
+ usefile="${junit.formatter.usefile}" extension="-Remote-${test.remoting}.xml"/>
<batchtest fork="${junit.batchtest.fork}"
todir="${junit.batchtest.todir}"
@@ -591,6 +608,7 @@
<sysproperty key="test.bind.address" value="${test.bind.address}"/>
<sysproperty key="test.database" value="${stress.tests.database}"/>
<sysproperty key="test.serialization" value="${stress.tests.serialization}"/>
+ <sysproperty key="test.remoting" value="${test.remoting}"/>
<jvmarg value="-Xmx512M"/>
<!-- <jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=unittest"/> -->
<classpath>
@@ -608,9 +626,9 @@
usefile="${junit.formatter.usefile}" extension="-Remote.xml"/>
-->
- <sysproperty key="messaging-test-configuration" value="StressRemote"/>
+ <sysproperty key="messaging-test-configuration" value="StressRemote-${test.remoting}"/>
<formatter classname="org.jboss.test.messaging.tools.ant.XMLJUnitMultipleResultFormatter"
- usefile="${junit.formatter.usefile}" extension="-StressRemote.xml"/>
+ usefile="${junit.formatter.usefile}" extension="-StressRemote-${test.remoting}.xml"/>
<batchtest fork="${junit.batchtest.fork}"
todir="${junit.batchtest.todir}"
@@ -725,42 +743,136 @@
description="Runs crash tests">
<antcall target="stop-rmi-server"/>
+
+ <!--
+ ClientCrashTest over "socket"
+ -->
+
<antcall target="start-rmi-server"/>
<antcall target="crash-test">
<param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashTest"/>
</antcall>
+ <!--
+ ClientCrashTest over "http"
+ -->
+
+ <antcall target="start-rmi-server">
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <antcall target="crash-test">
+ <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashTest"/>
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <!--
+ ClientCrashTwoConnectionsTest over "socket"
+ -->
+
<antcall target="start-rmi-server"/>
<antcall target="crash-test">
<param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashTwoConnectionsTest"/>
</antcall>
+ <!--
+ ClientCrashTwoConnectionsTest over "http"
+ -->
+
+ <antcall target="start-rmi-server">
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <antcall target="crash-test">
+ <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashTwoConnectionsTest"/>
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <!--
+ ClientCrashNegativeLeaseTest over "socket"
+ -->
+
<antcall target="start-rmi-server"/>
<antcall target="crash-test">
- <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashNegativeLeaseTest"/>
+ <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashNegativeLeaseTest"/>
</antcall>
+ <!--
+ ClientCrashNegativeLeaseTest over "http"
+ -->
+
+ <antcall target="start-rmi-server">
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <antcall target="crash-test">
+ <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashNegativeLeaseTest"/>
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <!--
+ ClientCrashZeroLeaseTest over "socket"
+ -->
+
<antcall target="start-rmi-server"/>
<antcall target="crash-test">
<param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashZeroLeaseTest"/>
</antcall>
+ <!--
+ ClientCrashZeroLeaseTest over "http"
+ -->
+
+ <antcall target="start-rmi-server">
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <antcall target="crash-test">
+ <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashZeroLeaseTest"/>
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <!--
+ ClientCrashLargeLeaseTest over "socket"
+ -->
+
<antcall target="start-rmi-server"/>
<antcall target="crash-test">
<param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashLargeLeaseTest"/>
</antcall>
+ <!--
+ ClientCrashLargeLeaseTest over "http"
+ -->
+
+ <antcall target="start-rmi-server">
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <antcall target="crash-test">
+ <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashLargeLeaseTest"/>
+ <param name="test.remoting" value="http"/>
+ </antcall>
+
+ <!--
+ CallbackFailureTest over "socket"
+ -->
+
<antcall target="start-rmi-server"/>
<antcall target="crash-test">
<param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.CallbackFailureTest"/>
</antcall>
+ <!--
+ CallbackFailureTest over "http" does not make sense
+ -->
+
</target>
@@ -779,6 +891,7 @@
<sysproperty key="test.bind.address" value="${test.bind.address}"/>
<sysproperty key="test.database" value="${functional.tests.database}"/>
<sysproperty key="test.serialization" value="${functional.tests.serialization}"/>
+ <sysproperty key="test.remoting" value="${test.remoting}"/>
<jvmarg value="-Xmx512M"/>
<!--
<jvmarg line="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_shmem,server=y,suspend=y,address=antjunit"/>
@@ -799,9 +912,9 @@
usefile="${junit.formatter.usefile}" extension="-Remote.xml"/>
-->
- <sysproperty key="messaging-test-configuration" value="Crash"/>
+ <sysproperty key="messaging-test-configuration" value="Crash-${test.remoting}"/>
<formatter classname="org.jboss.test.messaging.tools.ant.XMLJUnitMultipleResultFormatter"
- usefile="${junit.formatter.usefile}" extension="-Crash.xml"/>
+ usefile="${junit.formatter.usefile}" extension="-Crash-${test.remoting}.xml"/>
<test name="${crash.test.name}"
fork="true"
Modified: trunk/tests/etc/container.xml
===================================================================
--- trunk/tests/etc/container.xml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/etc/container.xml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -68,6 +68,11 @@
<serialization-type>jms</serialization-type>
+ <!--
+ Supported values: "socket", "http"
+ -->
+ <remoting-transport>socket</remoting-transport>
+
</container>
Modified: trunk/tests/smoke/build.xml
===================================================================
--- trunk/tests/smoke/build.xml 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/smoke/build.xml 2006-11-23 04:42:17 UTC (rev 1629)
@@ -29,6 +29,7 @@
<property name="run.queue.example" value="true"/>
<property name="run.topic.example" value="true"/>
<property name="run.mdb.example" value="true"/>
+ <property name="run.http.example" value="true"/>
<property name="run.stateless.example" value="true"/>
<property name="run.ejb3mdb.example" value="false"/>
<property name="run.secure-socket.example" value="true"/>
@@ -76,12 +77,13 @@
<!-- 4.0.0, 4.0.1 NOT SUPPORTED -->
+ <!--
+ Note on installer versions.
+ When installing versions to test via the JBoss installer, make sure call by value is *not*
+ selected in the installation process. The reason for testing installer versions is to test
+ the arrangement where JBoss is configured for all pass by reference
+ -->
- <!-- Note on installer versions.
- When installing versions to test via the JBoss installer, make sure call by value is *not*
- selected in the installation process. The reason for testing installer versions is to test
- the arrangement where JBoss is configured for all pass by reference -->
-
<antcall target="installation-test">
<param name="jboss.home" value="${jboss401sp1.home}"/>
<param name="run.secure-socket.example" value="false"/>
@@ -311,6 +313,12 @@
<param name="jboss.home" value="${jboss.home}"/>
</antcall>
+ <antcall target="run-example">
+ <param name="example.name" value="http"/>
+ <param name="example.queue.name" value="SmokeTestQueue"/>
+ <param name="jboss.home" value="${jboss.home}"/>
+ </antcall>
+
<!-- ==================================================== -->
<!-- -->
<!-- End of example list -->
@@ -451,6 +459,10 @@
<equals arg1="${example.name}" arg2="secure-socket"/>
<istrue value="${run.secure-socket.example}"/>
</and>
+ <and>
+ <equals arg1="${example.name}" arg2="http"/>
+ <istrue value="${run.http.example}"/>
+ </and>
</or>
</condition>
<antcall target="run-example-internal"/>
Modified: trunk/tests/smoke/smoke.properties.example
===================================================================
--- trunk/tests/smoke/smoke.properties.example 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/smoke/smoke.properties.example 2006-11-23 04:42:17 UTC (rev 1629)
@@ -4,17 +4,20 @@
#JBoss AS 4.0.0 and 4.0.1 are not supported
-jboss401sp1.home=C:\\dev\jboss-4.0.1sp1-src\\build\\output\\jboss-4.0.1sp1
-jboss402.home=C:\\dev\\jboss-4.0.2-src\\build\\output\\jboss-4.0.2
-jboss403SP1.home=C:\\dev\\jboss-4.0.3SP1-src\\build\\output\\jboss-4.0.3SP1
-jboss403SP1-installer.home=C:\\dev\\jboss-4.0.3SP1-installer
-jboss404GA.home=C:\\dev\\jboss-4.0.4.GA-src\\build\\output\\jboss-4.0.4.GA
-jboss404GA-installer.home=C:\\dev\\jboss-4.0.4.GA0-installer
-jboss405GA.home=C:\\dev\\jboss-4.0.5.GA-src\\build\\output\\jboss-4.0.5.GA
-jboss405GA-installer.home=C:\\dev\\jboss-4.0.5.GA-installer
-jboss405GAejb3.home=C:\\dev\\jboss-4.0.5.GA-installer-ejb3
+jboss401sp1.home=C:\\work\\src\\jboss-4.0.1sp1-src\\build\\output\\jboss-4.0.1sp1
+jboss402.home=C:\\work\\src\\jboss-4.0.2-src\\build\\output\\jboss-4.0.2
+jboss403.home=C:\\work\\src\\jboss-4.0.3-src\\build\\output\\jboss-4.0.3
+jboss403-installer.home=C:\\work\\jnlp\\jboss-4.0.3
+jboss403SP1.home=C:\\work\\src\\jboss-4.0.3SP1-src\\build\\output\\jboss-4.0.3SP1
+jboss403SP1-installer.home=C:\\work\\jnlp\\jboss-4.0.3SP1
+jboss404GA.home=C:\\work\\src\\jboss-4.0.4.GA-src\\build\\output\\jboss-4.0.4.GA
+jboss404GA-installer.home=C:\\work\\jnlp\\jboss-4.0.4.GA
+jboss405GA.home=C:\\work\\src\\jboss-4.0.5.GA-src\\build\\output\\jboss-4.0.5.GA
+jboss405GA-installer.home=C:\\work\\jnlp\\jboss-4.0.5.GA
+jboss500Alpha.home=C:\\work\\src\\cvs\\jboss-head\\build\\output\\jboss-5.0.0.Alpha
+
#
# Compatibility test configuration
#
-jboss.compatibility.home=C:\\work\\messaging-compatibility\\jboss-4.0.4.GA
+jboss.compatibility.home=C:\\work\\messaging-compatibility\\jboss-4.0.5.GA
Deleted: trunk/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/src/org/jboss/test/messaging/jms/CallbackServerFactoryTest.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -1,115 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import org.jboss.jms.client.remoting.CallbackServerFactory;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.transport.Connector;
-import org.jboss.test.messaging.MessagingTestCase;
-
-/**
- *
- * A CallbackServerFactoryTest.
- *
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision$
- *
- * $Id$
- */
-public class CallbackServerFactoryTest extends MessagingTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public CallbackServerFactoryTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void testCallbackServer() throws Exception
- {
- String locatorURI1 = "socket://localhost:1234";
-
- String locatorURI2 = "sslsocket://localhost:4321";
-
- InvokerLocator locator1 = new InvokerLocator(locatorURI1);
-
- InvokerLocator locator2 = new InvokerLocator(locatorURI2);
-
- Connector server1 = CallbackServerFactory.instance.getCallbackServer(locator1);
-
- assertTrue(server1.isStarted());
-
- assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator1.getProtocol()));
-
- Connector server2 = CallbackServerFactory.instance.getCallbackServer(locator1);
-
- assertTrue(server1 == server2);
-
- Connector server3 = CallbackServerFactory.instance.getCallbackServer(locator2);
-
- assertTrue(server3.isStarted());
-
- assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator2.getProtocol()));
-
- Connector server4 = CallbackServerFactory.instance.getCallbackServer(locator2);
-
- assertTrue(server3 == server4);
-
- assertFalse(server1 == server3);
-
-
- CallbackServerFactory.instance.stopCallbackServer(locator1.getProtocol());
-
- assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator1.getProtocol()));
-
- CallbackServerFactory.instance.stopCallbackServer(locator2.getProtocol());
-
- assertTrue(CallbackServerFactory.instance.containsCallbackServer(locator2.getProtocol()));
-
- CallbackServerFactory.instance.stopCallbackServer(locator1.getProtocol());
-
- assertFalse(CallbackServerFactory.instance.containsCallbackServer(locator1.getProtocol()));
-
- CallbackServerFactory.instance.stopCallbackServer(locator2.getProtocol());
-
- assertFalse(CallbackServerFactory.instance.containsCallbackServer(locator2.getProtocol()));
-
- assertFalse(server1.isStarted());
-
- assertFalse(server3.isStarted());
-
-
- }
-
-
- // Inner classes -------------------------------------------------
-
-}
-
Copied: trunk/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java (from rev 1628, branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/ConsumerClosedTest.java)
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -1125,10 +1125,16 @@
MessageConsumer cons1 = sess.createConsumer(queue);
+ log.trace("closing cons1");
+
cons1.close();
+ log.trace("cons1 closed");
+
MessageConsumer cons2 = sess.createConsumer(queue);
+ log.trace("cons2 created");
+
Message r1 = cons2.receive();
Message r2 = cons2.receive();
Message r3 = cons2.receive();
Copied: trunk/tests/src/org/jboss/test/messaging/jms/RemotingConnectionConfigurationTest.java (from rev 1628, branches/Branch_HTTP_Experiment/tests/src/org/jboss/test/messaging/jms/RemotingConnectionConfigurationTest.java)
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -55,6 +55,8 @@
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvocationResponse;
import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.callback.Callback;
+import org.jboss.remoting.invocation.InternalInvocation;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.jms.message.MessageTest;
import org.jboss.util.id.GUID;
@@ -1115,7 +1117,7 @@
MessagingMarshallable mm = new MessagingMarshallable((byte)77, dr);
- InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+ InvocationRequest ir = new InvocationRequest("dummySessionId", null, mm, null, null, null);
wf.write(ir, oos);
@@ -1133,6 +1135,9 @@
//Second byte should be CALLBACK
assertEquals(JMSWireFormat.CALLBACK, dis.readByte());
+ //Next should be sessionID
+ assertEquals("dummySessionId", dis.readUTF());
+
//Next int should be server id
assertEquals(76543, dis.readInt());
@@ -1200,8 +1205,16 @@
InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
- mm = (MessagingMarshallable)ir2.getParameter();
+ InternalInvocation ii = (InternalInvocation) ir2.getParameter();
+ Object[] parameters = ii.getParameters();
+
+ assertNotNull(parameters);
+
+ Callback callback = (Callback) parameters[0];
+
+ mm = (MessagingMarshallable)callback.getParameter();
+
assertEquals(77, mm.getVersion());
ClientDelivery dr2 = (ClientDelivery)mm.getLoad();
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -127,6 +127,8 @@
public static String USER_TRANSACTION_JNDI_NAME = "UserTransaction";
public static String JCA_JMS_CONNECTION_FACTORY_JNDI_NAME = "java:/JCAConnectionFactory";
+ public static long HTTP_CONNECTOR_CALLBACK_POLL_PERIOD = 102;
+
static
{
try
@@ -182,15 +184,14 @@
private boolean transaction;
private boolean database;
private boolean jca;
- private boolean remotingSocket;
- private boolean remotingMultiplex;
+ private boolean remoting;
private boolean security;
private List toUnbindAtExit;
private String ipAddressOrHostName;
- //There may be many service containers on the same machine, so we need to distinguish them
- //so we don't start up multiple servers with services running on the same port
+ // There may be many service containers on the same machine, so we need to distinguish them
+ // so we don't start up multiple servers with services running on the same port
private int serverIndex;
// Static --------------------------------------------------------
@@ -365,14 +366,12 @@
DEFAULTDS_MANAGED_CONNECTION_POOL_OBJECT_NAME);
startWrapperDataSourceService();
}
- if (remotingSocket)
+
+ if (remoting)
{
- startRemoting(false);
+ startRemoting();
}
- if (remotingMultiplex)
- {
- startRemoting(true);
- }
+
if (security)
{
startSecurityManager();
@@ -395,9 +394,10 @@
loadJNDIContexts();
+ String transport = config.getRemotingTransport();
log.info("remoting = \"" +
- (remotingSocket ? "socket" : (remotingMultiplex ? "multiplex" : "disabled")) + "\", " +
- "serialization = \"" + "jms" + "\", " +
+ (remoting ? transport : "disabled") + "\", " +
+ "serialization = \"" + config.getSerializationType() + "\", " +
"database = \"" + getDatabaseType() + "\"");
log.debug(this + " started");
}
@@ -1116,39 +1116,45 @@
stopService(managedConnFactoryObjectName);
}
- private void startRemoting(boolean multiplex) throws Exception
+ private void startRemoting() throws Exception
{
SerializationStreamFactory.setManagerClassName(
"jms", "org.jboss.jms.server.remoting.MessagingSerializationManager");
RemotingJMXWrapper mbean;
+ // TODO - use remoting-service.xml parameters, not these ...
+
//String serializationType = config.getSerializationType();
-
String serializationType = "jms";
+ String transport = config.getRemotingTransport();
String params = "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
"unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
"serializationtype=" + serializationType + "&" +
"dataType=jms&" +
- "timeout=0&" +
"socket.check_connection=false&" +
- "leasePeriod=20000";
+ "clientLeasePeriod=20000&" +
+ "callbackStore=org.jboss.remoting.callback.BlockingCallbackStore";
- int portNumber = 9111 + serverIndex;
-
- String locatorURI;
- if (multiplex)
+ // specific parameters per transport
+
+ if ("http".equals(transport))
{
- locatorURI = "multiplex://" + ipAddressOrHostName + ":" + portNumber + params;
+ params += "&callbackPollPeriod=" + HTTP_CONNECTOR_CALLBACK_POLL_PERIOD;
}
else
{
- locatorURI = "socket://" + ipAddressOrHostName + ":" + portNumber + params;
+ params += "&timeout=0"; // leading '&' separates it from the preceding callbackStore parameter
}
- log.debug("Using the following locator uri:" + locatorURI);
+// int freePort = PortUtil.findFreePort(ipAddressOrHostName);
+ int freePort = 9111;
+ String locatorURI = transport + "://" + ipAddressOrHostName + ":" + freePort + params;
+
+ log.debug("Using locator uri: " + locatorURI);
+
InvokerLocator locator = new InvokerLocator(locatorURI);
log.debug("Started remoting connector on uri:" + locator.getLocatorURI());
@@ -1299,7 +1305,7 @@
transaction = true;
database = true;
jca = true;
- remotingSocket = true;
+ remoting = true;
security = true;
}
else if ("transaction".equals(tok))
@@ -1328,22 +1334,13 @@
}
else if ("remoting".equals(tok))
{
- remotingSocket = true;
+ remoting = true;
if (minus)
{
- remotingSocket = false;
+ remoting = false;
}
}
- else if ("remoting-multiplex".equals(tok))
- {
- remotingMultiplex = true;
- if (minus)
- {
- remotingMultiplex = false;
- }
-
- }
else if ("security".equals(tok))
{
security = true;
@@ -1357,7 +1354,7 @@
transaction = false;
database = false;
jca = false;
- remotingSocket = false;
+ remoting = false;
security = false;
}
else
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainerConfiguration.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -75,6 +75,7 @@
private String database;
private Map dbConfigurations;
private String serializationType;
+ private String remotingTransport;
// Constructors --------------------------------------------------
@@ -140,6 +141,14 @@
{
return serializationType;
}
+
+ /**
+ * @return the transport the container wants the Remoting Connector to use.
+ */
+ public String getRemotingTransport()
+ {
+ return remotingTransport;
+ }
// Package protected ---------------------------------------------
@@ -152,6 +161,7 @@
Reader reader = new InputStreamReader(is);
String currentDatabase = null;
String currentSerializationType = null;
+ String currentRemotingTransport = null;
try
{
@@ -190,6 +200,10 @@
{
currentSerializationType = XMLUtil.getTextContent(n);
}
+ else if ("remoting-transport".equals(name))
+ {
+ currentRemotingTransport = XMLUtil.getTextContent(n);
+ }
else
{
throw new Exception("Unexpected child <" + name + "> of node " +
@@ -200,6 +214,7 @@
setCurrentDatabase(currentDatabase);
setCurrentSerializationType(currentSerializationType);
+ setCurrentRemotingTransport(currentRemotingTransport);
}
finally
{
@@ -232,6 +247,16 @@
serializationType = xmlConfigSerializationType;
}
}
+
+ private void setCurrentRemotingTransport(String xmlRemotingTransport)
+ {
+ remotingTransport = System.getProperty("test.remoting");
+ if (remotingTransport == null)
+ {
+ remotingTransport = xmlRemotingTransport;
+ }
+ }
+
private void validate() throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-11-23 03:35:08 UTC (rev 1628)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-11-23 04:42:17 UTC (rev 1629)
@@ -264,8 +264,8 @@
"server/default/deploy/" + databaseType + "-persistence-service.xml";
}
- log.info("********* LOADING CONFIG FILE: " + persistenceConfigFile);
-
+ log.info("Loading persistence configuration file " + persistenceConfigFile);
+
URL persistenceConfigFileURL = getClass().getClassLoader().getResource(persistenceConfigFile);
if (persistenceConfigFileURL == null)
{
More information about the jboss-cvs-commits
mailing list