[jboss-cvs] JBoss Messaging SVN: r3385 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/client/delegate and 13 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 29 08:27:34 EST 2007


Author: jmesnil
Date: 2007-11-29 08:27:33 -0500 (Thu, 29 Nov 2007)
New Revision: 3385

Added:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java
Removed:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/wireformat/
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Modified:
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
   branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO based transport
* added packets and codec to add/remove connection factory callbacks
* deleted JBoss Remoting wireformat in org.jboss.jms.wireformat

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -21,6 +21,8 @@
  */
 package org.jboss.jms.client.delegate;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.RESP_GETTOPOLOGY;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
@@ -30,18 +32,21 @@
 
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
 import org.jboss.jms.client.plugin.LoadBalancingPolicy;
-import org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.CreateConnectionResult;
 import org.jboss.jms.delegate.IDBlock;
 import org.jboss.jms.delegate.TopologyResult;
 import org.jboss.jms.exception.MessagingNetworkFailureException;
-import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
-import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.GetTopologyRequest;
 import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
+import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
 import org.jboss.messaging.util.Version;
 import org.jboss.messaging.util.WeakHashSet;
 
@@ -122,18 +127,43 @@
       }
    }
 
-   private void addCallback(ClientConnectionFactoryDelegate delegate) throws Throwable
+   private void addCallback(final ClientConnectionFactoryDelegate delegate) throws Throwable
    {
-      remoting.getCallbackManager().setConnectionfactoryCallbackHandler(new ConnectionFactoryCallbackHandler(this, remoting));
+      PacketDispatcher.client.register(new PacketHandler() {
 
-      ConnectionFactoryAddCallbackRequest request =
-         new ConnectionFactoryAddCallbackRequest (JMSClientVMIdentifier.instance,
-               remoting.getRemotingClient().getSessionId(),
-               delegate.getID(),
-               Version.instance().getProviderIncrementingVersion());
+         public String getID()
+         {
+            return delegate.getID();
+         }
 
-      remoting.getRemotingClient().invoke(request, null);
+         public void handle(AbstractPacket packet, PacketSender sender)
+         {
+            PacketType type = packet.getType();
+            if (type == RESP_GETTOPOLOGY)
+            {
+               GetTopologyResponse response = (GetTopologyResponse) packet;
+               TopologyResult topology = response.getTopology();
+               updateFailoverInfo(topology.getDelegates(), topology.getFailoverMap());               
+            } else 
+            {
+               System.err.println("ClientClusteredConnectionFactoryDelegate.handle() unhandled packet: " + packet);
+            }
+         }        
+      });
+      
+      UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getNewRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, true);
+      sendOneWay(remoting.getNewRemotingClient(), delegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
 
+      // remoting.getCallbackManager().setConnectionfactoryCallbackHandler(new ConnectionFactoryCallbackHandler(this, remoting));
+
+//      ConnectionFactoryAddCallbackRequest request =
+//         new ConnectionFactoryAddCallbackRequest (JMSClientVMIdentifier.instance,
+//               remoting.getRemotingClient().getSessionId(),
+//               delegate.getID(),
+//               Version.instance().getProviderIncrementingVersion());
+//
+//      remoting.getRemotingClient().invoke(request, null);
+
    }
 
    private void addShutdownHook()
@@ -143,13 +173,18 @@
 
    private void removeCallback() throws Throwable
    {
-      ConnectionFactoryRemoveCallbackRequest request =
-         new ConnectionFactoryRemoveCallbackRequest (JMSClientVMIdentifier.instance,
-               remoting.getRemotingClient().getSessionId(),
-               currentDelegate.getID(),
-               Version.instance().getProviderIncrementingVersion());
+      PacketDispatcher.client.unregister(currentDelegate.getID());
+      
+      UpdateCallbackMessage message = new UpdateCallbackMessage(remoting.getNewRemotingClient().getSessionID(), JMSClientVMIdentifier.instance, false);
+      sendOneWay(remoting.getNewRemotingClient(), currentDelegate.getID(), Version.instance().getProviderIncrementingVersion(), message);
 
-      remoting.getRemotingClient().invoke(request, null);
+//      ConnectionFactoryRemoveCallbackRequest request =
+//         new ConnectionFactoryRemoveCallbackRequest (JMSClientVMIdentifier.instance,
+//               remoting.getRemotingClient().getSessionId(),
+//               currentDelegate.getID(),
+//               Version.instance().getProviderIncrementingVersion());
+//
+//      remoting.getRemotingClient().invoke(request, null);
    }
 
    protected void finalize() throws Throwable

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -336,7 +336,6 @@
       
       try
       {
-         System.err.println("### " + serverLocatorURI);
          InvokerLocator locator = new InvokerLocator(serverLocatorURI);
          client = new org.jboss.messaging.core.remoting.Client();
          client.connect(locator.getHost(), locator.getPort() + 1000, TCP);

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -35,6 +35,7 @@
 import org.jboss.jms.exception.MessagingJMSException;
 import org.jboss.jms.exception.MessagingNetworkFailureException;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.remoting.Client;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
 import org.jboss.messaging.util.Streamable;
@@ -155,15 +156,21 @@
 
    // Protected ------------------------------------------------------------------------------------     
    
-   protected void sendOneWay(AbstractPacket packet) throws JMSException
+   protected void sendOneWay(AbstractPacket packet) 
    {
-      assert newClient != null;
+      sendOneWay(newClient, id, version, packet);
+   }
+   
+   protected static void sendOneWay(Client client, String targetID, byte version, AbstractPacket packet)
+   {
+      assert client != null;
+      assertValidID(targetID);
       assert packet != null;
       
       packet.setVersion(version);
-      packet.setTargetID(id);
+      packet.setTargetID(targetID);
       
-      newClient.sendOneWay(packet);
+      client.sendOneWay(packet);      
    }
    
    protected AbstractPacket sendBlocking(AbstractPacket request) throws JMSException
@@ -171,7 +178,7 @@
       return sendBlocking(newClient, id, version, request);
    }
    
