[jboss-cvs] JBoss Messaging SVN: r4567 - in trunk: src/main/org/jboss/messaging/core/message/impl and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 24 04:50:05 EDT 2008
Author: ataylor
Date: 2008-06-24 04:50:05 -0400 (Tue, 24 Jun 2008)
New Revision: 4567
Added:
trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
Removed:
trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
Log:
added second failure listener to handle cleanup at the core level, not in the JMS layer. Also made MessagingBufferFactory an interface
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -21,9 +21,6 @@
*/
package org.jboss.messaging.core.client.impl;
-import java.util.HashSet;
-import java.util.Set;
-
import org.jboss.messaging.core.client.ClientConnectionFactory;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.RemotingSessionListener;
@@ -37,6 +34,9 @@
import org.jboss.messaging.core.version.Version;
import org.jboss.messaging.util.ConcurrentHashSet;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* The client-side Connection connectionFactory class.
*
@@ -89,6 +89,8 @@
this.serverTargetID = serverTargetID;
this.remotingConnection = connection;
+
+ this.remotingConnection.addRemotingSessionListener(new JBMFailureListener());
this.serverVersion = serverVersion;
@@ -144,7 +146,7 @@
{
checkClosed();
- remotingConnection.setRemotingSessionListener(listener);
+ remotingConnection.addRemotingSessionListener(listener);
}
public synchronized void close() throws MessagingException
@@ -258,6 +260,21 @@
}
}
+ private class JBMFailureListener implements RemotingSessionListener
+ {
+ public void sessionDestroyed(long sessionID, MessagingException me)
+ {
+ try
+ {
+ cleanUp();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to cleanup connection", e);
+ }
+ }
+
+ }
// Inner Classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,10 +22,6 @@
package org.jboss.messaging.core.client.impl;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.exception.MessagingException;
@@ -37,6 +33,10 @@
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -258,80 +258,21 @@
}
}
}
-
- //TODO - should combine close() and cleanup() functionality in one method - there is currently duplication
public void close() throws MessagingException
{
- if (closed)
- {
- return;
- }
-
- try
- {
- // Now we wait for any current handler runners to run.
- waitForOnMessageToComplete();
-
- closed = true;
-
- if (receiverThread != null)
- {
- synchronized (this)
- {
- // Wake up any receive() thread that might be waiting
- notify();
- }
- }
-
- handler = null;
-
- receiverThread = null;
-
- remotingConnection.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE));
-
- dispatcher.unregister(clientTargetID);
- }
- finally
- {
- session.removeConsumer(this);
- }
+ doCleanUp(true);
}
public synchronized void cleanUp()
{
- try
+ try
{
- // Now we wait for any current handler runners to run.
- waitForOnMessageToComplete();
-
- closed = true;
-
- if (receiverThread != null)
- {
- synchronized (this)
- {
- // Wake up any receive() thread that might be waiting
- notify();
- }
- }
-
- handler = null;
-
- receiverThread = null;
-
- dispatcher.unregister(clientTargetID);
+ doCleanUp(false);
}
- finally
+ catch (MessagingException e)
{
- try
- {
- session.removeConsumer(this);
- }
- catch (MessagingException e)
- {
- log.warn("Unable to clean up consumer:" + this);
- }
+ log.warn("problem cleaning up: " + this);
}
}
@@ -568,7 +509,47 @@
log.error("RuntimeException thrown from handler", e);
}
}
-
+
+ public void doCleanUp(boolean sendCloseMessage) throws MessagingException
+ {
+ if (closed)
+ {
+ return;
+ }
+
+ try
+ {
+ // Now we wait for any current handler runners to run.
+ waitForOnMessageToComplete();
+
+ closed = true;
+
+ if (receiverThread != null)
+ {
+ synchronized (this)
+ {
+ // Wake up any receive() thread that might be waiting
+ notify();
+ }
+ }
+
+ handler = null;
+
+ receiverThread = null;
+
+ if(sendCloseMessage)
+ {
+ remotingConnection.sendBlocking(targetID, sessionTargetID, new PacketImpl(PacketImpl.CLOSE));
+ }
+
+ dispatcher.unregister(clientTargetID);
+ }
+ finally
+ {
+ session.removeConsumer(this);
+ }
+ }
+
// Inner classes
// --------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -21,64 +21,22 @@
*/
package org.jboss.messaging.core.client.impl;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientBrowser;
-import org.jboss.messaging.core.client.ClientConnectionFactory;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.*;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionAddDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCancelMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateBrowserResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateProducerResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.MessagingBufferFactory;
-import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.*;
+import org.jboss.messaging.util.*;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -148,6 +106,8 @@
private final boolean blockOnAcknowledge;
+ private MessagingBufferFactory messagingBufferFactory;
+
//For testing only
private boolean forceNotSameRM;
@@ -200,6 +160,8 @@
this.autoCommitSends = autoCommitSends;
this.blockOnAcknowledge = blockOnAcknowledge;
+
+ messagingBufferFactory = new MessagingBufferFactoryImpl();
}
// ClientSession implementation -----------------------------------------------------------------
@@ -530,19 +492,19 @@
public ClientMessage createClientMessage(byte type, boolean durable, long expiration, long timestamp, byte priority)
{
- MessagingBuffer body = MessagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
+ MessagingBuffer body = messagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
return new ClientMessageImpl(type, durable, expiration, timestamp, priority, body);
}
public ClientMessage createClientMessage(byte type, boolean durable)
{
- MessagingBuffer body = MessagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
+ MessagingBuffer body = messagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
return new ClientMessageImpl(type, durable, body);
}
public ClientMessage createClientMessage(boolean durable)
{
- MessagingBuffer body = MessagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
+ MessagingBuffer body = messagingBufferFactory.createMessagingBuffer(remotingConnection.getLocation().getTransport(), 1024);
return new ClientMessageImpl(durable, body);
}
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -26,7 +26,6 @@
import org.jboss.messaging.core.message.Message;
import static org.jboss.messaging.util.DataConstants.*;
import org.jboss.messaging.util.MessagingBuffer;
-import org.jboss.messaging.util.MessagingBufferFactory;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
@@ -160,7 +159,7 @@
//TODO - this can be optimised
byte[] bytes = new byte[len];
buffer.getBytes(bytes);
- body = MessagingBufferFactory.createMessagingBuffer(buffer, len);
+ body = buffer.createNewBuffer(len);
body.putBytes(bytes);
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -45,7 +45,7 @@
void sendOneWay(long targetID, long executorID, Packet packet) throws MessagingException;
- void setRemotingSessionListener(RemotingSessionListener newListener);
+ void addRemotingSessionListener(RemotingSessionListener newListener);
PacketDispatcher getPacketDispatcher();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl;
@@ -30,6 +30,9 @@
import org.jboss.messaging.core.remoting.*;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @author <a href="ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -55,17 +58,17 @@
private RemotingSession session;
- private RemotingSessionListener listener;
+ private List<RemotingSessionListener> sessionListeners = new ArrayList<RemotingSessionListener>();
// Constructors ---------------------------------------------------------------------------------
public RemotingConnectionImpl(final Location location, ConnectionParams connectionParams) throws IllegalArgumentException
{
- if(location == null)
+ if (location == null)
{
throw new IllegalArgumentException("location must not be null");
}
- if(connectionParams == null)
+ if (connectionParams == null)
{
throw new IllegalArgumentException("connection params must not be null");
}
@@ -88,9 +91,9 @@
}
connector = ConnectorRegistryFactory.getRegistry().getConnector(location, connectionParams);
-
+
session = connector.connect();
-
+
if (log.isDebugEnabled())
log.debug("Using " + connector + " to connect to " + location);
@@ -106,13 +109,13 @@
{
if (connector != null)
{
- if (listener != null)
+ for (RemotingSessionListener sessionListener : sessionListeners)
{
- connector.removeSessionListener(listener);
+ connector.removeSessionListener(sessionListener);
}
-
+
RemotingConnector connectorFromRegistry = ConnectorRegistryFactory.getRegistry().removeConnector(location);
-
+
if (connectorFromRegistry != null)
{
connectorFromRegistry.disconnect();
@@ -210,24 +213,32 @@
}
}
- public synchronized void setRemotingSessionListener(final RemotingSessionListener newListener)
+ public synchronized void addRemotingSessionListener(final RemotingSessionListener newListener)
{
- if (listener != null && newListener != null)
+ if (newListener == null)
{
- throw new IllegalStateException("FailureListener already set to " + listener);
+ throw new IllegalStateException("FailureListener cannot be null");
}
+ if (sessionListeners.contains(newListener))
+ {
+ throw new IllegalStateException("FailureListener already set");
+ }
+
if (newListener != null)
{
+ sessionListeners.add(newListener);
connector.addSessionListener(newListener);
}
- else
- {
- connector.removeSessionListener(listener);
- }
- this.listener = newListener;
}
+ public synchronized void removeRemotingSessionListener(final RemotingSessionListener listener)
+ {
+ sessionListeners.remove(listener);
+ connector.removeSessionListener(listener);
+ }
+
+
public PacketDispatcher getPacketDispatcher()
{
return connector.getDispatcher();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/IoBufferWrapper.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,18 +22,14 @@
package org.jboss.messaging.core.remoting.impl.mina;
-import static org.jboss.messaging.util.DataConstants.FALSE;
-import static org.jboss.messaging.util.DataConstants.NOT_NULL;
-import static org.jboss.messaging.util.DataConstants.NULL;
-import static org.jboss.messaging.util.DataConstants.TRUE;
-
-import java.nio.charset.Charset;
-
import org.apache.mina.common.IoBuffer;
import org.jboss.messaging.core.logging.Logger;
+import static org.jboss.messaging.util.DataConstants.*;
import org.jboss.messaging.util.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
+import java.nio.charset.Charset;
+
/**
*
* A BufferWrapper
@@ -113,7 +109,12 @@
{
return new IoBufferWrapper(buffer.slice());
}
-
+
+ public MessagingBuffer createNewBuffer(int len)
+ {
+ return new IoBufferWrapper(len);
+ }
+
public int remaining()
{
return buffer.remaining();
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnection.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,35 +22,15 @@
package org.jboss.messaging.jms.client;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.jms.XAConnection;
-import javax.jms.XAQueueConnection;
-import javax.jms.XAQueueSession;
-import javax.jms.XASession;
-import javax.jms.XATopicConnection;
-import javax.jms.XATopicSession;
-
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -424,17 +404,6 @@
exceptionListener.onException(je);
}
-
- try
- {
- //FIXME - this should not be called from the jms layer
- //We need cleanup to also occur when core connections are dead, and the user may not be using the jms api
- ((ClientConnectionInternal)connection).cleanUp();
- }
- catch (Exception e)
- {
- log.error("Failed to cleanup connection", e);
- }
}
}
Modified: trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/util/ByteBufferWrapper.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -90,8 +90,13 @@
{
return new ByteBufferWrapper(buffer.slice());
}
-
- public void rewind()
+
+ public MessagingBuffer createNewBuffer(int len)
+ {
+ return new ByteBufferWrapper(ByteBuffer.allocate(len));
+ }
+
+ public void rewind()
{
buffer.rewind();
}
Modified: trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBuffer.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -115,5 +115,7 @@
void rewind();
- MessagingBuffer slice();
+ MessagingBuffer slice();
+
+ MessagingBuffer createNewBuffer(int len);
}
Deleted: trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -1,67 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.util;
-
-import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
-
-import java.nio.ByteBuffer;
-
-/**
- * a factory class for creating an appropriate type of MessagingBuffer.
- *
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
- */
-public class MessagingBufferFactory
-{
- public static MessagingBuffer createMessagingBuffer(TransportType transportType, int len)
- {
- if (transportType == TransportType.TCP)
- {
- return new IoBufferWrapper(len);
- }
- else if(transportType == TransportType.INVM)
- {
- return new ByteBufferWrapper(ByteBuffer.allocate(len));
- }
- else
- {
- throw new IllegalArgumentException("No Messaging Buffer for transport");
- }
- }
-
- public static MessagingBuffer createMessagingBuffer(MessagingBuffer buffer, int len)
- {
- if (buffer instanceof IoBufferWrapper)
- {
- return new IoBufferWrapper(len);
- }
- else if(buffer instanceof ByteBufferWrapper)
- {
- return new ByteBufferWrapper(ByteBuffer.allocate(len));
- }
- else
- {
- throw new IllegalArgumentException("No Messaging Buffer for transport");
- }
- }
-}
Added: trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/MessagingBufferFactory.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -0,0 +1,11 @@
+package org.jboss.messaging.util;
+
+import org.jboss.messaging.core.remoting.TransportType;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public interface MessagingBufferFactory
+{
+ MessagingBuffer createMessagingBuffer(TransportType transportType, int len);
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionFactoryImplTest.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -23,10 +23,7 @@
package org.jboss.messaging.tests.unit.core.client.impl;
import org.easymock.EasyMock;
-import org.jboss.messaging.core.client.ClientConnection;
-import org.jboss.messaging.core.client.ClientConnectionFactory;
-import org.jboss.messaging.core.client.ConnectionParams;
-import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.*;
import org.jboss.messaging.core.client.impl.ClientConnectionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientConnectionImpl;
import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
@@ -220,7 +217,6 @@
RemotingConnectionFactory rcf = EasyMock.createStrictMock(RemotingConnectionFactory.class);
RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
PacketDispatcher dispatcher = EasyMock.createStrictMock(PacketDispatcher.class);
ClientConnectionFactory cf =
@@ -232,7 +228,7 @@
EasyMock.expect(rcf.createRemotingConnection(location, params)).andReturn(rc);
rc.start();
-
+
Version clientVersion = VersionLoader.load();
CreateConnectionRequest request =
@@ -246,9 +242,9 @@
new CreateConnectionResponse(connTargetID, serverVersion);
EasyMock.expect(rc.sendBlocking(0, 0, request)).andReturn(response);
-
+
EasyMock.expect(rc.getPacketDispatcher()).andReturn(dispatcher);
-
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
EasyMock.replay(rcf, rc, dispatcher);
ClientConnection conn;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/ClientConnectionImplTest.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -18,12 +18,16 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.tests.unit.core.client.impl;
import org.easymock.EasyMock;
-import org.jboss.messaging.core.client.*;
+import org.easymock.IAnswer;
+import org.jboss.messaging.core.client.ClientConnectionFactory;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.Location;
+import org.jboss.messaging.core.client.RemotingSessionListener;
import org.jboss.messaging.core.client.impl.*;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -40,11 +44,9 @@
import java.util.Set;
/**
- *
* A ClientConnectionImplTest
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public class ClientConnectionImplTest extends UnitTestCase
{
@@ -52,22 +54,22 @@
public void testConstructor() throws Exception
{
- RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
Version version = new VersionImpl("blah132", 1, 1, 1, 12, "blah1652");
Location location = new LocationImpl(TransportType.TCP, "sausages");
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-
+
final long serverTargetID = 12091092;
-
+
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
EasyMock.replay(rc, pd);
-
+
ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-
+
EasyMock.verify(rc, pd);
assertTrue(conn.getServerVersion() == version);
@@ -81,11 +83,11 @@
testCreateSession(false, false, false, 14526512, false, false, false);
testCreateSession(true, true, true, 14526512, true, true, false);
}
-
+
public void testStartStop() throws Exception
{
- RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
Location location = new LocationImpl(TransportType.TCP, "ftftf");
@@ -93,28 +95,28 @@
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
final int serverTargetID = 23;
-
+
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
- ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-
+
+
+
rc.sendOneWay(serverTargetID, serverTargetID, new PacketImpl(PacketImpl.CONN_START));
-
+
EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(PacketImpl.CONN_STOP))).andReturn(null);
-
+
EasyMock.replay(rc, pd);
-
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
conn.start();
-
+
conn.stop();
-
- EasyMock.verify(rc, pd);
+
+ EasyMock.verify(rc, pd);
}
-
+
public void testSetRemotingSessionListener() throws Exception
{
- RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
Location location = new LocationImpl(TransportType.TCP, "ftftf");
@@ -122,84 +124,84 @@
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
final int serverTargetID = 23;
-
+
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
- ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-
+
+
+
RemotingSessionListener listener = new RemotingSessionListener()
{
public void sessionDestroyed(long sessionID, MessagingException me)
- {
+ {
}
};
-
- rc.setRemotingSessionListener(listener);
-
+
+ rc.addRemotingSessionListener(listener);
+
EasyMock.replay(rc, pd);
-
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
conn.setRemotingSessionListener(listener);
-
- EasyMock.verify(rc, pd);
+
+ EasyMock.verify(rc, pd);
}
-
+
public void testClose() throws Exception
{
- RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
Location location = new LocationImpl(TransportType.TCP, "ftftf");
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-
+
final int serverTargetID = 23;
-
+
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
+ EasyMock.replay(rc);
ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-
+ EasyMock.reset(rc);
assertFalse(conn.isClosed());
-
+
//Create some sessions
-
+
ClientSessionInternal sess1 = EasyMock.createStrictMock(ClientSessionInternal.class);
-
+
ClientSessionInternal sess2 = EasyMock.createStrictMock(ClientSessionInternal.class);
-
+
ClientSessionInternal sess3 = EasyMock.createStrictMock(ClientSessionInternal.class);
-
+
conn.addSession(sess1);
conn.addSession(sess2);
conn.addSession(sess3);
-
+
sess1.close();
sess2.close();
sess3.close();
-
+
EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, new PacketImpl(PacketImpl.CLOSE))).andReturn(null);
-
- rc.stop();
-
+
+ rc.stop();
+
EasyMock.replay(rc, pd, sess1, sess2, sess3);
-
+
conn.close();
-
+
EasyMock.verify(rc, pd, sess1, sess2, sess3);
-
+
assertTrue(conn.isClosed());
-
+
assertSame(conn.getRemotingConnection(), rc);
-
+
//Close again should do nothing
EasyMock.reset(rc, pd, sess1, sess2, sess3);
-
+
EasyMock.replay(rc, pd, sess1, sess2, sess3);
-
+
conn.close();
-
+
EasyMock.verify(rc, pd, sess1, sess2, sess3);
-
+
try
{
conn.createClientSession(false, false, false, 65655);
@@ -209,7 +211,7 @@
{
assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
}
-
+
try
{
conn.createClientSession(false, false, false, 545, false, false);
@@ -219,7 +221,7 @@
{
assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
}
-
+
try
{
conn.start();
@@ -229,7 +231,7 @@
{
assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
}
-
+
try
{
conn.stop();
@@ -239,13 +241,13 @@
{
assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
}
-
+
try
{
conn.setRemotingSessionListener(new RemotingSessionListener()
{
public void sessionDestroyed(long sessionID, MessagingException me)
- {
+ {
}
});
fail("Should throw exception");
@@ -255,64 +257,64 @@
assertEquals(MessagingException.OBJECT_CLOSED, e.getCode());
}
}
-
+
public void testRemoveSession() throws Exception
{
- RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+
Version version = new VersionImpl("tyfytfytf", 1, 1, 1, 12, "yttyft");
Location location = new LocationImpl(TransportType.TCP, "ftftf");
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
-
+
final int serverTargetID = 23;
-
+
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
+
ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
-
+
//Create some sessions
-
+
ClientSessionInternal sess1 = EasyMock.createStrictMock(ClientSessionInternal.class);
-
+
ClientSessionInternal sess2 = EasyMock.createStrictMock(ClientSessionInternal.class);
-
+
ClientSessionInternal sess3 = EasyMock.createStrictMock(ClientSessionInternal.class);
-
+
conn.addSession(sess1);
conn.addSession(sess2);
conn.addSession(sess3);
-
+
Set<ClientSession> sessions = conn.getSessions();
assertEquals(3, sessions.size());
assertTrue(sessions.contains(sess1));
assertTrue(sessions.contains(sess2));
assertTrue(sessions.contains(sess3));
-
+
conn.removeSession(sess2);
-
+
sessions = conn.getSessions();
assertEquals(2, sessions.size());
- assertTrue(sessions.contains(sess1));
+ assertTrue(sessions.contains(sess1));
assertTrue(sessions.contains(sess3));
-
+
conn.removeSession(sess1);
-
+
sessions = conn.getSessions();
- assertEquals(1, sessions.size());
+ assertEquals(1, sessions.size());
assertTrue(sessions.contains(sess3));
-
+
conn.removeSession(sess3);
-
+
sessions = conn.getSessions();
- assertEquals(0, sessions.size());
+ assertEquals(0, sessions.size());
}
-
+
public void testSessionCleanedUp() throws Exception
{
RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
Location location = new LocationImpl(TransportType.TCP, "oranges");
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
@@ -322,9 +324,9 @@
Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
- ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
+
+
ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(true, true, true);
final int sessionTargetID = 12127162;
@@ -334,7 +336,7 @@
EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
EasyMock.replay(rc, pd);
-
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
ClientSession session = conn.createClientSession(true, true, true, 1);
conn.cleanUp();
assertTrue(session.isClosed());
@@ -346,7 +348,7 @@
public void testSessionsCleanedUp() throws Exception
{
RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
Location location = new LocationImpl(TransportType.TCP, "oranges");
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
@@ -356,9 +358,9 @@
Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
- ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
+
+
ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(true, true, true);
final int sessionTargetID = 12127162;
@@ -368,6 +370,7 @@
EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response).anyTimes();
EasyMock.replay(rc, pd);
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
ClientSession session1 = conn.createClientSession(true, true, true, 1);
ClientSession session2 = conn.createClientSession(true, true, true, 2);
ClientSession session3 = conn.createClientSession(true, true, true, 3);
@@ -379,23 +382,23 @@
EasyMock.verify(rc, pd);
}
-
+
// Private -----------------------------------------------------------------------------------------------------------
private void testCreateSession(final boolean xa, final boolean autoCommitSends, final boolean autoCommitAcks,
- final int ackBatchSize, final boolean blockOnAcknowledge,
- final boolean cacheProducers, final boolean useDefaults) throws Exception
- {
- RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
-
+ final int ackBatchSize, final boolean blockOnAcknowledge,
+ final boolean cacheProducers, final boolean useDefaults) throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
Location location = new LocationImpl(TransportType.TCP, "oranges");
ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
if (useDefaults)
{
- cf.setDefaultBlockOnAcknowledge(blockOnAcknowledge);
+ cf.setDefaultBlockOnAcknowledge(blockOnAcknowledge);
}
else
{
@@ -405,11 +408,11 @@
final int serverTargetID = 17267162;
Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
-
+
PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
-
- ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
+
+
ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(xa, autoCommitSends, autoCommitAcks);
final int sessionTargetID = 12127162;
@@ -419,7 +422,7 @@
EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
EasyMock.replay(rc, pd);
-
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
ClientSession session;
if (useDefaults)
@@ -429,7 +432,7 @@
else
{
session = conn.createClientSession(xa, autoCommitSends, autoCommitAcks, ackBatchSize, blockOnAcknowledge,
- cacheProducers);
+ cacheProducers);
}
assertEquals(ackBatchSize, session.getLazyAckBatchSize());
@@ -440,4 +443,42 @@
EasyMock.verify(rc, pd);
}
+
+
+ public void testResourcesCleanedUp() throws Exception
+ {
+ RemotingConnection rc = EasyMock.createStrictMock(RemotingConnection.class);
+ Location location = new LocationImpl(TransportType.TCP, "oranges");
+ ClientConnectionFactory cf = new ClientConnectionFactoryImpl(location);
+ Version version = new VersionImpl("uqysuyqs", 1, 1, 1, 12, "uqysuays");
+ PacketDispatcher pd = EasyMock.createStrictMock(PacketDispatcher.class);
+ final int serverTargetID = 17267162;
+ rc.addRemotingSessionListener((RemotingSessionListener) EasyMock.anyObject());
+ SetRemotingSessionListenerAnswer answer = new SetRemotingSessionListenerAnswer();
+ EasyMock.expectLastCall().andAnswer(answer);
+
+
+
+ final int sessionTargetID = 12127162;
+ ConnectionCreateSessionResponseMessage response = new ConnectionCreateSessionResponseMessage(sessionTargetID);
+ ConnectionCreateSessionMessage request = new ConnectionCreateSessionMessage(false, true, true);
+ EasyMock.expect(rc.sendBlocking(serverTargetID, serverTargetID, request)).andReturn(response);
+
+ EasyMock.replay(rc, pd);
+ ClientConnectionInternal conn = new ClientConnectionImpl(cf, serverTargetID, rc, version, pd);
+ assertNotNull(answer.listener);
+ ClientSession session = conn.createClientSession(false, true, true, 1);
+ answer.listener.sessionDestroyed(serverTargetID, new MessagingException());
+ assertTrue(session.isClosed());
+ }
+
+ class SetRemotingSessionListenerAnswer implements IAnswer
+ {
+ RemotingSessionListener listener = null;
+ public Object answer() throws Throwable
+ {
+ listener = (RemotingSessionListener) EasyMock.getCurrentArguments()[0];
+ return null;
+ }
+ }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/RemotingConnectionTest.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -156,7 +156,7 @@
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
remotingConnection.start();
- remotingConnection.setRemotingSessionListener(listener);
+ remotingConnection.addRemotingSessionListener(listener);
remotingConnection.stop();
EasyMock.verify(connector);
EasyMock.verify(connectorRegistry);
@@ -729,7 +729,7 @@
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
remotingConnection.start();
- remotingConnection.setRemotingSessionListener(listener);
+ remotingConnection.addRemotingSessionListener(listener);
EasyMock.verify(connector);
EasyMock.verify(connectorRegistry);
}
@@ -760,52 +760,14 @@
RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
remotingConnection.start();
- remotingConnection.setRemotingSessionListener(listener);
- remotingConnection.setRemotingSessionListener(null);
- remotingConnection.setRemotingSessionListener(listener2);
+ remotingConnection.addRemotingSessionListener(listener);
+ remotingConnection.removeRemotingSessionListener(listener);
+ remotingConnection.addRemotingSessionListener(listener2);
EasyMock.verify(connector);
EasyMock.verify(connectorRegistry);
}
- public void testConnectionSetListenerTwiceThrowsException() throws Throwable
- {
- final ConnectorRegistry connectorRegistry = EasyMock.createStrictMock(ConnectorRegistry.class);
- RemotingConnector connector = EasyMock.createStrictMock(RemotingConnector.class);
- ConnectorRegistryFactory.setRegisteryLocator(new ConnectorRegistryLocator()
- {
- public ConnectorRegistry locate()
- {
- return connectorRegistry;
- }
- });
- Location location = EasyMock.createNiceMock(Location.class);
- ConnectionParams connectionParams = EasyMock.createNiceMock(ConnectionParams.class);
- RemotingSession remotingSession = EasyMock.createStrictMock(RemotingSession.class);
- RemotingSessionListener listener = EasyMock.createNiceMock(RemotingSessionListener.class);
- RemotingSessionListener listener2 = EasyMock.createNiceMock(RemotingSessionListener.class);
- EasyMock.expect(connectorRegistry.getConnector(location, connectionParams)).andReturn(connector);
- EasyMock.replay(connectorRegistry);
- EasyMock.expect(connector.connect()).andReturn(remotingSession);
- connector.addSessionListener(listener);
- EasyMock.replay(connector);
-
- RemotingConnectionImpl remotingConnection = new RemotingConnectionImpl(location, connectionParams);
- remotingConnection.start();
- remotingConnection.setRemotingSessionListener(listener);
- try
- {
- remotingConnection.setRemotingSessionListener(listener2);
- fail("should throw exception");
- }
- catch (IllegalStateException e)
- {
- //pass
- }
- EasyMock.verify(connector);
- EasyMock.verify(connectorRegistry);
- }
-
public void testGetDispatcher() throws Throwable
{
final ConnectorRegistry connectorRegistry = EasyMock.createStrictMock(ConnectorRegistry.class);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/network/ClientNetworkFailureTest.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,17 +22,7 @@
package org.jboss.messaging.tests.unit.core.remoting.network;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.jboss.messaging.core.remoting.TransportType.TCP;
-import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_INTERVAL;
-import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_TIMEOUT;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
import junit.framework.TestCase;
-
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientConnectionFactory;
import org.jboss.messaging.core.client.RemotingSessionListener;
@@ -43,11 +33,19 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Acceptor;
import org.jboss.messaging.core.remoting.TransportType;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.impl.mina.MinaAcceptor;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_INTERVAL;
+import static org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport.PING_TIMEOUT;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt>
@@ -137,14 +135,7 @@
//Thread.sleep((PING_INTERVAL + PING_TIMEOUT) * 1000);
assertActiveConnectionsOnTheServer(0);
- try
- {
- conn.close();
- fail("close should fail since client resources must have been cleaned up on the server side");
- }
- catch (Exception e)
- {
- }
+ assertTrue(conn.isClosed());
minaService.removeRemotingSessionListener(listener);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/util/MessagingBufferFactoryTest.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,16 +22,12 @@
package org.jboss.messaging.tests.unit.core.util;
-import java.nio.ByteBuffer;
-
+import junit.framework.TestCase;
import org.jboss.messaging.core.remoting.TransportType;
-import org.jboss.messaging.core.remoting.impl.mina.IoBufferWrapper;
-import org.jboss.messaging.util.ByteBufferWrapper;
import org.jboss.messaging.util.MessagingBuffer;
import org.jboss.messaging.util.MessagingBufferFactory;
+import org.jboss.messaging.util.MessagingBufferFactoryImpl;
-import junit.framework.TestCase;
-
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -40,6 +36,7 @@
*/
public class MessagingBufferFactoryTest extends TestCase
{
+ private MessagingBufferFactory messagingBufferFactory;
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -50,10 +47,22 @@
// Public --------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ messagingBufferFactory = new MessagingBufferFactoryImpl();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ messagingBufferFactory = null;
+ }
+
public void testCreateMessagingBufferForTCP() throws Exception
{
int length = 512;
- MessagingBuffer buffer = MessagingBufferFactory.createMessagingBuffer(TransportType.TCP, length);
+ MessagingBuffer buffer = messagingBufferFactory.createMessagingBuffer(TransportType.TCP, length);
assertNotNull(buffer);
assertEquals(length, buffer.capacity());
}
@@ -62,30 +71,11 @@
public void testCreateMessagingBufferForINVM() throws Exception
{
int length = 512;
- MessagingBuffer buffer = MessagingBufferFactory.createMessagingBuffer(TransportType.INVM, length);
+ MessagingBuffer buffer = messagingBufferFactory.createMessagingBuffer(TransportType.INVM, length);
assertNotNull(buffer);
assertEquals(length, buffer.capacity());
}
- public void testCreateMessagingBufferFromByteBufferWrapper() throws Exception
- {
- int length = 512;
- MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.allocate(length));
-
- MessagingBuffer buff = MessagingBufferFactory.createMessagingBuffer(buffer, length);
- assertNotNull(buff);
- assertTrue(buff instanceof ByteBufferWrapper);
- }
-
- public void testCreateMessagingBufferFromIoBufferWrapper() throws Exception
- {
- int length = 512;
- MessagingBuffer buffer = new IoBufferWrapper(length);
-
- MessagingBuffer buff = MessagingBufferFactory.createMessagingBuffer(buffer, length);
- assertNotNull(buff);
- assertTrue(buff instanceof IoBufferWrapper);
- }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java 2008-06-24 08:01:35 UTC (rev 4566)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/client/JBossConnectionTest.java 2008-06-24 08:50:05 UTC (rev 4567)
@@ -22,41 +22,21 @@
package org.jboss.messaging.tests.unit.jms.client;
-import static org.easymock.EasyMock.createStrictMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.ConnectionMetaData;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSession;
-import javax.jms.XAQueueSession;
-import javax.jms.XASession;
-import javax.jms.XATopicSession;
-
import junit.framework.TestCase;
-
import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
import org.easymock.IArgumentMatcher;
import org.jboss.messaging.core.client.ClientConnection;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.RemotingSessionListener;
-import org.jboss.messaging.core.client.impl.ClientConnectionInternal;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.version.Version;
import org.jboss.messaging.jms.client.JBossConnection;
import org.jboss.messaging.tests.util.RandomUtil;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+import javax.jms.*;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -701,21 +681,6 @@
verify(clientConn, clientSession);
}
- public void testResourcesCleanedUp() throws Exception
- {
- ClientConnectionInternal clientConn = createStrictMock(ClientConnectionInternal.class);
- FailureListenerMatcher failureListenerMatcher = new FailureListenerMatcher();
- EasyMock.reportMatcher(failureListenerMatcher);
- clientConn.setRemotingSessionListener(null);
- clientConn.cleanUp();
- replay(clientConn);
-
- JBossConnection connection = new JBossConnection(clientConn,
- JBossConnection.TYPE_QUEUE_CONNECTION, null, -1);
- failureListenerMatcher.listener.sessionDestroyed(0, new MessagingException());
-
- verify(clientConn);
- }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list