[jboss-cvs] JBoss Messaging SVN: r4289 - in trunk: src/config and 8 other directories.
jboss-cvs-commits at lists.jboss.org
Fri May 23 09:37:10 EDT 2008
Author: ataylor
Date: 2008-05-23 09:37:09 -0400 (Fri, 23 May 2008)
New Revision: 4289
Added:
trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java
trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
Removed:
trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
Modified:
trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
trunk/src/config/jbm-configuration.xml
trunk/src/config/jbm-standalone-beans.xml
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
Log:
changed server side keepalive
Modified: trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/examples/jms/src/org/jboss/jms/example/QueueExample.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -48,7 +48,6 @@
Queue queue = (Queue) initialContext.lookup("/queue/testQueue");
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
connection = cf.createConnection();
- connection2 = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("This is a text message!");
@@ -59,14 +58,6 @@
TextMessage message2 = (TextMessage) messageConsumer.receive(5000);
log.info("message received from queue");
log.info("message = " + message2.getText());
- try
- {
- Thread.sleep(200000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
}
catch (NamingException e)
{
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/config/jbm-configuration.xml 2008-05-23 13:37:09 UTC (rev 4289)
@@ -22,7 +22,7 @@
<remoting-host>localhost</remoting-host>
<!-- timeout in seconds -->
- <remoting-timeout>5</remoting-timeout>
+ <remoting-timeout>5000</remoting-timeout>
<!-- true to disable invm communication when the client and the server are in the same JVM. -->
<!-- it is not allowed to disable invm communication when the remoting-transport is set to INVM -->
Modified: trunk/src/config/jbm-standalone-beans.xml
===================================================================
--- trunk/src/config/jbm-standalone-beans.xml 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/config/jbm-standalone-beans.xml 2008-05-23 13:37:09 UTC (rev 4289)
@@ -45,6 +45,9 @@
<property name="remotingService">
<inject bean="RemotingService"/>
</property>
+ <property name="cleanUpNotifier">
+ <inject bean="RemotingService"/>
+ </property>
<property name="configuration">
<inject bean="Configuration"/>
</property>
Copied: trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java (from rev 4282, trunk/src/main/org/jboss/messaging/core/client/ServerPonger.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/ServerPinger.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,8 @@
+package org.jboss.messaging.core.client;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface ServerPinger extends Runnable
+{
+}
Copied: trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java (from rev 4282, trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ServerPingerImpl.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.client.impl;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.client.ServerPinger;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.apache.mina.common.IoSession;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ServerPingerImpl implements PacketHandler
+{
+ private static Logger log = Logger.getLogger(ServerPingerImpl.class);
+ private static boolean traceEnabled = log.isTraceEnabled();
+ private long id;
+ KeepAliveHandler keepAliveHandler;
+
+ public ServerPingerImpl(KeepAliveHandler keepAliveHandler, Long id)
+ {
+ this.keepAliveHandler = keepAliveHandler;
+ this.id = id;
+ }
+
+ public long getID()
+ {
+ return id;
+ }
+
+ public void handle(Packet packet, PacketReturner sender)
+ {
+ Ping ping = (Ping) packet;
+ if(traceEnabled)
+ {
+ log.trace("received ping:" + ping);
+ }
+ Pong pong = keepAliveHandler.ping(ping);
+
+ if(pong != null)
+ {
+ try
+ {
+ sender.send(pong);
+ }
+ catch (Exception e)
+ {
+ log.warn("error sending pong to server", e);
+ }
+ }
+ }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ServerPongerImpl.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -1,99 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.client.impl;
-
-import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
-import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.*;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.client.ServerPonger;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.apache.mina.common.IoSession;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- */
-public class ServerPongerImpl implements PacketHandler, ServerPonger
-{
- private static Logger log = Logger.getLogger(ServerPongerImpl.class);
- private static boolean traceEnabled = log.isTraceEnabled();
- IoSession session;
- private long id;
- long interval;
- long timeout;
- CleanUpNotifier cleanUpNotifier;
- KeepAliveHandler keepAliveHandler;
- CountDownLatch latch = new CountDownLatch(1);
-
- public ServerPongerImpl(CleanUpNotifier cleanUpNotifier, KeepAliveHandler keepAliveHandler, IoSession session, long id, long timeout, long interval)
- {
- this.cleanUpNotifier = cleanUpNotifier;
- this.keepAliveHandler = keepAliveHandler;
- this.session = session;
- this.id = id;
- this.timeout = timeout;
- this.interval = interval;
- }
-
-
- public void run()
- {
- boolean pinged;
- latch = new CountDownLatch(1);
- try
- {
- pinged = latch.await(timeout + interval, TimeUnit.MILLISECONDS);
- if(!pinged)
- {
- log.warn("no ping received from server, cleaning up connection.");
- cleanUpNotifier.fireCleanup(session.getId(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "no ping received from server"));
- }
- }
- catch (InterruptedException e)
- {
- }
- }
-
- public long getID()
- {
- return id;
- }
-
- public void handle(Packet packet, PacketReturner sender)
- {
- Ping ping = (Ping) packet;
- latch.countDown();
- if(traceEnabled)
- {
- log.trace("received ping:" + ping);
- }
- Pong pong = keepAliveHandler.ping(ping);
- if(pong != null)
- {
- session.write(pong);
- }
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -46,7 +46,7 @@
public static final String REMOTING_ENABLE_SSL_SYSPROP_KEY = "jbm.remoting.enable.ssl";
public static final int DEFAULT_REMOTING_PORT = 5400;
- public static final int DEFAULT_KEEP_ALIVE_INTERVAL = 0; // in seconds
+ public static final int DEFAULT_KEEP_ALIVE_INTERVAL = 10; // in seconds
public static final int DEFAULT_KEEP_ALIVE_TIMEOUT = 5; // in seconds
public static final int DEFAULT_REQRES_TIMEOUT = 5; // in seconds
public static final boolean DEFAULT_INVM_DISABLED = false;
Added: trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/KeepAliveFactory.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,26 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface KeepAliveFactory
+{
+
+ Ping ping(long sessionID);
+
+ Pong pong(long sessionID, Ping ping);
+
+ boolean isPinging(long sessionID);
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingService.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -11,6 +11,7 @@
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.core.server.ClientPinger;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -23,6 +24,8 @@
PacketDispatcher getDispatcher();
Configuration getConfiguration();
+
+ ServerKeepAliveFactory getKeepAliveFactory();
void addInterceptor(Interceptor interceptor);
@@ -31,6 +34,4 @@
void addRemotingSessionListener(RemotingSessionListener listener);
void removeRemotingSessionListener(RemotingSessionListener listener);
-
- void setClientPinger(ClientPinger clientPinger);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ClientKeepAliveHandler.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -37,7 +37,7 @@
public Pong ping(Ping ping)
{
Pong pong = new Pong(ping.getSessionID(), false);
- pong.setTargetID(0);
+ pong.setTargetID(ping.getResponseTargetID());
return pong;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -27,9 +27,8 @@
import org.jboss.messaging.core.client.ConnectionParams;
import org.jboss.messaging.core.client.Location;
import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.ServerPonger;
import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
-import org.jboss.messaging.core.client.impl.ServerPongerImpl;
+import org.jboss.messaging.core.client.impl.ServerPingerImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.*;
@@ -70,9 +69,7 @@
KeepAliveHandler keepAliveHandler;
- private ScheduledExecutorService scheduledExecutor;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -172,14 +169,15 @@
}
session = future.getSession();
- ServerPongerImpl pinger = new ServerPongerImpl(this, keepAliveHandler, session, 0, connectionParams.getKeepAliveTimeout() * 1000, connectionParams.getKeepAliveInterval() * 1000);
-
+ ServerPingerImpl pinger = new ServerPingerImpl(keepAliveHandler, (long) 0);
+ /*
getDispatcher().register(pinger);
if (connectionParams.getKeepAliveInterval() > 0)
{
scheduledExecutor = new ScheduledThreadPoolExecutor(1);
scheduledExecutor.scheduleAtFixedRate(pinger, 0, connectionParams.getKeepAliveInterval(), TimeUnit.SECONDS);
- }
+ }*/
+ dispatcher.register(pinger);
return new MinaSession(session, handler);
}
@@ -189,7 +187,6 @@
{
return false;
}
-
CloseFuture closeFuture = session.close().awaitUninterruptibly();
boolean closed = closeFuture.isClosed();
@@ -215,10 +212,7 @@
connector = null;
session = null;
- if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
- {
- scheduledExecutor.shutdown();
- }
+
return closed;
}
@@ -262,10 +256,6 @@
public synchronized void fireCleanup(long sessionID, MessagingException me)
{
- if (scheduledExecutor != null && !scheduledExecutor.isShutdown())
- {
- scheduledExecutor.shutdown();
- }
for (RemotingSessionListener listener: listeners)
{
listener.sessionDestroyed(sessionID, me);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -67,7 +67,7 @@
private List<Interceptor> filters = new CopyOnWriteArrayList<Interceptor>();
- private ClientPinger clientPinger;
+ private ServerKeepAliveFactory factory;
// Static --------------------------------------------------------
@@ -75,11 +75,18 @@
public MinaService(Configuration config)
{
+ this(config, new ServerKeepAliveFactory());
+ }
+
+ public MinaService(Configuration config, ServerKeepAliveFactory factory)
+ {
assert config != null;
+ assert factory != null;
validate(config);
this.config = config;
+ this.factory = factory;
dispatcher = new PacketDispatcherImpl(filters);
}
@@ -109,12 +116,6 @@
listeners.remove(listener);
}
- public void setClientPinger(ClientPinger clientPinger)
- {
- this.clientPinger = clientPinger;
- clientPinger.registerCleanUpNotifier(this);
- }
-
// TransportService implementation -------------------------------
public void start() throws Exception
@@ -213,6 +214,11 @@
return config;
}
+ public ServerKeepAliveFactory getKeepAliveFactory()
+ {
+ return factory;
+ }
+
/**
* This method must only be called by tests which requires
* to insert Filters (e.g. to simulate network failures)
@@ -229,15 +235,28 @@
public void fireCleanup(long sessionID, MessagingException me)
{
- for (RemotingSessionListener listener : listeners)
+ if (factory.getSessions().contains(sessionID))
{
- listener.sessionDestroyed(sessionID, me);
+ for (RemotingSessionListener listener : listeners)
+ {
+ listener.sessionDestroyed(sessionID, me);
+ }
+ factory.getSessions().remove(sessionID);
}
}
// Public --------------------------------------------------------
+ public void setKeepAliveFactory(ServerKeepAliveFactory factory)
+ {
+ assert factory != null;
+ this.factory = factory;
+ }
+
+ // Public --------------------------------------------------------
+
+
public void setRemotingConfiguration(Configuration remotingConfig)
{
assert started == false;
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/ServerKeepAliveFactory.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina;
+
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class ServerKeepAliveFactory implements KeepAliveFactory
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger
+ .getLogger(ServerKeepAliveFactory.class);
+
+ // Attributes ----------------------------------------------------
+
+ // FIXME session mapping must be cleaned when the server session is closed:
+ // either normally or exceptionally
+ /**
+ * Key = server session ID Value = client session ID
+ */
+ private List<Long> sessions = new ArrayList<Long>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // KeepAliveFactory implementation -------------------------------
+
+ public Ping ping(long sessionID)
+ {
+ return new Ping(sessionID);
+ }
+
+ public boolean isPinging(long sessionID)
+ {
+ return sessions.contains(sessionID);
+ }
+
+ public Pong pong(long sessionID, Ping ping)
+ {
+ long clientSessionID = ping.getSessionID();
+ return new Pong(sessionID, sessions.contains(clientSessionID));
+ }
+
+ public List<Long> getSessions()
+ {
+ return sessions;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/ClientPinger.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -15,31 +15,5 @@
*/
void run();
- /**
- * pong received from client
- * @param pong the pong
- */
- void pong(Pong pong);
- /**
- * register a connection.
- *
- * @param remotingSessionID the session id
- * @param sender the sender
- */
- void registerConnection(long remotingSessionID, PacketReturner sender);
-
- /**
- * unregister a connection.
- *
- * @param remotingSessionID the session id
- */
- void unregister(long remotingSessionID);
-
- /**
- * register the cleanup notifier to use
- *
- * @param cleanUpNotifier the notifier
- */
- void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -28,6 +28,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.PacketReturner;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.security.SecurityStore;
@@ -91,7 +92,8 @@
CreateConnectionResponse createConnection(String username, String password,
long remotingClientSessionID, String clientAddress,
- int incrementVersion) throws Exception;
+ int incrementVersion,
+ PacketReturner sender) throws Exception;
DeploymentManager getDeploymentManager();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ClientPingerImpl.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -25,6 +25,9 @@
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.KeepAliveFactory;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.ServerConnection;
@@ -40,7 +43,7 @@
/**
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
-public class ClientPingerImpl implements ClientPinger
+public class ClientPingerImpl implements ClientPinger, PacketHandler
{
private static Logger log = Logger.getLogger(ClientPingerImpl.class);
@@ -61,122 +64,76 @@
* the cleanupnotifier to use on failed pings
*/
private CleanUpNotifier cleanUpNotifier;
+ private KeepAliveFactory keepAliveFactory;
+ private PacketReturner sender;
+ long id = 0;
+ private Pong pong = null;
- public ClientPingerImpl(MessagingServer server)
+ public ClientPingerImpl(MessagingServer server, KeepAliveFactory keepAliveFactory, CleanUpNotifier cleanUpNotifier, final PacketReturner sender)
{
this.server = server;
+ this.keepAliveFactory = keepAliveFactory;
+ this.cleanUpNotifier = cleanUpNotifier;
+ this.sender = sender;
}
public void run()
{
- try
+ id = server.getRemotingService().getDispatcher().generateID();
+ server.getRemotingService().getDispatcher().register(this);
+ Ping ping = keepAliveFactory.ping(sender.getSessionID());
+ ping.setTargetID(0);
+ ping.setResponseTargetID(id);
+ while(keepAliveFactory.isPinging(sender.getSessionID()))
{
synchronized (this)
{
- replies.clear();
- //ping all the sessions
- for (Long sessionId : connections.keySet())
+ try
{
- try
- {
- Ping ping = new Ping(sessionId);
- ping.setTargetID(0);
- connections.get(sessionId).getPacketReturner().send(ping);
- replies.add(sessionId);
- if(isTraceEnabled)
- {
- log.trace("sending " + ping);
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ wait(server.getConfiguration().getKeepAliveInterval() * 1000);
}
- //wait for the keep alive timeout period
- try
+ catch (InterruptedException e)
{
+ }
+ }
+ pong = null;
+ try
+ {
+ sender.send(ping);
+ synchronized (this)
+ {
wait(server.getConfiguration().getKeepAliveTimeout() * 1000);
}
- catch (InterruptedException e)
+ if(pong == null)
{
+ cleanUpNotifier.fireCleanup(sender.getSessionID(), new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "unable to ping client"));
+ break;
}
}
- //at this point cleanup any replies we havent received
- for (Long reply : replies)
+ catch (Exception e)
{
- if(cleanUpNotifier != null)
- cleanUpNotifier.fireCleanup(reply, new MessagingException(MessagingException.CONNECTION_TIMEDOUT, "unable to ping client"));
- connections.remove(reply);
+ log.warn("problem cleaning up session: " + sender.getSessionID(), e);
}
}
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ server.getRemotingService().getDispatcher().unregister(id);
}
- /**
- * pong received from client
- * @param pong
- */
- public void pong(Pong pong)
+ public long getID()
{
- if(isTraceEnabled)
- {
- log.trace("received reply" + pong);
- }
- replies.remove(pong.getSessionID());
+ return id;
}
- /**
- * register a connection.
- *
- * @param remotingSessionID
- * @param sender
- */
- public void registerConnection(long remotingSessionID, PacketReturner sender)
+ public void handle(Packet packet, PacketReturner sender)
{
- if (connections.get(remotingSessionID) == null)
+ Pong pong = (Pong) packet;
+ if(isTraceEnabled)
{
- connections.put(remotingSessionID, new ConnectionHolder(remotingSessionID, sender));
+ log.trace("received reply" + pong);
}
- else
- {
- connections.get(remotingSessionID).increment();
- }
-
+ this.pong = pong;
}
/**
- * unregister a connection.
- *
- * @param remotingSessionID
- */
- public void unregister(long remotingSessionID)
- {
- ConnectionHolder connectionHolder = connections.get(remotingSessionID);
- if(connectionHolder != null)
- {
- connectionHolder.decrement();
- if(connectionHolder.get() == 0)
- {
- connections.remove(remotingSessionID);
- }
- }
- }
-
- /**
- * register the cleanup notifier to use
- *
- * @param cleanUpNotifier
- */
- public void registerCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
- {
- this.cleanUpNotifier = cleanUpNotifier;
- }
-
- /**
* simple holder class for sessions
*/
class ConnectionHolder
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -40,12 +40,12 @@
import org.jboss.messaging.core.persistence.impl.nullpm.NullStorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
-import org.jboss.messaging.core.remoting.ConnectorRegistrySingleton;
-import org.jboss.messaging.core.remoting.Interceptor;
-import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.mina.MinaService;
import org.jboss.messaging.core.remoting.impl.mina.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.impl.mina.ServerKeepAliveFactory;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateConnectionResponse;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.security.SecurityStore;
@@ -96,7 +96,6 @@
private Deployer queueSettingsDeployer;
private JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
private DeploymentManager deploymentManager = new FileDeploymentManager();
- private ClientPinger clientPinger;
// plugins
@@ -112,6 +111,7 @@
private ResourceManager resourceManager = new ResourceManagerImpl(0);
private ScheduledExecutorService scheduledExecutor;
private MessagingServerPacketHandler serverPacketHandler;
+ private CleanUpNotifier cleanUpNotifier = null;
// Constructors ---------------------------------------------------------------------------------
/**
@@ -137,6 +137,7 @@
this.configuration = configuration;
createTransport = true;
remotingService = new MinaService(configuration);
+ cleanUpNotifier = (CleanUpNotifier) remotingService;
}
// lifecycle methods ----------------------------------------------------------------
@@ -172,8 +173,6 @@
}
// Start the wired components
securityDeployer.start();
- clientPinger = new ClientPingerImpl(this);
- remotingService.setClientPinger(clientPinger);
remotingService.addRemotingSessionListener(connectionManager);
memoryManager.start();
deploymentManager.start(1);
@@ -181,9 +180,8 @@
deploymentManager.registerDeployer(queueSettingsDeployer);
postOffice.start();
deploymentManager.start(2);
- serverPacketHandler = new MessagingServerPacketHandler(this, clientPinger);
+ serverPacketHandler = new MessagingServerPacketHandler(this);
getRemotingService().getDispatcher().register(serverPacketHandler);
- serverPacketHandler.start();
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : configuration.getDefaultInterceptors())
{
@@ -217,7 +215,6 @@
queueSettingsDeployer.stop();
deploymentManager.stop();
remotingService.removeRemotingSessionListener(connectionManager);
- serverPacketHandler.stop();
connectionManager = null;
memoryManager.stop();
memoryManager = null;
@@ -283,7 +280,12 @@
{
this.storageManager = storageManager;
}
-
+
+ public void setCleanUpNotifier(CleanUpNotifier cleanUpNotifier)
+ {
+ this.cleanUpNotifier = cleanUpNotifier;
+ }
+
public PostOffice getPostOffice()
{
return postOffice;
@@ -322,7 +324,8 @@
public CreateConnectionResponse createConnection(final String username, final String password,
final long remotingClientSessionID, final String clientAddress,
- final int incrementVersion)
+ final int incrementVersion,
+ final PacketReturner sender)
throws Exception
{
log.trace("creating a new connection for user " + username);
@@ -348,9 +351,20 @@
queueSettingsRepository,
postOffice, securityStore, connectionManager);
- remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection, clientPinger));
+ remotingService.getDispatcher().register(new ServerConnectionPacketHandler(connection));
- return new CreateConnectionResponse(connection.getID(), version);
+ CreateConnectionResponse createConnectionResponse = new CreateConnectionResponse(connection.getID(), version);
+ if(cleanUpNotifier != null)
+ {
+ if(!getRemotingService().getKeepAliveFactory().isPinging(sender.getSessionID()))
+ {
+ getRemotingService().getKeepAliveFactory().getSessions().add(sender.getSessionID());
+ ClientPinger clientPinger = new ClientPingerImpl(this, getRemotingService().getKeepAliveFactory(), cleanUpNotifier, sender);
+ new Thread(clientPinger).start();
+ }
+ }
+
+ return createConnectionResponse;
}
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -43,39 +43,20 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
-public class MessagingServerPacketHandler extends ServerPacketHandlerSupport implements MessagingComponent
+public class MessagingServerPacketHandler extends ServerPacketHandlerSupport
{
private static final Logger log = Logger.getLogger(MessagingServerPacketHandler.class);
private final MessagingServer server;
- private final ClientPinger clientPinger;
private ScheduledExecutorService scheduledExecutor;
- public MessagingServerPacketHandler(final MessagingServer server, ClientPinger clientPinger)
+ public MessagingServerPacketHandler(final MessagingServer server)
{
this.server = server;
- this.clientPinger = clientPinger;
}
-
- public void start() throws Exception
- {
- if (server.getConfiguration().getKeepAliveInterval() > 0)
- {
- scheduledExecutor = new ScheduledThreadPoolExecutor(1);
- scheduledExecutor.scheduleAtFixedRate(clientPinger, 0, server.getConfiguration().getKeepAliveInterval(), TimeUnit.SECONDS);
- }
- }
-
- public void stop() throws Exception
- {
- if (server.getConfiguration().getKeepAliveInterval() > 0)
- {
- scheduledExecutor.shutdownNow();
- }
- }
/*
* The advantage of using a String as ID is that we can leverage Java 5 UUID to
* generate these IDs. However, these IDs are 128 bits long and it increases
@@ -103,15 +84,14 @@
CreateConnectionResponse createConnectionResponse = server.createConnection(request.getUsername(), request.getPassword(),
request.getRemotingSessionID(),
sender.getRemoteAddress(),
- request.getVersion());
- clientPinger.registerConnection(request.getRemotingSessionID(), sender);
+ request.getVersion(),
+ sender);
response = createConnectionResponse;
}
else if(type == EmptyPacket.PONG)
{
Pong decodedPong = (Pong) packet;
- clientPinger.pong(decodedPong);
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java 2008-05-23 13:17:14 UTC (rev 4288)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConnectionPacketHandler.java 2008-05-23 13:37:09 UTC (rev 4289)
@@ -40,12 +40,10 @@
public class ServerConnectionPacketHandler extends ServerPacketHandlerSupport
{
private final ServerConnection connection;
- final ClientPinger clientPinger;
- public ServerConnectionPacketHandler(final ServerConnection connection, final ClientPinger clientPinger)
+ public ServerConnectionPacketHandler(final ServerConnection connection)
{
this.connection = connection;
- this.clientPinger = clientPinger;
}
public long getID()
@@ -72,7 +70,7 @@
connection.stop();
break;
case EmptyPacket.CLOSE:
- clientPinger.unregister(connection.getRemotingClientSessionID());
+ //clientPinger.unregister(connection.getRemotingClientSessionID());
connection.close();
break;
default:
More information about the jboss-cvs-commits
mailing list