-   protected static AbstractPacket sendBlocking(org.jboss.messaging.core.remoting.Client client, String targetID, byte version, AbstractPacket request) throws JMSException
+   protected static AbstractPacket sendBlocking(Client client, String targetID, byte version, AbstractPacket request) throws JMSException
    {
       assert client != null;
       assertValidID(targetID);

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -24,13 +24,7 @@
 import java.util.Map;
 
 import org.jboss.jms.client.container.ClientConsumer;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.wireformat.ClientDelivery;
-import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.contract.Message;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.HandleCallbackException;
 import org.jboss.remoting.callback.InvokerCallbackHandler;
@@ -78,57 +72,7 @@
 
    public void handleCallback(Callback callback) throws HandleCallbackException
    {
-      Object parameter = callback.getParameter();
-
-      if (parameter instanceof ClientDelivery)
-      {
-         ClientDelivery dr = (ClientDelivery)parameter;
-          
-         Message msg = dr.getMessage();
-         
-         MessageProxy proxy = JBossMessage.
-            createThinDelegate(dr.getDeliveryId(), (JBossMessage)msg, dr.getDeliveryCount());
-
-         ClientConsumer handler =
-            (ClientConsumer)callbackHandlers.get(dr.getConsumerId());
-
-         if (handler == null)
-         {
-            // This should never happen since we wait for all deliveries to arrive before closing
-            // the consumer
-
-            throw new IllegalStateException(this + " callback handler not found, message arrived after consumer is closed.");
-         }
-
-         try
-         {
-            handler.handleMessage(proxy);
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to handle message", e);
-            throw new HandleCallbackException(e.getMessage(), e);
-         }
-      }
-      else if (parameter instanceof ConnectionFactoryUpdate)
-      {
-         if (connectionfactoryCallbackHandler == null)
-         {
-            log.warn("ConnectionFactoryUpdate was received but there is no callbackHandler set");
-         }
-         else
-         {
-            ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)parameter;
-
-            if (trace) { log.trace(this + " receiving cluster view change " + viewChange); }
-
-            connectionfactoryCallbackHandler.handleMessage(viewChange);
-         }
-      }
-      else
-      {
-         throw new HandleCallbackException("Unknow callback type: " + callback);
-      }
+      throw new IllegalStateException("JBoss Remoting must no longer be used");
    }
 
    // Public ---------------------------------------------------------------------------------------

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -22,12 +22,12 @@
 
 package org.jboss.jms.client.remoting;
 
+import java.lang.ref.WeakReference;
+
 import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
 import org.jboss.logging.Logger;
-import org.jboss.remoting.ConnectionListener;
 import org.jboss.remoting.Client;
