[jboss-cvs] JBoss Messaging SVN: r3385 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/client/delegate and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 29 08:27:34 EST 2007
Author: jmesnil
Date: 2007-11-29 08:27:33 -0500 (Thu, 29 Nov 2007)
New Revision: 3385
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java
Removed:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/wireformat/
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO-based transport
* added packets and codec to add/remove connection factory callbacks
* deleted JBoss Remoting wireformat in org.jboss.jms.wireformat
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.client.delegate;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETTOPOLOGY;
+
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
@@ -30,18 +32,21 @@
import org.jboss.jms.client.container.JMSClientVMIdentifier;
import org.jboss.jms.client.plugin.LoadBalancingPolicy;
-import org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
import org.jboss.jms.delegate.ConnectionFactoryDelegate;
import org.jboss.jms.delegate.CreateConnectionResult;
import org.jboss.jms.delegate.IDBlock;
import org.jboss.jms.delegate.TopologyResult;
import org.jboss.jms.exception.MessagingNetworkFailureException;
-import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
-import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.GetTopologyRequest;
import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
import org.jboss.messaging.util.Version;
import org.jboss.messaging.util.WeakHashSet;
@@ -122,18 +127,43 @@
}
}
- private void addCallback(ClientConnectionFactoryDelegate delegate) throws Throwable
+ private void addCallback(final ClientConnectionFactoryDelegate delegate) throws Throwable
{
- remoting.getCallbackManager().setConnectionfactoryCallbackHandler(new ConnectionFactoryCallbackHandler(this, remoting));
+ PacketDispatcher.client.register(new PacketHandler() {
- ConnectionFactoryAddCallbackRequest request =
- new ConnectionFactoryAddCallbackRequest (JMSClientVMIdentifier.instance,
- remoting.getRemotingClient().getSessionId(),
- delegate.getID(),
- Version.instance().getProviderIncrementingVersion());
+ public String getID()
+ {
+ return delegate.getID();
+ }
- remoting.getRemotingClient().invoke(request, null);
+ public void handle(AbstractPacket packet, PacketSender sender)
+ {
+ PacketType type = packet.getType();
+ if (type == RESP_GETTOPOLOGY)
+ {
+ GetTopologyResponse response = (GetTopologyResponse) packet;
+ TopologyResult topology = response.getTopology();
+ updateFailoverInfo(topology.getDelegates(), topology.getFailoverMap());
+ } else
+ {
+ System.err.println("ClientClusteredConnectionFactoryDelegate.handle() unhandled packet: " + packet);
+ }
+ }
+ });
+
+ UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getNewRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, true);
+ sendOneWay(remoting.getNewRemotingClient(), delegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
+ // remoting.getCallbackManager().setConnectionfactoryCallbackHandler(new ConnectionFactoryCallbackHandler(this, remoting));
+
+// ConnectionFactoryAddCallbackRequest request =
+// new ConnectionFactoryAddCallbackRequest (JMSClientVMIdentifier.instance,
+// remoting.getRemotingClient().getSessionId(),
+// delegate.getID(),
+// Version.instance().getProviderIncrementingVersion());
+//
+// remoting.getRemotingClient().invoke(request, null);
+
}
private void addShutdownHook()
@@ -143,13 +173,18 @@
private void removeCallback() throws Throwable
{
- ConnectionFactoryRemoveCallbackRequest request =
- new ConnectionFactoryRemoveCallbackRequest (JMSClientVMIdentifier.instance,
- remoting.getRemotingClient().getSessionId(),
- currentDelegate.getID(),
- Version.instance().getProviderIncrementingVersion());
+ PacketDispatcher.client.unregister(currentDelegate.getID());
+
+ UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getNewRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, false);
+ sendOneWay(remoting.getNewRemotingClient(), currentDelegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
- remoting.getRemotingClient().invoke(request, null);
+// ConnectionFactoryRemoveCallbackRequest request =
+// new ConnectionFactoryRemoveCallbackRequest (JMSClientVMIdentifier.instance,
+// remoting.getRemotingClient().getSessionId(),
+// currentDelegate.getID(),
+// Version.instance().getProviderIncrementingVersion());
+//
+// remoting.getRemotingClient().invoke(request, null);
}
protected void finalize() throws Throwable
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -336,7 +336,6 @@
try
{
- System.err.println("### " + serverLocatorURI);
InvokerLocator locator = new InvokerLocator(serverLocatorURI);
client = new org.jboss.messaging.core.remoting.Client();
client.connect(locator.getHost(), locator.getPort() + 1000, TCP);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -35,6 +35,7 @@
import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.jms.exception.MessagingNetworkFailureException;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.Client;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.util.Streamable;
@@ -155,15 +156,21 @@
// Protected ------------------------------------------------------------------------------------
- protected void sendOneWay(AbstractPacket packet) throws JMSException
+ protected void sendOneWay(AbstractPacket packet)
{
- assert newClient != null;
+ sendOneWay(newClient, id, version, packet);
+ }
+
+ protected static void sendOneWay(Client client, String targetID, byte version, AbstractPacket packet)
+ {
+ assert client != null;
+ assertValidID(targetID);
assert packet != null;
packet.setVersion(version);
- packet.setTargetID(id);
+ packet.setTargetID(targetID);
- newClient.sendOneWay(packet);
+ client.sendOneWay(packet);
}
protected AbstractPacket sendBlocking(AbstractPacket request) throws JMSException
@@ -171,7 +178,7 @@
return sendBlocking(newClient, id, version, request);
}
- protected static AbstractPacket sendBlocking(org.jboss.messaging.core.remoting.Client client, String targetID, byte version, AbstractPacket request) throws JMSException
+ protected static AbstractPacket sendBlocking(Client client, String targetID, byte version, AbstractPacket request) throws JMSException
{
assert client != null;
assertValidID(targetID);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -24,13 +24,7 @@
import java.util.Map;
import org.jboss.jms.client.container.ClientConsumer;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.wireformat.ClientDelivery;
-import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.contract.Message;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.HandleCallbackException;
import org.jboss.remoting.callback.InvokerCallbackHandler;
@@ -78,57 +72,7 @@
public void handleCallback(Callback callback) throws HandleCallbackException
{
- Object parameter = callback.getParameter();
-
- if (parameter instanceof ClientDelivery)
- {
- ClientDelivery dr = (ClientDelivery)parameter;
-
- Message msg = dr.getMessage();
-
- MessageProxy proxy = JBossMessage.
- createThinDelegate(dr.getDeliveryId(), (JBossMessage)msg, dr.getDeliveryCount());
-
- ClientConsumer handler =
- (ClientConsumer)callbackHandlers.get(dr.getConsumerId());
-
- if (handler == null)
- {
- // This should never happen since we wait for all deliveries to arrive before closing
- // the consumer
-
- throw new IllegalStateException(this + " callback handler not found, message arrived after consumer is closed.");
- }
-
- try
- {
- handler.handleMessage(proxy);
- }
- catch (Exception e)
- {
- log.error("Failed to handle message", e);
- throw new HandleCallbackException(e.getMessage(), e);
- }
- }
- else if (parameter instanceof ConnectionFactoryUpdate)
- {
- if (connectionfactoryCallbackHandler == null)
- {
- log.warn("ConnectionFactoryUpdate was received but there is no callbackHandler set");
- }
- else
- {
- ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)parameter;
-
- if (trace) { log.trace(this + " receiving cluster view change " + viewChange); }
-
- connectionfactoryCallbackHandler.handleMessage(viewChange);
- }
- }
- else
- {
- throw new HandleCallbackException("Unknow callback type: " + callback);
- }
+ throw new IllegalStateException("JBoss Remoting must no longer be used");
}
// Public ---------------------------------------------------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -22,12 +22,12 @@
package org.jboss.jms.client.remoting;
+import java.lang.ref.WeakReference;
+
import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
import org.jboss.logging.Logger;
-import org.jboss.remoting.ConnectionListener;
import org.jboss.remoting.Client;
-import java.lang.ref.WeakReference;
+import org.jboss.remoting.ConnectionListener;
/**
* This class will manage ConnectionFactory messages updates
@@ -66,17 +66,7 @@
public void handleMessage(Object message)
{
- if (trace) { log.trace(this + " handling " + message); }
-
- ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)message;
-
- ClientClusteredConnectionFactoryDelegate delegate = delegateRef.get();
-
- if (delegate!=null)
- {
- delegate.updateFailoverInfo(viewChange.getTopology().getDelegates(),
- viewChange.getTopology().getFailoverMap());
- }
+ throw new IllegalStateException("JBoss Remoting must no longer be used");
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -22,15 +22,15 @@
package org.jboss.jms.delegate;
-import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
-import org.jboss.jms.wireformat.PacketSupport;
-import org.jboss.messaging.util.Streamable;
-import java.util.Map;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.util.HashMap;
import java.util.Iterator;
-import java.io.DataOutputStream;
-import java.io.DataInputStream;
+import java.util.Map;
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.messaging.util.Streamable;
+
/**
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
* @version <tt>$Revision$</tt>
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -25,6 +25,7 @@
import org.jboss.jms.delegate.ConnectionEndpoint;
import org.jboss.messaging.core.contract.MessagingComponent;
+import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
@@ -62,10 +63,17 @@
void addConnectionFactoryCallback(String uniqueName, String JVMID, String remotingSessionID, ServerInvokerCallbackHandler handler);
+ void addConnectionFactoryCallback(String uniqueName, String VMID,
+ String remotingSessionID, PacketSender sender);
+
void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
-
+
+ void removeConnectionFactoryCallback(String uniqueName, String vmid, PacketSender sender);
+
ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName);
+ PacketSender[] getConnectionFactorySenders(String uniqueName);
+
/**
* @param clientToServer - true if the failure has been detected on a direct connection from
* client to this server, false if the failure has been detected while trying to send a
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -46,7 +46,6 @@
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
-import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.ClusterNotification;
import org.jboss.messaging.core.contract.ClusterNotificationListener;
@@ -228,8 +227,6 @@
// Registering with the dispatcher should always be the last thing otherwise a client could
// use a partially initialised object
- Dispatcher.instance.registerTarget(id, advised);
-
PacketDispatcher.server.register(advised.new ConnectionFactoryAdvisedPacketHandler(id));
// Replicate the change - we will ignore this locally
@@ -280,8 +277,6 @@
}
}
- Dispatcher.instance.unregisterTarget(endpoint.getID(), endpoint);
-
PacketDispatcher.server.unregister(endpoint.getID());
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -39,6 +39,7 @@
import org.jboss.messaging.core.contract.ClusterNotification;
import org.jboss.messaging.core.contract.ClusterNotificationListener;
import org.jboss.messaging.core.contract.Replicator;
+import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.Util;
import org.jboss.remoting.Client;
@@ -207,18 +208,35 @@
getCFInfo(uniqueName).addClient(JVMID, handler);
}
+ public void addConnectionFactoryCallback(String uniqueName, String vmID,
+ String remotingSessionID, PacketSender sender)
+ {
+ remotingSessions.put(remotingSessionID, vmID);
+ getCFInfo(uniqueName).addClient(vmID, sender);
+ }
/** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
public synchronized void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
{
getCFInfo(uniqueName).removeHandler(JVMID, handler);
}
+ public synchronized void removeConnectionFactoryCallback(String uniqueName, String vmid,
+ PacketSender sender)
+ {
+ getCFInfo(uniqueName).removeSender(vmid, sender);
+ }
+
/** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
public synchronized ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName)
{
return getCFInfo(uniqueName).getAllHandlers();
}
+ public synchronized PacketSender[] getConnectionFactorySenders(String uniqueName)
+ {
+ return getCFInfo(uniqueName).getAllSenders();
+ }
+
// ClusterNotificationListener implementation ---------------------------------------------------
@@ -416,6 +434,8 @@
String uniqueName;
Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<ServerInvokerCallbackHandler>> clientHandlersByVM;
ConcurrentHashSet<ServerInvokerCallbackHandler> clientHandlers;
+ Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<PacketSender>> clientSendersByVM;
+ ConcurrentHashSet<PacketSender> clientSenders;
public ConnectionFactoryCallbackInformation(String uniqueName)
@@ -423,14 +443,22 @@
this.uniqueName = uniqueName;
this.clientHandlersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<ServerInvokerCallbackHandler>>();
this.clientHandlers = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
+ this.clientSendersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<PacketSender>>();
+ this.clientSenders = new ConcurrentHashSet<PacketSender>();
}
+
public void addClient(String vmID, ServerInvokerCallbackHandler handler)
{
clientHandlers.add(handler);
getHandlersList(vmID).add(handler);
}
+ public void addClient(String vmID, PacketSender sender)
+ {
+ clientSenders.add(sender);
+ getSendersList(vmID).add(sender);
+ }
public ServerInvokerCallbackHandler[] getAllHandlers(String vmID)
{
Set<ServerInvokerCallbackHandler> list = getHandlersList(vmID);
@@ -438,17 +466,34 @@
return (ServerInvokerCallbackHandler[])list.toArray(array);
}
+ public PacketSender[] getAllSenders(String vmID)
+ {
+ Set<PacketSender> list = getSendersList(vmID);
+ return (PacketSender[]) list.toArray(new PacketSender[list.size()]);
+ }
+
public ServerInvokerCallbackHandler[] getAllHandlers()
{
ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[clientHandlers.size()];
return (ServerInvokerCallbackHandler[])clientHandlers.toArray(array);
}
+
+ public PacketSender[] getAllSenders()
+ {
+ return (PacketSender[]) clientSenders.toArray(new PacketSender[clientSenders.size()]);
+ }
public void removeHandler(String vmID, ServerInvokerCallbackHandler handler)
{
clientHandlers.remove(handler);
getHandlersList(vmID).remove(handler);
}
+
+ public void removeSender(String vmID, PacketSender sender)
+ {
+ clientSenders.remove(sender);
+ getSendersList(vmID).remove(sender);
+ }
private ConcurrentHashSet<ServerInvokerCallbackHandler> getHandlersList(String vmID)
{
@@ -461,6 +506,18 @@
}
return perVMList;
}
+
+ private ConcurrentHashSet<PacketSender> getSendersList(String vmID)
+ {
+ ConcurrentHashSet<PacketSender> perVMList = clientSendersByVM.get(vmID);
+ if (perVMList == null)
+ {
+ perVMList = new ConcurrentHashSet<PacketSender>();
+ clientSendersByVM.put(vmID, perVMList);
+ perVMList = clientSendersByVM.get(vmID);
+ }
+ return perVMList;
+ }
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -38,7 +38,6 @@
import org.jboss.jms.exception.MessagingJMSException;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Channel;
import org.jboss.messaging.core.contract.Filter;
@@ -249,8 +248,6 @@
iterator = null;
- Dispatcher.instance.unregisterTarget(id, this);
-
PacketDispatcher.server.unregister(id);
closed = true;
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -52,7 +52,6 @@
import org.jboss.jms.tx.MessagingXid;
import org.jboss.jms.tx.TransactionRequest;
import org.jboss.jms.tx.ClientTransaction.SessionTxState;
-import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.jms.wireformat.JMSWireFormat;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
@@ -266,8 +265,6 @@
serverPeer.addSession(sessionID, ep);
- Dispatcher.instance.registerTarget(sessionID, sessionAdvised);
-
PacketDispatcher.server.register(advised.new SessionAdvisedPacketHandler(sessionID));
log.trace("created and registered " + ep);
@@ -424,8 +421,6 @@
cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
- Dispatcher.instance.unregisterTarget(id, this);
-
PacketDispatcher.server.unregister(id);
closed = true;
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -34,12 +34,12 @@
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.server.connectionfactory.JNDIBindings;
import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
-import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
-import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
import org.jboss.messaging.util.ExceptionUtil;
-import org.jboss.remoting.callback.Callback;
+import org.jboss.messaging.util.Version;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
@@ -260,8 +260,6 @@
connAdvised = new ConnectionAdvised(endpoint);
}
- Dispatcher.instance.registerTarget(connectionID, connAdvised);
-
PacketDispatcher.server.register(connAdvised.new ConnectionAdvisedPacketHandler(connectionID));
log.trace("created and registered " + endpoint);
@@ -292,6 +290,13 @@
log.debug("Adding callbackHandler on ConnectionFactory");
serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, remotingSessionID, callbackHandler);
}
+
+ public void addSender(String VMID, String remotingSessionID,
+ PacketSender sender) throws JMSException
+ {
+ log.debug("Adding PacketSender on ConnectionFactory");
+ serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, remotingSessionID, sender);
+ }
public void removeCallback(String VMID, String remotingSessionID,
ServerInvokerCallbackHandler callbackHandler) throws JMSException
@@ -299,6 +304,13 @@
log.debug("Removing callbackHandler on ConnectionFactory");
serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
}
+
+ public void removeSender(String VMID, String remotingSessionID,
+ PacketSender sender) throws JMSException
+ {
+ log.debug("Removing PacketSender on ConnectionFactory");
+ serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, sender);
+ }
public TopologyResult getTopology() throws JMSException
{
@@ -336,16 +348,26 @@
ServerInvokerCallbackHandler[] clientFactoriesToUpdate = serverPeer.getConnectionManager().getConnectionFactoryCallback(this.uniqueName);
log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + clientFactoriesToUpdate.length);
- ConnectionFactoryUpdate message =
- new ConnectionFactoryUpdate(uniqueName, delegates, failoverMap);
-
- Callback callback = new Callback(message);
-
- for (ServerInvokerCallbackHandler o: clientFactoriesToUpdate)
+ PacketSender[] senders = serverPeer.getConnectionManager().getConnectionFactorySenders(uniqueName);
+ GetTopologyResponse packet = new GetTopologyResponse(getTopology());
+ packet.setVersion(Version.instance().getProviderIncrementingVersion());
+ packet.setTargetID(id);
+
+ for (PacketSender sender : senders)
{
- log.debug("Updating CF on callback " + o);
- o.handleCallbackOneway(callback);
+ sender.send(packet);
}
+
+// ConnectionFactoryUpdate message =
+// new ConnectionFactoryUpdate(uniqueName, delegates, failoverMap);
+//
+// Callback callback = new Callback(message);
+//
+// for (ServerInvokerCallbackHandler o: clientFactoriesToUpdate)
+// {
+// log.debug("Updating CF on callback " + o);
+// o.handleCallbackOneway(callback);
+// }
}
public void updateTopology(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -38,7 +38,6 @@
import org.jboss.jms.server.destination.TopicService;
import org.jboss.jms.server.messagecounter.MessageCounter;
import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
@@ -50,6 +49,7 @@
import org.jboss.messaging.core.contract.Replicator;
import org.jboss.messaging.core.impl.SimpleDelivery;
import org.jboss.messaging.core.impl.tx.Transaction;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.PacketHandler;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -528,8 +528,8 @@
messageQueue.getLocalDistributor().remove(this);
}
- Dispatcher.instance.unregisterTarget(id, this);
-
+ PacketDispatcher.server.unregister(id);
+
// If this is a consumer of a non durable subscription then we want to unbind the
// subscription and delete all its data.
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -62,7 +62,6 @@
import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
import org.jboss.jms.server.messagecounter.MessageCounter;
import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.Channel;
@@ -1239,8 +1238,8 @@
sp.removeSession(id);
- Dispatcher.instance.unregisterTarget(id, this);
-
+ PacketDispatcher.server.unregister(id);
+
closed = true;
}
@@ -1836,8 +1835,6 @@
advised = new ConsumerAdvised(ep);
}
- Dispatcher.instance.registerTarget(consumerID, advised);
-
PacketDispatcher.server.register(ep.new ServerConsumerEndpointPacketHandler());
ClientConsumerDelegate stub =
@@ -2154,8 +2151,6 @@
advised = new ConsumerAdvised(ep);
}
- Dispatcher.instance.registerTarget(consumerID, advised);
-
PacketDispatcher.server.register(ep.new ServerConsumerEndpointPacketHandler());
ClientConsumerDelegate stub =
@@ -2223,8 +2218,6 @@
advised = new BrowserAdvised(ep);
}
- Dispatcher.instance.registerTarget(browserID, advised);
-
PacketDispatcher.server.register(ep.new ServerBrowserEndpointHandler());
ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -21,6 +21,7 @@
*/
package org.jboss.jms.server.endpoint.advised;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTAOPSTACK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETTOPOLOGY;
@@ -43,6 +44,7 @@
import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
/**
@@ -98,6 +100,13 @@
((ServerConnectionFactoryEndpoint)endpoint).addCallback(vmID, remotingSessionID,
callbackHandler);
}
+
+ public void addSender(String vmID, String remotingSessionID,
+ PacketSender sender) throws JMSException
+ {
+ ((ServerConnectionFactoryEndpoint)endpoint).addSender(vmID, remotingSessionID,
+ sender);
+ }
public void removeCallback(String vmID, String remotingSessionID,
ServerInvokerCallbackHandler callbackHandler) throws JMSException
@@ -105,6 +114,13 @@
((ServerConnectionFactoryEndpoint)endpoint).removeCallback(vmID, remotingSessionID,
callbackHandler);
}
+
+ public void removeSender(String vmID, String remotingSessionID,
+ PacketSender sender) throws JMSException
+ {
+ ((ServerConnectionFactoryEndpoint)endpoint).removeSender(vmID, remotingSessionID,
+ sender);
+ }
public TopologyResult getTopology() throws JMSException
{
@@ -194,6 +210,15 @@
TopologyResult topology = getTopology();
response = new GetTopologyResponse(topology);
+ } else if (type == MSG_UPDATECALLBACK)
+ {
+ UpdateCallbackMessage message = (UpdateCallbackMessage) packet;
+ if (message.isAdd())
+ {
+ addSender(message.getClientVMID(), message.getRemotingSessionID(), sender);
+ } else {
+ removeSender(message.getClientVMID(), message.getRemotingSessionID(), sender);
+ }
} else
{
response = new JMSExceptionMessage(new MessagingJMSException(
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -28,11 +28,7 @@
import javax.management.MBeanServer;
-import org.jboss.jms.exception.MessagingShutdownException;
-import org.jboss.jms.wireformat.RequestSupport;
-import org.jboss.jms.wireformat.CallbackRequestSupport;
import org.jboss.logging.Logger;
-import org.jboss.messaging.util.Util;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.ServerInvocationHandler;
import org.jboss.remoting.ServerInvoker;
@@ -123,30 +119,7 @@
public Object invoke(InvocationRequest invocation) throws Throwable
{
- if (trace) { log.trace("invoking " + invocation); }
-
- invokeLock.readLock().acquire();
- try
- {
- if (closed)
- {
- throw new MessagingShutdownException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
- }
-
- RequestSupport request = (RequestSupport)invocation.getParameter();
-
- if (request instanceof CallbackRequestSupport)
- {
- performCallbackRequest(request);
- }
-
- return request.serverInvoke();
- }
- finally
- {
- invokeLock.readLock().release();
- }
-
+ throw new IllegalStateException("JBoss Remoting must no longer be used");
}
public void addListener(InvokerCallbackHandler callbackHandler)
@@ -217,31 +190,5 @@
// Private --------------------------------------------------------------------------------------
- private void performCallbackRequest(RequestSupport request)
- {
- CallbackRequestSupport cReq =
- (CallbackRequestSupport)request;
-
- String remotingSessionId = cReq.getRemotingSessionID();
-
- ServerInvokerCallbackHandler callbackHandler = null;
- synchronized(callbackHandlers)
- {
- callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
- }
- if (callbackHandler != null)
- {
- log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId) + " UID=" + remotingSessionId);
-
- cReq.setCallbackHandler(callbackHandler);
- }
- else
- {
- throw new IllegalStateException("Cannot find callback handler " +
- "for session id " + remotingSessionId);
- }
- }
-
-
// Inner classes --------------------------------------------------------------------------------
}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -74,7 +74,6 @@
assert port > 0;
assert transport != null;
- System.err.println("### connect to " + host + ":" + port + " ###");
connector = new NioSocketConnector();
MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class AddCallbackMessageCodec extends
+ AbstractPacketCodec<UpdateCallbackMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public AddCallbackMessageCodec()
+ {
+ super(MSG_UPDATECALLBACK);
+ }
+
+ // AbstractPacketCodec overrides----------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session, UpdateCallbackMessage message,
+ IoBuffer out) throws Exception
+ {
+ String remotingSessionID = message.getRemotingSessionID();
+ String clientVMID = message.getClientVMID();
+ boolean add = message.isAdd();
+
+ int bodyLength = sizeof(remotingSessionID) + sizeof(clientVMID) + 1;
+
+ out.putInt(bodyLength);
+ putString(out, remotingSessionID);
+ putString(out, clientVMID);
+ putBoolean(out, add);
+ }
+
+ @Override
+ protected UpdateCallbackMessage decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+ String remotingSessionID = getString(in);
+ String clientVMID = getString(in);
+ boolean add = getBoolean(in);
+
+ return new UpdateCallbackMessage(remotingSessionID, clientVMID, add);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -25,6 +25,7 @@
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageRequest;
import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
@@ -111,6 +112,8 @@
addCodec(GetTopologyResponse.class, GetTopologyResponseCodec.class);
+ addCodec(UpdateCallbackMessage.class, AddCallbackMessageCodec.class);
+
addCodec(CreateSessionRequest.class, CreateSessionRequestCodec.class);
addCodec(CreateSessionResponse.class, CreateSessionResponseCodec.class);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -16,13 +16,16 @@
NULL ((byte) 1),
MSG_JMSEXCEPTION ((byte) 2),
TEXT ((byte) 3),
+
// Connection factory
REQ_CREATECONNECTION ((byte)10),
RESP_CREATECONNECTION ((byte)11),
REQ_GETCLIENTAOPSTACK ((byte)12),
RESP_GETCLIENTAOPSTACK ((byte)13),
REQ_GETTOPOLOGY ((byte)14),
- RESP_GETTOPOLOGY ((byte)15),
+ RESP_GETTOPOLOGY ((byte)15),
+ MSG_UPDATECALLBACK ((byte)16),
+
// Connection
REQ_IDBLOCK ((byte)20),
RESP_IDBLOCK ((byte)21),
@@ -37,6 +40,7 @@
REQ_GETCLIENTID ((byte)30),
RESP_GETCLIENTID ((byte)31),
MSG_SETCLIENTID ((byte)32),
+
// Session
REQ_CREATECONSUMER ((byte)40),
RESP_CREATECONSUMER ((byte)41),
@@ -56,9 +60,11 @@
MSG_CANCELDELIVERY ((byte)55),
MSG_CANCELDELIVERIES ((byte)56),
MSG_UNSUBSCRIBE ((byte)57),
+
// Consumer
MSG_CHANGERATE ((byte)70),
- // Browse
+
+ // Browser
MSG_BROWSER_RESET ((byte)80),
REQ_BROWSER_HASNEXTMESSAGE ((byte)81),
RESP_BROWSER_HASNEXTMESSAGE ((byte)82),
@@ -66,12 +72,13 @@
RESP_BROWSER_NEXTMESSAGEBLOCK ((byte)84),
REQ_BROWSER_NEXTMESSAGE ((byte)85),
RESP_BROWSER_NEXTMESSAGE ((byte)86),
+
// Misc
REQ_CLOSING ((byte)90),
RESP_CLOSING ((byte)91),
MSG_CLOSE ((byte)92);
- public final byte type;
+ private final byte type;
PacketType(byte type)
{
Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class UpdateCallbackMessage extends AbstractPacket
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String remotingSessionID;
+
+ private final String clientVMID;
+
+ private final boolean add;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public UpdateCallbackMessage(String remotingSessionID, String clientVMID,
+ boolean add)
+ {
+ super(MSG_UPDATECALLBACK);
+
+ assertValidID(remotingSessionID);
+ assertValidID(clientVMID);
+
+ this.remotingSessionID = remotingSessionID;
+ this.clientVMID = clientVMID;
+ this.add = add;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getRemotingSessionID()
+ {
+ return remotingSessionID;
+ }
+
+ public String getClientVMID()
+ {
+ return clientVMID;
+ }
+
+ /**
+ * @return <code>true</code> if the callback is to be added, <code>false</code> if it is to be removed
+ */
+ public boolean isAdd()
+ {
+ return add;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", remotingSessionID=" + remotingSessionID
+ + ", clientVMID=" + clientVMID + ", add=" + add + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -23,6 +23,7 @@
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STOPCONNECTION;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_HASNEXTMESSAGE;
@@ -137,6 +138,7 @@
import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
import org.jboss.messaging.core.remoting.wireformat.TextPacket;
import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
import org.jboss.messaging.util.Version;
/**
@@ -431,6 +433,22 @@
assertSameTopology(response.getTopology(), decodedResponse.getTopology());
}
+ public void testUpdateCallbackMessage() throws Exception
+ {
+ UpdateCallbackMessage message = new UpdateCallbackMessage(randomString(), randomString(), true);
+ addVersion(message);
+
+ AbstractPacket decodedPacket = encodeAndDecode(message);
+
+ assertTrue(decodedPacket instanceof UpdateCallbackMessage);
+
+ UpdateCallbackMessage decodedMessage = (UpdateCallbackMessage) decodedPacket;
+ assertEquals(MSG_UPDATECALLBACK, decodedMessage.getType());
+ assertEquals(message.getRemotingSessionID(), decodedMessage.getRemotingSessionID());
+ assertEquals(message.getClientVMID(), decodedMessage.getClientVMID());
+ assertEquals(message.isAdd(), decodedMessage.isAdd());
+ }
+
public void testCreateSessionRequest() throws Exception
{
CreateSessionRequest request = new CreateSessionRequest(true, 0, false);
Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-11-29 13:27:33 UTC (rev 3385)
@@ -1,200 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
-import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
-import org.jboss.jms.wireformat.JMSWireFormat;
-import org.jboss.jms.wireformat.NullResponse;
-import org.jboss.jms.wireformat.PacketSupport;
-import org.jboss.jms.wireformat.RequestSupport;
-import org.jboss.jms.wireformat.ResponseSupport;
-import org.jboss.remoting.InvocationRequest;
-
-/**
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class WireFormatTest extends JMSTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private TestWireFormat wf;
-
- // Constructors --------------------------------------------------
-
- public WireFormatTest(String name)
- {
- super(name);
- }
-
- // TestCase overrides -------------------------------------------
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- wf = new TestWireFormat();
- }
-
- public void testSerialized() throws Exception
- {
- wf.testSerialized();
- }
-
- // Responses
-
- // Connection Factory
-
- public void testConnectionFactoryAddCabllack() throws Exception
- {
- wf.testConnectionFactoryAddCabllack();
- }
-
- public void testConnectionFactoryRemoveCabllack() throws Exception
- {
- wf.testConnectionFactoryRemoveCabllack();
- }
-
-
- // Browser
-
- public void testNullResponse() throws Exception
- {
- wf.testNullResponse();
- }
-
-
- //We just check the first byte to make sure serialization is not being used.
-
- private class TestWireFormat extends JMSWireFormat
- {
- private static final long serialVersionUID = -832216627671902129L;
-
- private void testPacket(PacketSupport req, int id) throws Exception
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- OutputStream oos = new DataOutputStream(bos);
-
- InvocationRequest ir = new InvocationRequest("session123", null, req, null, null, null);
-
- wf.write(ir, oos);
-
- byte[] bytes = bos.toByteArray();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-
- DataInputStream dis = new DataInputStream(bis);
-
- int theId = dis.readInt();
-
- assertEquals(id, theId);
- }
-
- public void testSerialized() throws Exception
- {
- Serializable obj = new SerializableObject("uyuiyiu", 234234);
-
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
- OutputStream oos = new DataOutputStream(bos);
-
- wf.write(obj, oos);
-
- byte[] bytes = bos.toByteArray();
-
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-
- DataInputStream dis = new DataInputStream(bis);
-
- int theId = dis.readInt();
-
- assertEquals(PacketSupport.SERIALIZED, theId);
- }
-
- // Requests
-
- // Connection Factory
-
- public void testConnectionFactoryAddCabllack() throws Exception
- {
- RequestSupport req =
- new ConnectionFactoryAddCallbackRequest("12", "23", "24",(byte)0);
-
- testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_ADDCALLBACK);
- }
-
- public void testConnectionFactoryRemoveCabllack() throws Exception
- {
- RequestSupport req =
- new ConnectionFactoryRemoveCallbackRequest("12", "23", "24",(byte)0);
-
- testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_REMOVECALLBACK);
- }
-
- // Responses
-
- // Connection
-
- public void testNullResponse() throws Exception
- {
- ResponseSupport resp = new NullResponse();
-
- testPacket(resp, PacketSupport.NULL_RESPONSE);
- }
-
- }
-
- public static class SerializableObject implements Serializable
- {
- /** The serialVersionUID */
- private static final long serialVersionUID = 1L;
-
- public SerializableObject()
- {
- }
-
- SerializableObject(String s, long l)
- {
- this.s = s;
- this.l = l;
- }
-
- public String s;
-
- public long l;
- }
-}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list