[jboss-cvs] JBoss Messaging SVN: r6634 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Apr 30 09:54:07 EDT 2009


Author: ataylor
Date: 2009-04-30 09:54:06 -0400 (Thu, 30 Apr 2009)
New Revision: 6634

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/CloseListener.java
Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java
Log:
fixed temp queues not removed on close

Added: trunk/src/main/org/jboss/messaging/core/remoting/CloseListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/CloseListener.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/CloseListener.java	2009-04-30 13:54:06 UTC (rev 6634)
@@ -0,0 +1,30 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+/** Listener whose {@link #connectionClosed()} callback is fired for listeners added via RemotingConnection#addClosingListener.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public interface CloseListener
+{
+   void connectionClosed(); // takes no arguments and returns nothing; implementations do their cleanup here
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-04-30 13:09:49 UTC (rev 6633)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java	2009-04-30 13:54:06 UTC (rev 6634)
@@ -12,13 +12,13 @@
 
 package org.jboss.messaging.core.remoting;
 
-import java.util.List;
-
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 
+import java.util.List;
+
 /**
  * A RemotingConnection
  * 
@@ -62,4 +62,6 @@
    void freeze();
   
    Connection getTransportConnection();
+
+   void addClosingListener(CloseListener listener);
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-30 13:09:49 UTC (rev 6633)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-30 13:54:06 UTC (rev 6634)
@@ -12,6 +12,22 @@
 
 package org.jboss.messaging.core.remoting.impl;
 
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.ChannelHandler;
+import org.jboss.messaging.core.remoting.CloseListener;
+import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
+import org.jboss.messaging.core.remoting.FailureListener;
+import org.jboss.messaging.core.remoting.Interceptor;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
@@ -71,37 +87,6 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.ChannelHandler;
-import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
-import org.jboss.messaging.core.remoting.FailureListener;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
@@ -154,6 +139,21 @@
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.SimpleIDGenerator;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -226,6 +226,8 @@
 
    private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
 
+   private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>();
+
    private final long blockingCallTimeout;
 
    private Runnable pinger;
@@ -361,6 +363,11 @@
       return this.transportConnection;
    }
 
+   public void addClosingListener(final CloseListener listener)
+   {
+      this.closeListeners.add(listener); // appended to the list that callClosingListeners() walks
+   }
+
    public List<FailureListener> getFailureListeners()
    {
       return new ArrayList<FailureListener>(failureListeners);
@@ -440,7 +447,7 @@
       log.warn("Connection failure has been detected " + me.getMessage() + ":" + me.getCode());
 
       // Then call the listeners
-      callListeners(me);
+      callFailureListeners(me);
 
       internalClose();
 
@@ -472,10 +479,7 @@
 
       internalClose();
 
-      // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1421
-      // This affects clustering, so I'm keeping this out for now
-      // We need to inform Listeners about the connection being closed
-      // callListeners(null);
+      callClosingListeners();
    }
 
    public boolean isExpired(final long now)
@@ -554,7 +558,7 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   private void callListeners(final MessagingException me)
+   private void callFailureListeners(final MessagingException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
 
@@ -579,6 +583,26 @@
       }
    }
 
+   /** Invokes connectionClosed() on every registered CloseListener, logging (not propagating) listener errors. */
+   private void callClosingListeners()
+   {
+      // Work on a copy so the set of listeners being notified is fixed before the first callback runs
+      final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
+
+      for (final CloseListener listener : listenersClone)
+      {
+         try
+         {
+            listener.connectionClosed();
+         }
+         catch (final Throwable t)
+         {
+            // Failure of one listener to execute shouldn't prevent others from executing
+            log.error("Failed to execute close listener", t);
+         }
+      }
+   }
+
    private void internalClose()
    {
       if (future != null)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-30 13:09:49 UTC (rev 6633)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-30 13:54:06 UTC (rev 6634)
@@ -12,23 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.MBeanServer;
-
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
@@ -106,6 +89,22 @@
 import org.jboss.messaging.utils.UUIDGenerator;
 import org.jboss.messaging.utils.VersionLoader;
 
+import javax.management.MBeanServer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
 /**
  * The messaging server implementation
  *
@@ -1363,6 +1362,8 @@
 
       connection.addFailureListener(session);
 
+      connection.addClosingListener(session);
+
       return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-30 13:09:49 UTC (rev 6633)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-30 13:54:06 UTC (rev 6634)
@@ -11,20 +11,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
 import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.client.impl.ClientMessageImpl;
 import org.jboss.messaging.core.client.management.impl.ManagementHelper;
@@ -34,6 +20,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.Notification;
+import static org.jboss.messaging.core.management.NotificationType.CONSUMER_CREATED;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.BindingType;
@@ -42,6 +29,7 @@
 import org.jboss.messaging.core.postoffice.QueueBinding;
 import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.CloseListener;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -97,6 +85,17 @@
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.TypedProperties;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
 /*
  * Session implementation 
  * 
@@ -105,7 +104,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  */
-public class ServerSessionImpl implements ServerSession, FailureListener
+public class ServerSessionImpl implements ServerSession, FailureListener, CloseListener
 {
    // Constants -----------------------------------------------------------------------------
 
@@ -1116,6 +1115,28 @@
       return true;
    }
 
+   /** CloseListener callback: runs each Runnable in failureRunners; a runner that throws is logged and skipped. */
+   public void connectionClosed()
+   {
+      try
+      {
+         for (Runnable runner : failureRunners)
+         {
+            try
+            {
+               runner.run();
+            }
+            catch (Throwable t)
+            {
+               log.error("Failed to execute failure runner", t);
+            }
+         }
+      }
+      catch (Throwable t)
+      {
+         log.error("Failed fire listeners " + this, t); // pass t so the cause and stack trace reach the log
+      }
+   }
    // Public
    // ----------------------------------------------------------------------------
 
@@ -1314,7 +1335,7 @@
                {
                   try
                   {
-                     postOffice.removeBinding(name);
+                     Binding b = postOffice.removeBinding(name);
                   }
                   catch (Exception e)
                   {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java	2009-04-30 13:09:49 UTC (rev 6633)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java	2009-04-30 13:54:06 UTC (rev 6634)
@@ -22,8 +22,6 @@
 
 package org.jboss.messaging.tests.integration.client;
 
-import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
-
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.client.ClientMessage;
 import org.jboss.messaging.core.client.ClientProducer;
@@ -32,13 +30,18 @@
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.CloseListener;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
 import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.impl.ServerSessionImpl;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
 import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A TemporaryQueueTest
  *
@@ -113,15 +116,25 @@
       session.close();
    }
 
-   public void _testDeleteTemporaryQueueAfterConnectionIsClosed() throws Exception
+   public void testDeleteTemporaryQueueAfterConnectionIsClosed() throws Exception
    {
       SimpleString queue = randomSimpleString();
       SimpleString address = randomSimpleString();
 
       session.createTemporaryQueue(address, queue);
+      RemotingConnectionImpl conn = (RemotingConnectionImpl) server.getRemotingService().getConnections().iterator().next();
 
+      final CountDownLatch latch = new CountDownLatch(1);
+      conn.addClosingListener(new CloseListener()
+      {
+         public void connectionClosed()
+         {
+            latch.countDown();
+         }
+      });
       session.close();
-
+      //wait for the closing listeners to be fired
+      assertTrue("connection close listeners not fired", latch.await(1, TimeUnit.SECONDS));
       session = sf.createSession(false, true, true);
       session.start();
       
@@ -139,9 +152,9 @@
    }
    
    /**
-    * @see ServerSessionImpl#doHandleCreateQueue()
+    * @see org.jboss.messaging.core.server.impl.ServerSessionImpl#doHandleCreateQueue(org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage) 
     */
-   public void _testDeleteTemporaryQueueAfterConnectionIsClosed_2() throws Exception
+   public void testDeleteTemporaryQueueAfterConnectionIsClosed_2() throws Exception
    {
       SimpleString queue = randomSimpleString();
       SimpleString address = randomSimpleString();
@@ -156,13 +169,11 @@
       session.close();
 
       // let some time for the server to clean the connections
-      Thread.sleep(1000);
+      //Thread.sleep(1000);
 
       session2.start();
       
       ClientConsumer consumer = session2.createConsumer(queue);
-      ClientMessage message = consumer.receive(500);
-      assertNotNull(message);
 
       session2.close();
    }
@@ -181,10 +192,19 @@
                                                      .getConnections()
                                                      .iterator()
                                                      .next();
+      final CountDownLatch latch = new CountDownLatch(1);
+      remotingConnection.addClosingListener(new CloseListener()
+      {
+         public void connectionClosed()
+         {
+            latch.countDown();
+         }
+      });
       remotingConnection.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "simulate a client failure"));
 
+
       // let some time for the server to clean the connections
-      Thread.sleep(1000);
+      latch.await(1, TimeUnit.SECONDS);
 
       assertEquals(0, server.getConnectionCount());
 




More information about the jboss-cvs-commits mailing list