[hornetq-commits] JBoss hornetq SVN: r11387 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242: src/main/org/hornetq/api/core/client and 11 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 21 13:56:20 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-21 13:56:19 -0400 (Wed, 21 Sep 2011)
New Revision: 11387

Added:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
Modified:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
Log:
JBPAPP-7242 - Back porting JBPAPP-7230 & JBPAPP-7229 - ClientID fixes

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/HornetQException.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -135,6 +135,12 @@
    public static final int DUPLICATE_ID_REJECTED = 113;
 
    
+   /**
+    * A session metadata entry was rejected because it duplicates one that is already set
+    */
+   public static final int DUPLICATE_METADATA = 114;
+
+   
    // Native Error codes ----------------------------------------------
 
    /**

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/api/core/client/ClientSession.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -569,6 +569,15 @@
    void addMetaData(String key, String data) throws HornetQException;
 
    /**
+    * Attach metadata to the session, failing if a session holding the same metadata already exists.
+    * You can use this to ensure that no other session has the same key/value pair you are passing as arguments.
+    * This is useful to simulate unique client-ids, where you may want to prevent multiple instances of your client application from being connected.
+    * 
+    * @throws HornetQException with code {@link HornetQException#DUPLICATE_METADATA} if the metadata had already been set
+    */
+   void addUniqueMetaData(String key, String data) throws HornetQException;
+
+   /**
     * Attach any metadata to the session.
     * Sends a Metadata using the older version
     * @deprecated Use {@link ClientSession#addMetaData(String, String)}

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -64,6 +64,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -1101,6 +1102,11 @@
       metadata.put(key, data);
       channel.sendBlocking(new SessionAddMetaDataMessageV2(key, data));
    }
+   
+   public void addUniqueMetaData(String key, String data) throws HornetQException
+   {
+      channel.sendBlocking(new SessionUniqueAddMetaDataMessage(key, data));
+   }
 
    public ClientSessionFactoryInternal getSessionFactory()
    {

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -578,4 +578,12 @@
    {
       return session.isCompressLargeMessages();
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+    */
+   public void addUniqueMetaData(String key, String data) throws HornetQException
+   {
+      session.addUniqueMetaData(key, data);
+   }
 }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -79,6 +79,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -490,6 +491,23 @@
                   session.addMetaData(message.getKey(), message.getData());
                   break;
                }
+               case PacketImpl.SESS_UNIQUE_ADD_METADATA:
+               {
+                  SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage)packet;
+                  if (session.addUniqueMetaData(message.getKey(), message.getData()))
+                  {
+                     response = new NullResponseMessage();
+                  }
+                  else
+                  {
+                     response = new HornetQExceptionMessage(new HornetQException(HornetQException.DUPLICATE_METADATA,
+                                                     "Metadata " + message.getKey() +
+                                                              "=" +
+                                                              message.getData() +
+                                                              " had been set already"));
+                  }
+                  break;
+               }
             }
          }
          catch (HornetQXAException e)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -43,6 +43,7 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA2;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_UNIQUE_ADD_METADATA;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_BINDINGQUERY_RESP;
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_CLOSE;
@@ -138,6 +139,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionUniqueAddMetaDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXACommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionXAForgetMessage;
@@ -524,6 +526,11 @@
             packet = new SessionAddMetaDataMessageV2();
             break;
          }
