[jboss-cvs] JBoss Messaging SVN: r4463 - in trunk: src/main/org/jboss/messaging/core/client/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 13 08:56:57 EDT 2008
Author: timfox
Date: 2008-06-13 08:56:57 -0400 (Fri, 13 Jun 2008)
New Revision: 4463
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
Log:
More tests and tweaks
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientConsumer.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -27,5 +27,7 @@
void close() throws MessagingException;
- boolean isClosed();
+ boolean isClosed();
+
+ boolean isDirect();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -56,10 +56,9 @@
// Constructors ---------------------------------------------------------------------------------
- public ClientBrowserImpl(final long serverTargetID, final ClientSessionInternal session,
- final RemotingConnection remotingConnection)
+ public ClientBrowserImpl(final ClientSessionInternal session, final long serverTargetID)
{
- this.remotingConnection = remotingConnection;
+ this.remotingConnection = session.getConnection().getRemotingConnection();
this.serverTargetID = serverTargetID;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -75,6 +75,8 @@
private final boolean direct;
+ private final Runner runner = new Runner();
+
private volatile Thread receiverThread;
private volatile Thread onMessageThread;
@@ -101,9 +103,7 @@
* (and its PacketDispatcher) to a single server.
*/
public ClientConsumerImpl(final ClientSessionInternal session, final long targetID,
- final long clientTargetID,
- final ExecutorService sessionExecutor,
- final RemotingConnection remotingConnection,
+ final long clientTargetID,
final int clientWindowSize,
final boolean direct)
{
@@ -113,9 +113,9 @@
this.session = session;
- this.sessionExecutor = sessionExecutor;
+ this.sessionExecutor = session.getExecutorService();
- this.remotingConnection = remotingConnection;
+ this.remotingConnection = session.getConnection().getRemotingConnection();
this.clientWindowSize = clientWindowSize;
@@ -230,6 +230,8 @@
throw new MessagingException(MessagingException.ILLEGAL_STATE,"Cannot set MessageHandler - consumer is in receive(...)");
}
+ log.info("Setting handler");
+
waitForOnMessageToComplete();
this.handler = handler;
@@ -285,6 +287,11 @@
{
return closed;
}
+
+ public boolean isDirect()
+ {
+ return direct;
+ }
// ClientConsumerInternal implementation
// --------------------------------------------------------------
@@ -388,6 +395,11 @@
{
return buffer.size();
}
+
+ public int getCreditsToSend()
+ {
+ return creditsToSend;
+ }
// Public
// ---------------------------------------------------------------------------------------
@@ -403,7 +415,7 @@
private void queueExecutor()
{
- sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
+ sessionExecutor.execute(runner);
}
private void flowControl(final int messageBytes) throws MessagingException
@@ -509,5 +521,14 @@
// Inner classes
// --------------------------------------------------------------------------------
+
+ private class Runner implements Runnable
+ {
+ public void run()
+ {
+ callOnMessage();
+ }
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -30,4 +30,6 @@
long getIgnoreDeliveryMark();
int getBufferSize();
+
+ int getCreditsToSend();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -104,7 +104,7 @@
private final boolean cacheProducers;
- private final ExecutorService executor;
+ private final ExecutorService executorService;
private final RemotingConnection remotingConnection;
@@ -163,7 +163,7 @@
this.cacheProducers = cacheProducers;
//TODO - we should use OrderedExecutorFactory and a pool here
- executor = Executors.newSingleThreadExecutor();
+ executorService = Executors.newSingleThreadExecutor();
this.xa = xa;
@@ -302,7 +302,7 @@
}
ClientConsumerInternal consumer =
- new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, clientWindowSize, direct);
+ new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, clientWindowSize, direct);
addConsumer(consumer);
@@ -330,7 +330,7 @@
SessionCreateBrowserResponseMessage response = (SessionCreateBrowserResponseMessage)remotingConnection.sendBlocking(serverTargetID, serverTargetID, request);
- ClientBrowser browser = new ClientBrowserImpl(response.getBrowserTargetID(), this, remotingConnection);
+ ClientBrowser browser = new ClientBrowserImpl(this, response.getBrowserTargetID());
addBrowser(browser);
@@ -508,7 +508,7 @@
}
finally
{
- executor.shutdown();
+ executorService.shutdown();
connection.removeSession(this);
@@ -633,6 +633,11 @@
return new HashMap<SimpleString, ClientProducerInternal>(producerCache);
}
+ public ExecutorService getExecutorService()
+ {
+ return executorService;
+ }
+
// XAResource implementation --------------------------------------------------------------------
public void commit(final Xid xid, final boolean onePhase) throws XAException
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -8,6 +8,7 @@
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import org.jboss.messaging.core.client.ClientBrowser;
import org.jboss.messaging.core.client.ClientSession;
@@ -48,4 +49,6 @@
Set<ClientBrowser> getBrowsers();
Map<SimpleString, ClientProducerInternal> getProducerCache();
+
+ ExecutorService getExecutorService();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MessagingIOSessionDataStructureFactory.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -25,15 +25,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionAttributeMap;
import org.apache.mina.common.IoSessionDataStructureFactory;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteRequestQueue;
-import org.apache.mina.util.CircularQueue;
/**
*
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaAcceptor.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -21,11 +21,6 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import org.apache.mina.common.*;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
-import org.jboss.messaging.core.remoting.Acceptor;
-import org.jboss.messaging.core.remoting.CleanUpNotifier;
-import org.jboss.messaging.core.remoting.RemotingService;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
@@ -33,6 +28,16 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.messaging.core.remoting.Acceptor;
+import org.jboss.messaging.core.remoting.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.RemotingService;
+
/**
* A Mina TCP Acceptor that supports SSL
*
@@ -141,7 +146,7 @@
//register pinger
if (remotingService.getConfiguration().getKeepAliveInterval() > 0)
{
- remotingService.registerPinger(new MinaSession(session, null));
+ remotingService.registerPinger(new MinaSession(session));
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -6,7 +6,26 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
-import org.apache.mina.common.*;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
+import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoService;
+import org.apache.mina.common.IoServiceListener;
+import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.jboss.messaging.core.client.ConnectionParams;
@@ -17,19 +36,19 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.ping.Pinger;
import org.jboss.messaging.core.ping.impl.PingerImpl;
-import org.jboss.messaging.core.remoting.*;
+import org.jboss.messaging.core.remoting.CleanUpNotifier;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.RemotingConnector;
+import org.jboss.messaging.core.remoting.RemotingSession;
+import org.jboss.messaging.core.remoting.ResponseHandler;
+import org.jboss.messaging.core.remoting.TransportType;
import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addSSLFilter;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
import org.jboss.messaging.core.remoting.impl.wireformat.Pong;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt>
@@ -130,7 +149,7 @@
{
if (session != null && session.isConnected())
{
- return new MinaSession(session, handler);
+ return new MinaSession(session);
}
threadPool = Executors.newCachedThreadPool();
@@ -151,7 +170,7 @@
}
session = future.getSession();
- MinaSession minaSession = new MinaSession(session, handler);
+ MinaSession minaSession = new MinaSession(session);
//register a handler for dealing with server pings
dispatcher.register(new PacketHandler()
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaSession.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -22,25 +22,20 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(MinaConnector.class);
-
-
+
// Attributes ----------------------------------------------------
private final IoSession session;
- private MinaHandler handler;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public MinaSession(IoSession session, MinaHandler handler)
+ public MinaSession(IoSession session)
{
assert session != null;
this.session = session;
-
- this.handler = handler;
}
// Public --------------------------------------------------------
@@ -53,15 +48,6 @@
public void write(Packet packet)
{
-// try
-// {
-// handler.checkWrite(session);
-// }
-// catch (Exception e)
-// {
-// log.error("Failed to acquire sem", e);
-// }
-
session.write(packet);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2008-06-13 09:59:51 UTC (rev 4462)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientSessionImplTest.java 2008-06-13 12:56:57 UTC (rev 4463)
@@ -2082,7 +2082,7 @@
RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
//In ClientSessionImpl constructor
- EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+ EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
final long sessionTargetID = 7617622;
final SimpleString queueName = new SimpleString("gyugg");
@@ -2406,7 +2406,7 @@
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
- EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+ EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
EasyMock.expect(conn.getConnectionFactory()).andReturn(cf);
@@ -2482,7 +2482,7 @@
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
- EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+ EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
@@ -2501,6 +2501,8 @@
EasyMock.expect(rc.sendBlocking(sessionTargetID, sessionTargetID, request)).andReturn(resp);
+ //EasyMock.expect(conn.getRemotingConnection()).andReturn(value)
+
EasyMock.expect(rc.getPacketDispatcher()).andReturn(pd);
pd.register(new ClientConsumerPacketHandler(null, clientTargetID));
@@ -2549,7 +2551,7 @@
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
- EasyMock.expect(conn.getRemotingConnection()).andReturn(rc);
+ EasyMock.expect(conn.getRemotingConnection()).andStubReturn(rc);
EasyMock.expect(conn.getConnectionFactory()).andReturn(cf);
More information about the jboss-cvs-commits
mailing list