[jboss-cvs] JBoss Messaging SVN: r3364 - in branches/Branch_JBMESSAGING-544: src/main/org/jboss/jms/server/endpoint/advised and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 26 10:29:00 EST 2007
Author: jmesnil
Date: 2007-11-26 10:28:59 -0500 (Mon, 26 Nov 2007)
New Revision: 3364
Added:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionMessageCodec.java
Removed:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionRequestCodec.java
Modified:
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetPreparedTransactionsResponseCodec.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-544: Replace client-server transport with NIO based transport
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -158,8 +158,9 @@
remotingConnection.start();
newClient = remotingConnection.getNewRemotingClient();
+ String sessionID = newClient.getSessionID();
- CreateConnectionRequest request = new CreateConnectionRequest(v, JMSClientVMIdentifier.instance, failedNodeID, username, password);
+ CreateConnectionRequest request = new CreateConnectionRequest(v, sessionID, JMSClientVMIdentifier.instance, failedNodeID, username, password);
CreateConnectionResponse response = (CreateConnectionResponse) sendBlocking(request);
ClientConnectionDelegate connectionDelegate = new ClientConnectionDelegate(response.getConnectionID(), response.getServerID());
res = new CreateConnectionResult(connectionDelegate);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -174,7 +174,7 @@
{
CreateConnectionRequest request = (CreateConnectionRequest) packet;
CreateConnectionResult del =
- createConnectionDelegate(request.getUsername(), request.getPassword(), request.getFailedNodeID(), null, request.getClientVMID(), request.getVersion(), null);
+ createConnectionDelegate(request.getUsername(), request.getPassword(), request.getFailedNodeID(), request.getRemotingSessionID(), request.getClientVMID(), request.getVersion(), null);
CreateConnectionResponse response = new CreateConnectionResponse(del.getDelegate().getID(), del.getDelegate().getServerID());
response.normalize(request);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/Client.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -7,9 +7,6 @@
package org.jboss.messaging.core.remoting;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.jboss.messaging.core.remoting.Constants.CONNECTION_TIMEOUT;
-import static org.jboss.messaging.core.remoting.TransportType.HTTP;
-import static org.jboss.messaging.core.remoting.TransportType.TCP;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -79,37 +76,19 @@
connector = new NioSocketConnector();
- connector.setConnectTimeout(CONNECTION_TIMEOUT);
-
- if (useSSL)
- addSSLSupport(connector.getFilterChain());
-
MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
connector.getFilterChain().addLast("mdc", mdcInjectionFilter);
- if (transport == TCP)
- {
- connector.getFilterChain().addLast("codec",
- new ProtocolCodecFilter(new PacketCodecFactory()));
- } else
- {
- assert transport == HTTP;
+ connector.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new PacketCodecFactory()));
- // TODO support HTTP
- // URL url = new URL("http://localhost:" + port + "/");
- // connector.getFilterChain().addLast("http_codec",
- // new ProtocolCodecFilter(new HttpProtocolCodecFactory(url)));
- // connector.getFilterChain().addLast("http_logger", new
- // LoggingFilter());
- // connector.getFilterChain().addLast("http_filter",
- // new HTTPFilter(false));
- }
-
addBlockingRequestResponseFilter(connector.getFilterChain());
connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.setHandler(new ClientHandler());
+ connector.getSessionConfig().setKeepAlive(true);
+ connector.getSessionConfig().setReuseAddress(true);
InetSocketAddress address = new InetSocketAddress(host, port);
ConnectFuture future = connector.connect(address);
future.awaitUninterruptibly();
@@ -132,13 +111,6 @@
assert connector != null;
assert blockingScheduler != null;
- if (sslFilter != null)
- {
- sslFilter.stopSsl(session).awaitUninterruptibly();
- // FIXME: w/o the delay, an exception is thrown:
- // "Inbound closed before receiving peer's close_notify"
- Thread.sleep(500);
- }
CloseFuture closeFuture = session.close().awaitUninterruptibly();
boolean closed = closeFuture.isClosed();
@@ -148,6 +120,15 @@
return closed;
}
+ public String getSessionID()
+ {
+ if (session == null)
+ {
+ return null;
+ }
+ return Long.toString(session.getId());
+ }
+
public void sendOneWay(AbstractPacket packet)
{
assert packet != null;
@@ -175,7 +156,6 @@
packet.setCorrelationID(System.nanoTime());
- // TODO request timeout must be configurable
Request req = new Request(packet.getCorrelationID(), packet,
blockingRequestTimeout, blockingRequestTimeUnit);
session.write(req);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/ConnectionFactoryCreateConnectionRequestCodec.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -41,12 +41,14 @@
throws Exception
{
byte version = request.getVersion();
+ String remotingSessionID = request.getRemotingSessionID();
String clientVMID = request.getClientVMID();
int failedNodeID = request.getFailedNodeID();
String username = request.getUsername();
String password = request.getPassword();
int bodyLength = 1 // version
+ + sizeof(remotingSessionID)
+ sizeof(clientVMID)
+ INT_LENGTH // failedNodeID
+ sizeof(username)
@@ -54,6 +56,7 @@
out.putInt(bodyLength);
out.put(version);
+ putString(out, remotingSessionID);
putString(out, clientVMID);
out.putInt(failedNodeID);
putString(out, username);
@@ -70,13 +73,14 @@
return null;
}
byte version = in.get();
+ String remotingSessionID = getString(in);
String clientVMID = getString(in);
int failedNodeID = in.getInt();
String username = getString(in);
String password = getString(in);
- return new CreateConnectionRequest(version, clientVMID,
- failedNodeID, username, password);
+ return new CreateConnectionRequest(version, remotingSessionID,
+ clientVMID, failedNodeID, username, password);
}
// Package protected ---------------------------------------------
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetPreparedTransactionsResponseCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetPreparedTransactionsResponseCodec.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/GetPreparedTransactionsResponseCodec.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -48,7 +48,7 @@
MessagingXid[] xids = response.getXids();
- byte[] encodedXids = convert(xids);
+ byte[] encodedXids = encode(xids);
int bodyLength = INT_LENGTH + INT_LENGTH + encodedXids.length;
@@ -72,7 +72,7 @@
int encodedXidsLength = in.getInt();
byte[] b = new byte[encodedXidsLength];
in.get(b);
- MessagingXid[] xids = convert(numOfXids, b);
+ MessagingXid[] xids = decode(numOfXids, b);
return new GetPreparedTransactionsResponse(xids);
}
@@ -83,7 +83,7 @@
// Private -------------------------------------------------------
- private static byte[] convert(MessagingXid[] xids) throws Exception
+ private static byte[] encode(MessagingXid[] xids) throws Exception
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@@ -95,7 +95,7 @@
return baos.toByteArray();
}
- private MessagingXid[] convert(int numOfXids, byte[] encodedXids) throws Exception
+ private MessagingXid[] decode(int numOfXids, byte[] encodedXids) throws Exception
{
MessagingXid[] xids = new MessagingXid[numOfXids];
ByteArrayInputStream bais = new ByteArrayInputStream(encodedXids);
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/PacketCodecFactory.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -172,7 +172,7 @@
addCodecForEmptyPacket(MSG_CLOSE, CloseMessage.class);
- addCodec(SendTransactionMessage.class, SendTransactionRequestCodec.class);
+ addCodec(SendTransactionMessage.class, SendTransactionMessageCodec.class);
addCodecForEmptyPacket(REQ_GETPREPAREDTRANSACTIONS,
GetPreparedTransactionsRequest.class);
Copied: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionMessageCodec.java (from rev 3361, branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionRequestCodec.java)
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionMessageCodec.java (rev 0)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionMessageCodec.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoSession;
+import org.jboss.jms.tx.TransactionRequest;
+import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class SendTransactionMessageCodec extends AbstractPacketCodec<SendTransactionMessage>
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SendTransactionMessageCodec()
+ {
+ super(MSG_SENDTRANSACTION);
+ }
+
+ // Public --------------------------------------------------------
+
+ // AbstractPacketCodec overrides ---------------------------------
+
+ @Override
+ protected void encodeBody(IoSession session, SendTransactionMessage request,
+ IoBuffer out) throws Exception
+ {
+ byte[] encodedTxReq = encodeTransactionRequest(request.getTransactionRequest());
+ boolean checkForDuplicates = request.checkForDuplicates();
+
+ int bodyLength = INT_LENGTH + encodedTxReq.length + 1;
+
+ out.putInt(bodyLength);
+ out.putInt(encodedTxReq.length);
+ out.put(encodedTxReq);
+ putBoolean(out, checkForDuplicates);
+ }
+
+ @Override
+ protected SendTransactionMessage decodeBody(IoSession session, IoBuffer in)
+ throws Exception
+ {
+ int bodyLength = in.getInt();
+ if (in.remaining() < bodyLength)
+ {
+ return null;
+ }
+
+ int txReqLength = in.getInt();
+ byte[] encodedTxReq = new byte[txReqLength];
+ in.get(encodedTxReq);
+ TransactionRequest tr = decodeTransactionRequest(encodedTxReq);
+ boolean checkForDuplicates = getBoolean(in);
+
+ return new SendTransactionMessage(tr, checkForDuplicates);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private ----------------------------------------------------
+
+ private static byte[] encodeTransactionRequest(TransactionRequest tr) throws Exception
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ tr.write(new DataOutputStream(baos));
+ baos.flush();
+ return baos.toByteArray();
+ }
+
+ private static TransactionRequest decodeTransactionRequest(byte[] b) throws Exception
+ {
+ TransactionRequest tr = new TransactionRequest();
+ ByteArrayInputStream bais = new ByteArrayInputStream(b);
+ tr.read(new DataInputStream(bais));
+ return tr;
+ }
+
+ // Inner classes -------------------------------------------------
+}
Deleted: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionRequestCodec.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionRequestCodec.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/codec/SendTransactionRequestCodec.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -1,100 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.messaging.core.remoting.codec;
-
-import static org.jboss.messaging.core.remoting.wireformat.PacketType.MSG_SENDTRANSACTION;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
-import org.jboss.jms.tx.TransactionRequest;
-import org.jboss.messaging.core.remoting.wireformat.SendTransactionMessage;
-
-/**
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
- */
-public class SendTransactionRequestCodec extends AbstractPacketCodec<SendTransactionMessage>
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SendTransactionRequestCodec()
- {
- super(MSG_SENDTRANSACTION);
- }
-
- // Public --------------------------------------------------------
-
- // AbstractPacketCodec overrides ---------------------------------
-
- @Override
- protected void encodeBody(IoSession session, SendTransactionMessage request,
- IoBuffer out) throws Exception
- {
- byte[] encodedTxReq = encodeTransactionRequest(request.getTransactionRequest());
- boolean checkForDuplicates = request.checkForDuplicates();
-
- int bodyLength = INT_LENGTH + encodedTxReq.length + 1;
-
- out.putInt(bodyLength);
- out.putInt(encodedTxReq.length);
- out.put(encodedTxReq);
- putBoolean(out, checkForDuplicates);
- }
-
- @Override
- protected SendTransactionMessage decodeBody(IoSession session, IoBuffer in)
- throws Exception
- {
- int bodyLength = in.getInt();
- if (in.remaining() < bodyLength)
- {
- return null;
- }
-
- int txReqLength = in.getInt();
- byte[] encodedTxReq = new byte[txReqLength];
- in.get(encodedTxReq);
- TransactionRequest tr = decodeTransactionRequest(encodedTxReq);
- boolean checkForDuplicates = getBoolean(in);
-
- return new SendTransactionMessage(tr, checkForDuplicates);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private ----------------------------------------------------
-
- private static byte[] encodeTransactionRequest(TransactionRequest tr) throws Exception
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- tr.write(new DataOutputStream(baos));
- baos.flush();
- return baos.toByteArray();
- }
-
- private static TransactionRequest decodeTransactionRequest(byte[] b) throws Exception
- {
- TransactionRequest tr = new TransactionRequest();
- ByteArrayInputStream bais = new ByteArrayInputStream(b);
- tr.read(new DataInputStream(bais));
- return tr;
- }
-
- // Inner classes -------------------------------------------------
-}
Modified: branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/src/main/org/jboss/messaging/core/remoting/wireformat/CreateConnectionRequest.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -22,6 +22,7 @@
// Attributes ----------------------------------------------------
private final byte version;
+ private final String remotingSessionID;
private final String clientVMID;
private final String username;
private final String password;
@@ -32,12 +33,15 @@
// Constructors --------------------------------------------------
public CreateConnectionRequest(byte version,
- String clientVMID, int failedNodeID, String username, String password)
+ String remotingSessionID, String clientVMID, int failedNodeID, String username, String password)
{
super(REQ_CREATECONNECTION);
+ assertValidID(remotingSessionID);
assertValidID(clientVMID);
+
this.version = version;
+ this.remotingSessionID = remotingSessionID;
this.clientVMID = clientVMID;
this.failedNodeID = failedNodeID;
this.username = username;
@@ -51,6 +55,11 @@
return version;
}
+ public String getRemotingSessionID()
+ {
+ return remotingSessionID;
+ }
+
public String getClientVMID()
{
return clientVMID;
@@ -76,6 +85,7 @@
{
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", version=" + version);
+ buf.append(", remotingSessionID=" + remotingSessionID);
buf.append(", clientVMID=" + clientVMID);
buf.append(", failedNodeID=" + failedNodeID);
buf.append(", username=" + username);
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/core/remoting/wireformat/PacketTypeTest.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -237,18 +237,19 @@
assertEquals(expected.getUniqueName(), actual.getUniqueName());
assertEquals(expected.getFailoverMap(), actual.getFailoverMap());
- ClientConnectionFactoryDelegate[] expectedDelegates = expected.getDelegates();
+ ClientConnectionFactoryDelegate[] expectedDelegates = expected
+ .getDelegates();
ClientConnectionFactoryDelegate[] actualDelegates = actual.getDelegates();
-
+
assertEquals(expectedDelegates.length, actualDelegates.length);
-
+
for (int i = 0; i < expectedDelegates.length; i++)
{
ClientConnectionFactoryDelegate expectedDelegate = expectedDelegates[i];
ClientConnectionFactoryDelegate actualDelegate = actualDelegates[i];
-
+
assertEquals(expectedDelegate.getID(), actualDelegate.getID());
- assertEquals(expectedDelegate.getName(), actualDelegate.getName());
+ assertEquals(expectedDelegate.getName(), actualDelegate.getName());
}
}
@@ -306,13 +307,14 @@
public void testCreateConnectionRequest() throws Exception
{
byte version = (byte) 4;
+ String remotingSessionID = randomString();
String clientVMID = randomString();
int failedNodeID = 0;
String username = null;
String password = null;
CreateConnectionRequest request = new CreateConnectionRequest(version,
- clientVMID, failedNodeID, username, password);
+ remotingSessionID, clientVMID, failedNodeID, username, password);
AbstractPacket decodedPacket = encodeAndDecode(request);
@@ -321,6 +323,7 @@
assertEquals(REQ_CREATECONNECTION, decodedPacket.getType());
assertEquals(request.getVersion(), decodedRequest.getVersion());
+ assertEquals(request.getRemotingSessionID(), decodedRequest.getRemotingSessionID());
assertEquals(request.getClientVMID(), decodedRequest.getClientVMID());
assertEquals(request.getFailedNodeID(), decodedRequest.getFailedNodeID());
assertEquals(request.getUsername(), decodedRequest.getUsername());
@@ -703,8 +706,10 @@
public void testSendTransactionMessage() throws Exception
{
ClientTransaction tx = new ClientTransaction();
+ MessagingXid xid = new MessagingXid(randomString().getBytes(), 23,
+ randomString().getBytes());
TransactionRequest tr = new TransactionRequest(
- TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
+ TransactionRequest.ONE_PHASE_COMMIT_REQUEST, xid, tx);
SendTransactionMessage message = new SendTransactionMessage(tr, true);
@@ -713,10 +718,14 @@
assertTrue(decodedPacket instanceof SendTransactionMessage);
SendTransactionMessage decodedMessage = (SendTransactionMessage) decodedPacket;
assertEquals(MSG_SENDTRANSACTION, decodedMessage.getType());
- assertEquals(message.getTransactionRequest().getRequestType(),
- decodedMessage.getTransactionRequest().getRequestType());
assertEquals(message.checkForDuplicates(), decodedMessage
.checkForDuplicates());
+
+ TransactionRequest expectedTxReq = message.getTransactionRequest();
+ TransactionRequest actualTxReq = decodedMessage.getTransactionRequest();
+
+ assertEquals(expectedTxReq.getRequestType(), actualTxReq.getRequestType());
+ assertEquals(expectedTxReq.getXid(), actualTxReq.getXid());
}
public void testGetPreparedTransactionsRequest() throws Exception
@@ -1026,6 +1035,9 @@
ProtocolEncoder encoder = new PacketCodecFactory().getEncoder(session);
encoder.encode(session, packet, session.getEncoderOutput());
IoBuffer buffer = session.getEncoderOutputQueue().poll();
+
+ session.close();
+
return buffer;
}
@@ -1042,6 +1054,8 @@
assertNotNull(o);
assertTrue(o instanceof AbstractPacket);
+ session.close();
+
return (AbstractPacket) o;
}
Modified: branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-11-26 14:03:22 UTC (rev 3363)
+++ branches/Branch_JBMESSAGING-544/tests/src/org/jboss/test/messaging/tools/container/ServiceContainer.java 2007-11-26 15:28:59 UTC (rev 3364)
@@ -1424,35 +1424,45 @@
private void startMINAServer() throws Exception
{
- acceptor = new NioSocketAcceptor();
+ if (acceptor == null)
+ {
+ acceptor = new NioSocketAcceptor();
- // Prepare the configuration
- MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
- acceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
- acceptor.getFilterChain().addLast("codec",
- new ProtocolCodecFilter(new PacketCodecFactory()));
- acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+ // Prepare the configuration
+ MdcInjectionFilter mdcInjectionFilter = new MdcInjectionFilter();
+ acceptor.getFilterChain().addLast("mdc", mdcInjectionFilter);
+ acceptor.getFilterChain().addLast("codec",
+ new ProtocolCodecFilter(new PacketCodecFactory()));
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
- // Bind
- acceptor.setLocalAddress(new InetSocketAddress(PORT));
- acceptor.setHandler(new ServerHandler());
- acceptor.bind();
+ // Bind
+ acceptor.setLocalAddress(new InetSocketAddress(PORT));
+ acceptor.setReuseAddress(true);
+ acceptor.getSessionConfig().setReuseAddress(true);
+ acceptor.getSessionConfig().setKeepAlive(true);
+ acceptor.setDisconnectOnUnbind(false);
- info("Listening on port " + PORT);
+ acceptor.setHandler(new ServerHandler());
+ acceptor.bind();
+
+ info("Started MINA on port " + PORT);
+ }
}
-
+
private void stopMINAServer()
{
if (acceptor != null)
{
- acceptor.setDisconnectOnUnbind(false);
acceptor.unbind();
- }
+ acceptor.dispose();
+
+ info("Stopped MINA ");
+ }
}
private void info(String s)
{
- log.info(new Formatter().format("##MINA## %-50s ###\n", s).toString());
+ log.info(new Formatter().format("### %-30s ###\n", s).toString());
}
More information about the jboss-cvs-commits
mailing list