+         case SESS_UNIQUE_ADD_METADATA:
+         {
+            packet = new SessionUniqueAddMetaDataMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -181,19 +181,22 @@
    public static final byte REPLICATION_COMPARE_DATA = 102;
 
    public static final byte REPLICATION_SYNC = 103;
-
-   // HA
    
    public static final byte SESS_ADD_METADATA = 104;
    
    public static final byte SESS_ADD_METADATA2 = 105;
    
+   public static final byte SESS_UNIQUE_ADD_METADATA = 106;
+
+   // HA
+
    public static final byte CLUSTER_TOPOLOGY = 110;
 
    public static final byte NODE_ANNOUNCE = 111;
 
    public static final byte SUBSCRIBE_TOPOLOGY = 112;
 
+ 
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessageV2.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -30,7 +30,7 @@
    private String key;
    private String data;
    /**
-    * It won require confirmation during failover / reconnect
+    * Confirmation is not required during failover / reconnect
     */
    private boolean requiresConfirmation = true;
 
@@ -39,6 +39,11 @@
       super(PacketImpl.SESS_ADD_METADATA2);
    }
    
+   protected SessionAddMetaDataMessageV2(byte packetCode)
+   {
+      super(packetCode);
+   }
+   
    public SessionAddMetaDataMessageV2(String k, String d)
    {
       this();
@@ -46,6 +51,13 @@
       data = d;
    }
    