-import java.lang.ref.WeakReference;
+import org.jboss.remoting.ConnectionListener;
 
 /**
  * This class will manage ConnectionFactory messages updates
@@ -66,17 +66,7 @@
 
    public void handleMessage(Object message)
    {
-      if (trace) { log.trace(this + " handling " + message); }
-
-      ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)message;
-
-      ClientClusteredConnectionFactoryDelegate delegate = delegateRef.get();
-
-      if (delegate!=null)
-      {
-         delegate.updateFailoverInfo(viewChange.getTopology().getDelegates(),
-                                           viewChange.getTopology().getFailoverMap());
-      }
+      throw new IllegalStateException("JBoss Remoting must no longer be used");
    }
 
 

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/delegate/TopologyResult.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -22,15 +22,15 @@
 
 package org.jboss.jms.delegate;
 
-import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
-import org.jboss.jms.wireformat.PacketSupport;
-import org.jboss.messaging.util.Streamable;
-import java.util.Map;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.io.DataOutputStream;
-import java.io.DataInputStream;
+import java.util.Map;
 
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.messaging.util.Streamable;
+
 /**
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  * @version <tt>$Revision$</tt>

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/ConnectionManager.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -25,6 +25,7 @@
 
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.messaging.core.contract.MessagingComponent;
+import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 
@@ -62,10 +63,17 @@
 
    void addConnectionFactoryCallback(String uniqueName, String JVMID, String remotingSessionID, ServerInvokerCallbackHandler handler);
 
+   void addConnectionFactoryCallback(String uniqueName, String VMID,
+         String remotingSessionID, PacketSender sender);
+
    void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
-   
+
+   void removeConnectionFactoryCallback(String uniqueName, String vmid, PacketSender sender);
+
    ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName);
 
+   PacketSender[] getConnectionFactorySenders(String uniqueName);
+
    /**
     * @param clientToServer - true if the failure has been detected on a direct connection from
     *        client to this server, false if the failure has been detected while trying to send a

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -46,7 +46,6 @@
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
 import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
-import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.ClusterNotification;
 import org.jboss.messaging.core.contract.ClusterNotificationListener;
@@ -228,8 +227,6 @@
 
       // Registering with the dispatcher should always be the last thing otherwise a client could
       // use a partially initialised object
-      Dispatcher.instance.registerTarget(id, advised);
-
       PacketDispatcher.server.register(advised.new ConnectionFactoryAdvisedPacketHandler(id));
       
       // Replicate the change - we will ignore this locally
@@ -280,8 +277,6 @@
          }
       }
 
-      Dispatcher.instance.unregisterTarget(endpoint.getID(), endpoint);
-      
       PacketDispatcher.server.unregister(endpoint.getID());
    }
 

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -39,6 +39,7 @@
 import org.jboss.messaging.core.contract.ClusterNotification;
 import org.jboss.messaging.core.contract.ClusterNotificationListener;
 import org.jboss.messaging.core.contract.Replicator;
+import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.Util;
 import org.jboss.remoting.Client;
@@ -207,18 +208,35 @@
       getCFInfo(uniqueName).addClient(JVMID, handler);
    }
 
+   public void addConnectionFactoryCallback(String uniqueName, String vmID,
+         String remotingSessionID, PacketSender sender)
+   {
+      remotingSessions.put(remotingSessionID, vmID);
+      getCFInfo(uniqueName).addClient(vmID, sender);      
+   }
    /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
    public synchronized void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
    {
       getCFInfo(uniqueName).removeHandler(JVMID, handler);
    }
 
+   public synchronized void removeConnectionFactoryCallback(String uniqueName, String vmid,
+         PacketSender sender)
+   {
+      getCFInfo(uniqueName).removeSender(vmid, sender);   
+   }
+   
    /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
    public synchronized ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName)
    {
       return getCFInfo(uniqueName).getAllHandlers();
    }
 
+   public synchronized PacketSender[] getConnectionFactorySenders(String uniqueName)
+   {
+      return getCFInfo(uniqueName).getAllSenders();
+   }
+
    // ClusterNotificationListener implementation ---------------------------------------------------
 
 
@@ -416,6 +434,8 @@
       String uniqueName;
       Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<ServerInvokerCallbackHandler>> clientHandlersByVM;
       ConcurrentHashSet<ServerInvokerCallbackHandler> clientHandlers;
+      Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<PacketSender>> clientSendersByVM;
+      ConcurrentHashSet<PacketSender> clientSenders;
 
 
       public ConnectionFactoryCallbackInformation(String uniqueName)
@@ -423,14 +443,22 @@
          this.uniqueName = uniqueName;
          this.clientHandlersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<ServerInvokerCallbackHandler>>();
          this.clientHandlers = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
+         this.clientSendersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<PacketSender>>();
+         this.clientSenders = new ConcurrentHashSet<PacketSender>();
       }
 
+
       public void addClient(String vmID, ServerInvokerCallbackHandler handler)
       {
          clientHandlers.add(handler);
          getHandlersList(vmID).add(handler);
       }
 
+      public void addClient(String vmID, PacketSender sender)
+      {
+         clientSenders.add(sender);
+         getSendersList(vmID).add(sender);
+      }
       public ServerInvokerCallbackHandler[] getAllHandlers(String vmID)
       {
          Set<ServerInvokerCallbackHandler> list = getHandlersList(vmID);
@@ -438,17 +466,34 @@
          return (ServerInvokerCallbackHandler[])list.toArray(array);
       }
 
+      public PacketSender[] getAllSenders(String vmID)
+      {
+         Set<PacketSender> list = getSendersList(vmID);
+         return (PacketSender[]) list.toArray(new PacketSender[list.size()]);
+      }
+
       public ServerInvokerCallbackHandler[] getAllHandlers()
       {
          ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[clientHandlers.size()];
          return (ServerInvokerCallbackHandler[])clientHandlers.toArray(array);
       }
+      
+      public PacketSender[] getAllSenders()
+      {
+         return (PacketSender[]) clientSenders.toArray(new PacketSender[clientSenders.size()]);
+      }
 
       public void removeHandler(String vmID, ServerInvokerCallbackHandler handler)
       {
          clientHandlers.remove(handler);
          getHandlersList(vmID).remove(handler);
       }
+      
+      public void removeSender(String vmID, PacketSender sender)
+      {
+         clientSenders.remove(sender);
+         getSendersList(vmID).remove(sender);
+      }
 
       private ConcurrentHashSet<ServerInvokerCallbackHandler> getHandlersList(String vmID)
       {
@@ -461,6 +506,18 @@
          }
          return perVMList;
       }
+      
+      private ConcurrentHashSet<PacketSender> getSendersList(String vmID)
+      {
+         ConcurrentHashSet<PacketSender> perVMList = clientSendersByVM.get(vmID);
+         if (perVMList == null)
+         {
+            perVMList = new ConcurrentHashSet<PacketSender>();
+            clientSendersByVM.put(vmID, perVMList);
+            perVMList = clientSendersByVM.get(vmID);
+         }
+         return perVMList;
+      }
 
    }
    

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -38,7 +38,6 @@
 import org.jboss.jms.exception.MessagingJMSException;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Channel;
 import org.jboss.messaging.core.contract.Filter;
@@ -249,8 +248,6 @@
       
       iterator = null;
       
-      Dispatcher.instance.unregisterTarget(id, this);
-      
       PacketDispatcher.server.unregister(id);
       
       closed = true;

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -52,7 +52,6 @@
 import org.jboss.jms.tx.MessagingXid;
 import org.jboss.jms.tx.TransactionRequest;
 import org.jboss.jms.tx.ClientTransaction.SessionTxState;
-import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.jms.wireformat.JMSWireFormat;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
@@ -266,8 +265,6 @@
          
          serverPeer.addSession(sessionID, ep);
 
-         Dispatcher.instance.registerTarget(sessionID, sessionAdvised);
-
          PacketDispatcher.server.register(advised.new SessionAdvisedPacketHandler(sessionID));
          
          log.trace("created and registered " + ep);
@@ -424,8 +421,6 @@
    
          cm.unregisterConnection(jmsClientVMID, remotingClientSessionID);
    
-         Dispatcher.instance.unregisterTarget(id, this);
-         
          PacketDispatcher.server.unregister(id);
 
          closed = true;

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -34,12 +34,12 @@
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.connectionfactory.JNDIBindings;
 import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
-import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
-import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
 import org.jboss.messaging.util.ExceptionUtil;
-import org.jboss.remoting.callback.Callback;
+import org.jboss.messaging.util.Version;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 /**
@@ -260,8 +260,6 @@
          connAdvised = new ConnectionAdvised(endpoint);
       }
       
-      Dispatcher.instance.registerTarget(connectionID, connAdvised);
-      
       PacketDispatcher.server.register(connAdvised.new ConnectionAdvisedPacketHandler(connectionID));
 
       log.trace("created and registered " + endpoint);
@@ -292,6 +290,13 @@
       log.debug("Adding callbackHandler on ConnectionFactory");
       serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, remotingSessionID, callbackHandler);
    }
+   
+   public void addSender(String VMID, String remotingSessionID,
+         PacketSender sender) throws JMSException
+   {
+      log.debug("Adding PacketSender on ConnectionFactory");
+      serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, remotingSessionID, sender);
+   }
 
    public void removeCallback(String VMID, String remotingSessionID,
                            ServerInvokerCallbackHandler callbackHandler) throws JMSException
@@ -299,6 +304,13 @@
       log.debug("Removing callbackHandler on ConnectionFactory");
       serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
    }
+   
+   public void removeSender(String VMID, String remotingSessionID,
+         PacketSender sender) throws JMSException
+   {
+      log.debug("Removing PacketSender on ConnectionFactory");
+      serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, sender);
+   }
 
    public TopologyResult getTopology() throws JMSException
    {
@@ -336,16 +348,26 @@
       ServerInvokerCallbackHandler[] clientFactoriesToUpdate = serverPeer.getConnectionManager().getConnectionFactoryCallback(this.uniqueName);
       log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + clientFactoriesToUpdate.length);
 
-      ConnectionFactoryUpdate message =
-         new ConnectionFactoryUpdate(uniqueName, delegates, failoverMap);
-
-      Callback callback = new Callback(message);
-
-      for (ServerInvokerCallbackHandler o: clientFactoriesToUpdate)
+      PacketSender[] senders = serverPeer.getConnectionManager().getConnectionFactorySenders(uniqueName);
+      GetTopologyResponse packet = new GetTopologyResponse(getTopology());
+      packet.setVersion(Version.instance().getProviderIncrementingVersion());
+      packet.setTargetID(id);
+      
+      for (PacketSender sender : senders)
       {
-         log.debug("Updating CF on callback " + o);
-         o.handleCallbackOneway(callback);
+         sender.send(packet);
       }
+      
+//      ConnectionFactoryUpdate message =
+//         new ConnectionFactoryUpdate(uniqueName, delegates, failoverMap);
+//
+//      Callback callback = new Callback(message);
+//
+//      for (ServerInvokerCallbackHandler o: clientFactoriesToUpdate)
+//      {
+//         log.debug("Updating CF on callback " + o);
+//         o.handleCallbackOneway(callback);
+//      }
    }
 
    public void updateTopology(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -38,7 +38,6 @@
 import org.jboss.jms.server.destination.TopicService;
 import org.jboss.jms.server.messagecounter.MessageCounter;
 import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.DeliveryObserver;
@@ -50,6 +49,7 @@
 import org.jboss.messaging.core.contract.Replicator;
 import org.jboss.messaging.core.impl.SimpleDelivery;
 import org.jboss.messaging.core.impl.tx.Transaction;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.PacketHandler;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
@@ -528,8 +528,8 @@
       	messageQueue.getLocalDistributor().remove(this);
       }
 
-      Dispatcher.instance.unregisterTarget(id, this);
-
+      PacketDispatcher.server.unregister(id);
+      
       // If this is a consumer of a non durable subscription then we want to unbind the
       // subscription and delete all its data.
 

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -62,7 +62,6 @@
 import org.jboss.jms.server.endpoint.advised.ConsumerAdvised;
 import org.jboss.jms.server.messagecounter.MessageCounter;
 import org.jboss.jms.server.selector.Selector;
-import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.Channel;
@@ -1239,8 +1238,8 @@
       
       sp.removeSession(id);
             
-      Dispatcher.instance.unregisterTarget(id, this);
-      
+      PacketDispatcher.server.unregister(id);
+
       closed = true;
    }            
    
@@ -1836,8 +1835,6 @@
          advised = new ConsumerAdvised(ep);
       }
       
-      Dispatcher.instance.registerTarget(consumerID, advised);
-
       PacketDispatcher.server.register(ep.new ServerConsumerEndpointPacketHandler());
       
       ClientConsumerDelegate stub =
@@ -2154,8 +2151,6 @@
          advised = new ConsumerAdvised(ep);
       }
       
-      Dispatcher.instance.registerTarget(consumerID, advised);
-      
       PacketDispatcher.server.register(ep.new ServerConsumerEndpointPacketHandler());
 
       ClientConsumerDelegate stub =
@@ -2223,8 +2218,6 @@
          advised = new BrowserAdvised(ep);
       }
       
-      Dispatcher.instance.registerTarget(browserID, advised);
-
       PacketDispatcher.server.register(ep.new ServerBrowserEndpointHandler());
       
       ClientBrowserDelegate stub = new ClientBrowserDelegate(browserID);

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -21,6 +21,7 @@
  */
 package org.jboss.jms.server.endpoint.advised;
 
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_CREATECONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETCLIENTAOPSTACK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_GETTOPOLOGY;
@@ -43,6 +44,7 @@
 import org.jboss.messaging.core.remoting.wireformat.GetTopologyResponse;
 import org.jboss.messaging.core.remoting.wireformat.JMSExceptionMessage;
 import org.jboss.messaging.core.remoting.wireformat.PacketType;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 /**
@@ -98,6 +100,13 @@
       ((ServerConnectionFactoryEndpoint)endpoint).addCallback(vmID,  remotingSessionID,
                             callbackHandler);
    }
+   
+   public void addSender(String vmID, String remotingSessionID,
+         PacketSender sender) throws JMSException
+   {
+      ((ServerConnectionFactoryEndpoint)endpoint).addSender(vmID,  remotingSessionID,
+            sender);
+   }
 
    public void removeCallback(String vmID, String remotingSessionID,
                            ServerInvokerCallbackHandler callbackHandler) throws JMSException
@@ -105,6 +114,13 @@
       ((ServerConnectionFactoryEndpoint)endpoint).removeCallback(vmID,  remotingSessionID,
                             callbackHandler);
    }
+   
+   public void removeSender(String vmID, String remotingSessionID,
+         PacketSender sender) throws JMSException
+   {
+      ((ServerConnectionFactoryEndpoint)endpoint).removeSender(vmID,  remotingSessionID,
+          sender);
+   }
 
    public TopologyResult getTopology() throws JMSException
    {
@@ -194,6 +210,15 @@
                TopologyResult topology = getTopology();
 
                response = new GetTopologyResponse(topology);
+            } else if (type == MSG_UPDATECALLBACK)
+            {
+               UpdateCallbackMessage message = (UpdateCallbackMessage) packet;
+               if (message.isAdd())
+               {
+                  addSender(message.getClientVMID(), message.getRemotingSessionID(), sender);               
+               } else {
+                  removeSender(message.getClientVMID(), message.getRemotingSessionID(), sender);
+               }
             } else
             {
                response = new JMSExceptionMessage(new MessagingJMSException(

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -28,11 +28,7 @@
 
 import javax.management.MBeanServer;
 
-import org.jboss.jms.exception.MessagingShutdownException;
-import org.jboss.jms.wireformat.RequestSupport;
-import org.jboss.jms.wireformat.CallbackRequestSupport;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.util.Util;
 import org.jboss.remoting.InvocationRequest;
 import org.jboss.remoting.ServerInvocationHandler;
 import org.jboss.remoting.ServerInvoker;
@@ -123,30 +119,7 @@
 
    public Object invoke(InvocationRequest invocation) throws Throwable
    {      
-      if (trace) { log.trace("invoking " + invocation); }
-      
-      invokeLock.readLock().acquire();
-      try
-      {	              
-         if (closed)
-         {
-            throw new MessagingShutdownException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
-         }
-           
-         RequestSupport request = (RequestSupport)invocation.getParameter();
-         
-         if (request instanceof CallbackRequestSupport)
-         {
-            performCallbackRequest(request);
-         }
-      
-         return request.serverInvoke();
-      }
-      finally
-      {
-      	invokeLock.readLock().release();
-      }
-      
+      throw new IllegalStateException("JBoss Remoting must no longer be used");
    }
 
    public void addListener(InvokerCallbackHandler callbackHandler)
@@ -217,31 +190,5 @@
    
    // Private --------------------------------------------------------------------------------------
 
-   private void performCallbackRequest(RequestSupport request)
-   {
-      CallbackRequestSupport cReq =
-         (CallbackRequestSupport)request;
-
-      String remotingSessionId = cReq.getRemotingSessionID();
-
-      ServerInvokerCallbackHandler callbackHandler = null;
-      synchronized(callbackHandlers)
-      {
-               callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
-      }
-      if (callbackHandler != null)
-      {
-         log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId) + " UID=" + remotingSessionId);
-
-         cReq.setCallbackHandler(callbackHandler);
-      }
-      else
-      {
-         throw new IllegalStateException("Cannot find callback handler " +
-                                         "for session id " + remotingSessionId);
-      }
-   }
-
-
    // Inner classes --------------------------------------------------------------------------------
 }

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -74,7 +74,6 @@
       assert port > 0;
       assert transport != null;
 
-      System.err.println("### connect to " + host + ":" + port + " ###");
       connector = new NioSocketConnector();
 
       MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/AddCallbackMessageCodec.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class AddCallbackMessageCodec extends
+      AbstractPacketCodec<UpdateCallbackMessage>
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public AddCallbackMessageCodec()
+   {
+      super(MSG_UPDATECALLBACK);
+   }
+
+   // AbstractPackedCodec overrides----------------------------------
+
+   @Override
+   protected void encodeBody(IoSession session, UpdateCallbackMessage message,
+         IoBuffer out) throws Exception
+   {
+      String remotingSessionID = message.getRemotingSessionID();
+      String clientVMID = message.getClientVMID();
+      boolean add = message.isAdd();
+
+      int bodyLength = sizeof(remotingSessionID) + sizeof(clientVMID) + 1;
+
+      out.putInt(bodyLength);
+      putString(out, remotingSessionID);
+      putString(out, clientVMID);
+      putBoolean(out, add);
+   }
+
+   @Override
+   protected UpdateCallbackMessage decodeBody(IoSession session, IoBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (in.remaining() < bodyLength)
+      {
+         return null;
+      }
+      String remotingSessionID = getString(in);
+      String clientVMID = getString(in);
+      boolean add = getBoolean(in);
+
+      return new UpdateCallbackMessage(remotingSessionID, clientVMID, add);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -25,6 +25,7 @@
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryRequest;
 import org.jboss.messaging.core.remoting.wireformat.AcknowledgeDeliveryResponse;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
 import org.jboss.messaging.core.remoting.wireformat.AddTemporaryDestinationMessage;
 import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageRequest;
 import org.jboss.messaging.core.remoting.wireformat.BrowserHasNextMessageResponse;
@@ -111,6 +112,8 @@
 
       addCodec(GetTopologyResponse.class, GetTopologyResponseCodec.class);
 
+      addCodec(UpdateCallbackMessage.class, AddCallbackMessageCodec.class);
+
       addCodec(CreateSessionRequest.class, CreateSessionRequestCodec.class);
 
       addCodec(CreateSessionResponse.class, CreateSessionResponseCodec.class);

Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -16,13 +16,16 @@
    NULL                           ((byte) 1),
    MSG_JMSEXCEPTION               ((byte) 2),
    TEXT                           ((byte) 3),
+
    // Connection factory
    REQ_CREATECONNECTION           ((byte)10),
    RESP_CREATECONNECTION          ((byte)11),
    REQ_GETCLIENTAOPSTACK          ((byte)12),
    RESP_GETCLIENTAOPSTACK         ((byte)13),
    REQ_GETTOPOLOGY                ((byte)14),
-   RESP_GETTOPOLOGY               ((byte)15),   
+   RESP_GETTOPOLOGY               ((byte)15), 
+   MSG_UPDATECALLBACK             ((byte)16),
+   
    // Connection
    REQ_IDBLOCK                    ((byte)20),
    RESP_IDBLOCK                   ((byte)21),
@@ -37,6 +40,7 @@
    REQ_GETCLIENTID                ((byte)30),
    RESP_GETCLIENTID               ((byte)31),
    MSG_SETCLIENTID                ((byte)32),
+   
    // Session
    REQ_CREATECONSUMER             ((byte)40),
    RESP_CREATECONSUMER            ((byte)41),  
@@ -56,9 +60,11 @@
    MSG_CANCELDELIVERY             ((byte)55),
    MSG_CANCELDELIVERIES           ((byte)56),
    MSG_UNSUBSCRIBE                ((byte)57),
+   
    // Consumer 
    MSG_CHANGERATE                 ((byte)70),
-   // Browse
+   
+   // Browser
    MSG_BROWSER_RESET              ((byte)80),
    REQ_BROWSER_HASNEXTMESSAGE     ((byte)81),
    RESP_BROWSER_HASNEXTMESSAGE    ((byte)82),
@@ -66,12 +72,13 @@
    RESP_BROWSER_NEXTMESSAGEBLOCK  ((byte)84),
    REQ_BROWSER_NEXTMESSAGE        ((byte)85),
    RESP_BROWSER_NEXTMESSAGE       ((byte)86),
+
    // Misc
    REQ_CLOSING                    ((byte)90),
    RESP_CLOSING                   ((byte)91),
    MSG_CLOSE                      ((byte)92);
    
-   public final byte type;
+   private final byte type;
 
    PacketType(byte type)
    {

Added: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java	                        (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/UpdateCallbackMessage.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.Assert.assertValidID;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class UpdateCallbackMessage extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final String remotingSessionID;
+
+   private final String clientVMID;
+
+   private final boolean add;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public UpdateCallbackMessage(String remotingSessionID, String clientVMID,
+         boolean add)
+   {
+      super(MSG_UPDATECALLBACK);
+
+      assertValidID(remotingSessionID);
+      assertValidID(clientVMID);
+
+      this.remotingSessionID = remotingSessionID;
+      this.clientVMID = clientVMID;
+      this.add = add;
+   }
+
+   // Public --------------------------------------------------------
+
+   public String getRemotingSessionID()
+   {
+      return remotingSessionID;
+   }
+
+   public String getClientVMID()
+   {
+      return clientVMID;
+   }
+
+   /**
+    * @return the add
+    */
+   public boolean isAdd()
+   {
+      return add;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", remotingSessionID=" + remotingSessionID
+            + ", clientVMID=" + clientVMID + ", add=" + add + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -23,6 +23,7 @@
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STARTCONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_STOPCONNECTION;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UNSUBSCRIBE;
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_UPDATECALLBACK;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.NULL;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_ACKDELIVERY;
 import static org.jboss.messaging.core.remoting.wireformat.PacketType.REQ_BROWSER_HASNEXTMESSAGE;
@@ -137,6 +138,7 @@
 import org.jboss.messaging.core.remoting.wireformat.StopConnectionMessage;
 import org.jboss.messaging.core.remoting.wireformat.TextPacket;
 import org.jboss.messaging.core.remoting.wireformat.UnsubscribeMessage;
+import org.jboss.messaging.core.remoting.wireformat.UpdateCallbackMessage;
 import org.jboss.messaging.util.Version;
 
 /**
@@ -431,6 +433,22 @@
       assertSameTopology(response.getTopology(), decodedResponse.getTopology());
    }
 
+   public void testUpdateCallbackMessage() throws Exception
+   {
+      UpdateCallbackMessage message = new UpdateCallbackMessage(randomString(), randomString(), true);
+      addVersion(message);
+
+      AbstractPacket decodedPacket = encodeAndDecode(message);
+
+      assertTrue(decodedPacket instanceof UpdateCallbackMessage);
+
+      UpdateCallbackMessage decodedMessage = (UpdateCallbackMessage) decodedPacket;
+      assertEquals(MSG_UPDATECALLBACK, decodedMessage.getType());
+      assertEquals(message.getRemotingSessionID(), decodedMessage.getRemotingSessionID());
+      assertEquals(message.getClientVMID(), decodedMessage.getClientVMID());
+      assertEquals(message.isAdd(), decodedMessage.isAdd());
+   }
+   
    public void testCreateSessionRequest() throws Exception
    {
       CreateSessionRequest request = new CreateSessionRequest(true, 0, false);

Deleted: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-11-28 19:21:43 UTC (rev 3384)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-11-29 13:27:33 UTC (rev 3385)
@@ -1,200 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.test.messaging.jms;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
-import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
-import org.jboss.jms.wireformat.JMSWireFormat;
-import org.jboss.jms.wireformat.NullResponse;
-import org.jboss.jms.wireformat.PacketSupport;
-import org.jboss.jms.wireformat.RequestSupport;
-import org.jboss.jms.wireformat.ResponseSupport;
-import org.jboss.remoting.InvocationRequest;
-
-/**
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class WireFormatTest extends JMSTestCase
-{
-   // Constants -----------------------------------------------------
-      
-   // Static --------------------------------------------------------
-      
-   // Attributes ----------------------------------------------------
-   
-   private TestWireFormat wf;
-   
-   // Constructors --------------------------------------------------
-
-   public WireFormatTest(String name)
-   {
-      super(name);
-   }
-
-   // TestCase overrides -------------------------------------------
-
-   public void setUp() throws Exception
-   {
-      super.setUp();
-      
-      wf = new TestWireFormat();            
-   }
-
-   public void testSerialized() throws Exception
-   {
-      wf.testSerialized();
-   }
-
-   // Responses
-   
-   // Connection Factory
-
-   public void testConnectionFactoryAddCabllack() throws Exception
-   {
-      wf.testConnectionFactoryAddCabllack();
-   }
-
-   public void testConnectionFactoryRemoveCabllack() throws Exception
-   {
-      wf.testConnectionFactoryRemoveCabllack();
-   }
-
-   
-   // Browser
-   
-   public void testNullResponse() throws Exception
-   {                 
-      wf.testNullResponse();
-   }
-
-
-   //We just check the first byte to make sure serialization is not be used.
-   
-   private class TestWireFormat extends JMSWireFormat
-   {      
-		private static final long serialVersionUID = -832216627671902129L;
-
-		private void testPacket(PacketSupport req, int id) throws Exception
-      {
-         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-         
-         OutputStream oos = new DataOutputStream(bos);
-         
-         InvocationRequest ir = new InvocationRequest("session123", null, req, null, null, null);   
-         
-         wf.write(ir, oos);
-                  
-         byte[] bytes = bos.toByteArray();
-         
-         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-                  
-         DataInputStream dis = new DataInputStream(bis); 
-                 
-         int theId = dis.readInt();
-         
-         assertEquals(id, theId);
-      }
-      
-      public void testSerialized() throws Exception
-      {
-         Serializable obj = new SerializableObject("uyuiyiu", 234234);
-         
-         ByteArrayOutputStream bos = new ByteArrayOutputStream();
-         
-         OutputStream oos = new DataOutputStream(bos);
-
-         wf.write(obj, oos);
-                  
-         byte[] bytes = bos.toByteArray();
-         
-         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-                  
-         DataInputStream dis = new DataInputStream(bis); 
-                 
-         int theId = dis.readInt();
-         
-         assertEquals(PacketSupport.SERIALIZED, theId);
-      }
-      
-      // Requests
-      
-      // Connection Factory
-      
-      public void testConnectionFactoryAddCabllack() throws Exception
-      {
-         RequestSupport req =
-            new ConnectionFactoryAddCallbackRequest("12", "23", "24",(byte)0);
-
-         testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_ADDCALLBACK);
-      }
-
-      public void testConnectionFactoryRemoveCabllack() throws Exception
-      {
-         RequestSupport req =
-            new ConnectionFactoryRemoveCallbackRequest("12", "23", "24",(byte)0);
-
-         testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_REMOVECALLBACK);
-      }
-      
-      // Responses
-            
-      // Connection
-      
-      public void testNullResponse() throws Exception
-      { 
-         ResponseSupport resp =  new NullResponse();
-                 
-         testPacket(resp, PacketSupport.NULL_RESPONSE);                           
-      }
-      
-   }
-   
-   public static class SerializableObject implements Serializable
-   {      
-      /** The serialVersionUID */
-      private static final long serialVersionUID = 1L;
-
-      public SerializableObject()
-      {         
-      }
-
-      SerializableObject(String s, long l)
-      {
-         this.s = s;
-         this.l = l;
-      }
-      
-      public String s;
-      
-      public long l;      
-   }
-}
\ No newline at end of file




More information about the jboss-cvs-commits mailing list