+   protected SessionAddMetaDataMessageV2(final byte packetCode, String k, String d)
+   {
+      super(packetCode);
+      key = k;
+      data = d;
+   }
+   
    public SessionAddMetaDataMessageV2(String k, String d, boolean requiresConfirmation)
    {
       this();

Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java	                        (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionUniqueAddMetaDataMessage.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+
+/**
+ * A SessionUniqueAddMetaDataMessage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class SessionUniqueAddMetaDataMessage extends SessionAddMetaDataMessageV2
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   
+   public SessionUniqueAddMetaDataMessage()
+   {
+      super(SESS_UNIQUE_ADD_METADATA);
+   }
+   
+
+   public SessionUniqueAddMetaDataMessage(String key, String data)
+   {
+      super(SESS_UNIQUE_ADD_METADATA, key, data);
+   }
+   
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -128,13 +128,13 @@
 
       this.scheduledThreadPool = scheduledThreadPool;
 
-      this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server,
-                                                                                                     interceptors));
+      this.protocolMap.put(ProtocolType.CORE,
+                           new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
       // difference between Stomp and Stomp over Web Sockets is handled in NettyAcceptor.getPipeline()
-      this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server,
-                                                                                                       interceptors));
-      this.protocolMap.put(ProtocolType.STOMP_WS, new StompProtocolManagerFactory().createProtocolManager(server,
-                                                                                                          interceptors));
+      this.protocolMap.put(ProtocolType.STOMP,
+                           new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
+      this.protocolMap.put(ProtocolType.STOMP_WS,
+                           new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
    }
 
    // RemotingService implementation -------------------------------
@@ -374,9 +374,9 @@
       if (config.isBackup())
       {
          serverSideReplicatingConnection = entry.connection;
-      }      
+      }
    }
-   
+
    public void connectionDestroyed(final Object connectionID)
    {
       ConnectionEntry conn = connections.get(connectionID);
@@ -423,7 +423,7 @@
 
       // Connections should only fail when TTL is exceeded
    }
-   
+
    public void connectionReadyForWrites(final Object connectionID, final boolean ready)
    {
    }
@@ -497,76 +497,85 @@
       {
          while (!closed)
          {
-            long now = System.currentTimeMillis();
+            try
+            {
+               long now = System.currentTimeMillis();
 
-            Set<Object> idsToRemove = new HashSet<Object>();
+               Set<Object> idsToRemove = new HashSet<Object>();
 
-            for (ConnectionEntry entry : connections.values())
-            {
-               RemotingConnection conn = entry.connection;
+               for (ConnectionEntry entry : connections.values())
+               {
+                  RemotingConnection conn = entry.connection;
 
-               boolean flush = true;
+                  boolean flush = true;
 
-               if (entry.ttl != -1)
-               {
-                  if (now >= entry.lastCheck + entry.ttl)
+                  if (entry.ttl != -1)
                   {
-                     if (!conn.checkDataReceived())
+                     if (now >= entry.lastCheck + entry.ttl)
                      {
-                        idsToRemove.add(conn.getID());
+                        if (!conn.checkDataReceived())
+                        {
+                           idsToRemove.add(conn.getID());
 
-                        flush = false;
+                           flush = false;
+                        }
+                        else
+                        {
+                           entry.lastCheck = now;
+                        }
                      }
-                     else
-                     {
-                        entry.lastCheck = now;
-                     }
                   }
+
+                  if (flush)
+                  {
+                     conn.flush();
+                  }
                }
 
-               if (flush)
+               for (Object id : idsToRemove)
                {
-                  conn.flush();
+                  RemotingConnection conn = removeConnection(id);
+                  if (conn != null)
+                  {
+                     HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                                                "Did not receive data from " + conn.getRemoteAddress() +
+                                                                         ". It is likely the client has exited or crashed without " +
+                                                                         "closing its connection, or the network between the server and client has failed. " +
+                                                                         "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
+                                                                         "Please check user manual for more information." +
+                                                                         " The connection will now be closed.");
+                     conn.fail(me);
+                  }
                }
-            }
 
-            for (Object id : idsToRemove)
-            {
-               RemotingConnection conn = removeConnection(id);
+               synchronized (this)
+               {
+                  long toWait = pauseInterval;
 
-               HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                                          "Did not receive data from " + conn.getRemoteAddress() +
-                                                                   ". It is likely the client has exited or crashed without " +
-                                                                   "closing its connection, or the network between the server and client has failed. " +
-                                                                   "You also might have configured connection-ttl and client-failure-check-period incorrectly. " +
-                                                                   "Please check user manual for more information." +
-                                                                   " The connection will now be closed.");
-               conn.fail(me);
-            }
+                  long start = System.currentTimeMillis();
 
-            synchronized (this)
-            {
-               long toWait = pauseInterval;
-
-               long start = System.currentTimeMillis();
-
-               while (!closed && toWait > 0)
-               {
-                  try
+                  while (!closed && toWait > 0)
                   {
-                     wait(toWait);
-                  }
-                  catch (InterruptedException e)
-                  {
-                  }
+                     try
+                     {
+                        wait(toWait);
+                     }
+                     catch (InterruptedException e)
+                     {
+                     }
 
-                  now = System.currentTimeMillis();
+                     now = System.currentTimeMillis();
 
-                  toWait -= now - start;
+                     toWait -= now - start;
 
-                  start = now;
+                     start = now;
+                  }
                }
             }
+            catch (Throwable e)
+            {
+               log.warn(e.getMessage(), e);
+            }
          }
       }
    }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/HornetQServer.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -126,6 +126,9 @@
 
    List<ServerSession> getSessions(String connectionID);
 
+   /** will return true if there is any session with this metadata key set to this value */
+   boolean lookupSession(String metakey, String metavalue);
+
    ClusterManager getClusterManager();
 
    SimpleString getNodeID();

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/ServerSession.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -124,6 +124,8 @@
 
    void addMetaData(String key, String data);
 
+   boolean addUniqueMetaData(String key, String data);
+
    String getMetaData(String key);
 
    String[] getTargetAddresses();

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -966,6 +966,23 @@
       sessions.remove(name);
    }
 
+   public boolean lookupSession(String key, String value)
+   {
+      // getSessions is called here in an attempt to minimize locking the Server while this check is being done
+      Set<ServerSession> allSessions = getSessions();
+      
+      for (ServerSession session : allSessions)
+      {
+         String metaValue = session.getMetaData(key);
+         if (metaValue != null && metaValue.equals(value))
+         {
+            return true;
+         }
+      }
+      
+      return false;
+   }
+   
    public synchronized List<ServerSession> getSessions(final String connectionID)
    {
       Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -1173,6 +1173,21 @@
       metaData.put(key, data);
    }
 
+
+   public boolean addUniqueMetaData(String key, String data)
+   {
+      if (server.lookupSession(key, data))
+      {
+         // There is a duplication of this property
+         return false;
+      }
+      else
+      {
+         addMetaData(key, data);
+         return true;
+      }
+   }
+
    public String getMetaData(String key)
    {
       String data = null;

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/src/main/org/hornetq/jms/client/HornetQConnection.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -181,6 +181,18 @@
       {
          throw new IllegalStateException("setClientID can only be called directly after the connection is created");
       }
+      
+      try
+      {
+         initialSession.addUniqueMetaData("jms-client-id", clientID);
+      }
+      catch (HornetQException e)
+      {
+         if (e.getCode() == HornetQException.DUPLICATE_METADATA)
+         {
+            throw new IllegalStateException("clientID=" + clientID + " was already set into another connection");
+         }
+      }
 
       this.clientID = clientID;
       try

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/ConnectionTest.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -88,8 +88,21 @@
       connection.setClientID(clientID);
 
       ProxyAssertSupport.assertEquals(clientID, connection.getClientID());
+      
+      Connection connection2 = JMSTest.cf.createConnection();
+      try
+      {
+         connection2.setClientID(clientID);
+         fail("setClientID was expected to throw an exception");
+      }
+      catch (JMSException e)
+      {
+         // expected
+      }
 
       connection.close();
+
+      connection2.setClientID(clientID);
    }
 
    public void testSetClientAfterStart() throws Exception

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -1417,6 +1417,15 @@
          // TODO Auto-generated method stub
          
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.api.core.client.ClientSession#addUniqueMetaData(java.lang.String, java.lang.String)
+       */
+      public void addUniqueMetaData(String key, String data) throws HornetQException
+      {
+         // TODO Auto-generated method stub
+         
+      }
    }
 
 }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2011-09-21 17:06:02 UTC (rev 11386)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7242/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java	2011-09-21 17:56:19 UTC (rev 11387)
@@ -98,7 +98,7 @@
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
       JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
       Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
 
       TopicControl topicControl = createManagementControl();
       Assert.assertEquals(3, topicControl.getSubscriptionCount());
@@ -118,7 +118,7 @@
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
       JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
       Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"_2", subscriptionName + "2");
 
       TopicControl topicControl = createManagementControl();
 
@@ -145,7 +145,7 @@
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
       TopicSubscriber subs1 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
       Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+      TopicSubscriber subs2 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID + "2", subscriptionName + "2");
 
       TopicControl topicControl = createManagementControl();
       Assert.assertEquals(3, topicControl.listAllSubscriptions().length);
@@ -171,7 +171,7 @@
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
       JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
       Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
 
       TopicControl topicControl = createManagementControl();
       String jsonString = topicControl.listDurableSubscriptionsAsJSON();
@@ -179,7 +179,7 @@
       Assert.assertEquals(2, infos.length);
       Assert.assertEquals(clientID, infos[0].getClientID());
       Assert.assertEquals(subscriptionName, infos[0].getName());
-      Assert.assertEquals(clientID, infos[1].getClientID());
+      Assert.assertEquals(clientID+"2", infos[1].getClientID());
       Assert.assertEquals(subscriptionName + "2", infos[1].getName());
 
       jsonString = topicControl.listNonDurableSubscriptionsAsJSON();
@@ -344,7 +344,7 @@
       Connection connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
       JMSUtil.createDurableSubscriber(connection_1, topic, clientID, subscriptionName);
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName + "2");
+      JMSUtil.createDurableSubscriber(connection_2, topic, clientID+"2", subscriptionName + "2");
 
       JMSUtil.sendMessages(topic, 3);
 
@@ -438,7 +438,7 @@
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
       JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
       Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+      JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2");
 
       TopicControl topicControl = createManagementControl();
 
@@ -460,7 +460,7 @@
       Connection connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
       MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
       Connection connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
-      MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+      MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID+"2", subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
 
       TopicControl topicControl = createManagementControl();
 



More information about the hornetq-commits mailing list