Author: jmesnil
Date: 2010-02-16 04:50:10 -0500 (Tue, 16 Feb 2010)
New Revision: 8878
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* fixed deadlock
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-02-16 09:50:10 UTC (rev 8878)
@@ -17,6 +17,7 @@
*/
package org.hornetq.core.protocol.stomp;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -68,17 +69,20 @@
{
if (size == -1)
{
- throw new IllegalStateException("Frame has not been encoded yet");
+ StompMarshaller marshaller = new StompMarshaller();
+ try
+ {
+ size = marshaller.marshal(this).length;
+ }
+ catch (IOException e)
+ {
+ return -1;
+ }
}
return size ;
}
- public void setEncodedSize(int size)
- {
- this.size = size;
- }
-
@Override
public String toString()
{
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-02-16 09:50:10 UTC (rev 8878)
@@ -62,9 +62,7 @@
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
- byte[] bytes = baos.toByteArray();
- command.setEncodedSize(bytes.length);
- return bytes;
+ return baos.toByteArray();
}
public void marshal(StompFrame stomp, DataOutput os) throws IOException
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-16 09:50:10 UTC (rev 8878)
@@ -34,7 +34,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -154,7 +153,6 @@
try
{
request = marshaller.unmarshal(buffer);
- System.out.println("received " + request);
if (log.isTraceEnabled())
{
log.trace("received " + request);
@@ -165,27 +163,27 @@
StompFrame response = null;
if (Stomp.Commands.CONNECT.equals(command))
{
- response = onConnect(request, server, conn);
+ response = onConnect(request, conn);
}
else if (Stomp.Commands.DISCONNECT.equals(command))
{
- response = onDisconnect(request, server, conn);
+ response = onDisconnect(request, conn);
}
else if (Stomp.Commands.SEND.equals(command))
{
- response = onSend(request, server, conn);
+ response = onSend(request, conn);
}
else if (Stomp.Commands.SUBSCRIBE.equals(command))
{
- response = onSubscribe(request, server, conn);
+ response = onSubscribe(request, conn);
}
else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
{
- response = onUnsubscribe(request, server, conn);
+ response = onUnsubscribe(request, conn);
}
else if (Stomp.Commands.ACK.equals(command))
{
- response = onAck(request, server, conn);
+ response = onAck(request, conn);
}
else if (Stomp.Commands.BEGIN.equals(command))
{
@@ -193,11 +191,11 @@
}
else if (Stomp.Commands.COMMIT.equals(command))
{
- response = onCommit(request, server, conn);
+ response = onCommit(request, conn);
}
else if (Stomp.Commands.ABORT.equals(command))
{
- response = onAbort(request, server, conn);
+ response = onAbort(request, conn);
}
else
{
@@ -248,7 +246,7 @@
// Private -------------------------------------------------------
- private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
+ private StompFrame onSubscribe(StompFrame frame, StompConnection connection) throws
Exception
{
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
@@ -296,7 +294,6 @@
throw new StompException("There already is a subscription for: " +
subscriptionID +
". Either use unique subscription IDs or do not
create multiple subscriptions for the same destination");
}
- server.getStorageManager().setContext(stompSession.getContext());
long consumerID = server.getStorageManager().generateUniqueID();
String clientID = (connection.getClientID() != null) ? connection.getClientID() :
null;
stompSession.addSubscription(consumerID, subscriptionID, clientID,
durableSubscriptionName, destination, selector, ack);
@@ -304,7 +301,7 @@
return null;
}
- private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server,
StompConnection connection) throws Exception
+ private StompFrame onUnsubscribe(StompFrame frame, StompConnection connection) throws
Exception
{
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
@@ -325,7 +322,6 @@
}
StompSession stompSession = getSession(connection);
- server.getStorageManager().setContext(stompSession.getContext());
boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
if (!unsubscribed)
{
@@ -334,7 +330,7 @@
return null;
}
- private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
+ private StompFrame onAck(StompFrame frame, StompConnection connection) throws
Exception
{
Map<String, Object> headers = frame.getHeaders();
String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
@@ -345,7 +341,6 @@
log.warn("Transactional acknowledgement is not supported");
}
stompSession = getSession(connection);
- server.getStorageManager().setContext(stompSession.getContext());
stompSession.acknowledge(messageID);
return null;
@@ -369,7 +364,7 @@
return null;
}
- private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
+ private StompFrame onCommit(StompFrame frame, StompConnection connection) throws
Exception
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
@@ -378,18 +373,18 @@
throw new StompException("transaction header is mandatory to COMMIT a
transaction");
}
- StompSession session = transactedSessions.remove(txID);
+ StompSession session = getTransactedSession(connection, txID);
if (session == null)
{
throw new StompException("No transaction started: " + txID);
}
-
+ transactedSessions.remove(txID);
session.getSession().commit();
return null;
}
- private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
+ private StompFrame onAbort(StompFrame frame, StompConnection connection) throws
Exception
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
@@ -398,12 +393,13 @@
throw new StompException("transaction header is mandatory to ABORT a
transaction");
}
- StompSession session = transactedSessions.remove(txID);
-
+ StompSession session = getTransactedSession(connection, txID);
+
if (session == null)
{
throw new StompException("No transaction started: " + txID);
}
+ transactedSessions.remove(txID);
session.getSession().rollback(false);
return null;
@@ -437,6 +433,7 @@
stompSession.setServerSession(session);
sessions.put(connection, stompSession);
}
+ server.getStorageManager().setContext(stompSession.getContext());
return stompSession;
}
@@ -460,45 +457,17 @@
stompSession.setServerSession(session);
transactedSessions.put(txID, stompSession);
}
+ server.getStorageManager().setContext(stompSession.getContext());
return stompSession;
}
- private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
StompConnection connection) throws Exception
+ private StompFrame onDisconnect(StompFrame frame, StompConnection connection) throws
Exception
{
- connection.setValid(false);
-
- StompSession session = sessions.remove(connection);
- if (session != null)
- {
- try
- {
- session.getSession().rollback(true);
- session.getSession().close();
- }
- catch (Exception e)
- {
- throw new StompException(e.getMessage());
- }
- }
-
- // removed the transacted session belonging to the connection
- Iterator<Entry<String, StompSession>> iterator =
transactedSessions.entrySet().iterator();
- while (iterator.hasNext())
- {
- Map.Entry<String, StompSession> entry = (Map.Entry<String,
StompSession>)iterator.next();
- if (entry.getValue().getConnection() == connection)
- {
- ServerSession serverSession = entry.getValue().getSession();
- serverSession.rollback(true);
- serverSession.close();
- iterator.remove();
- }
- }
-
+ cleanup(connection);
return null;
}
- private StompFrame onSend(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
+ private StompFrame onSend(StompFrame frame, StompConnection connection) throws
Exception
{
checkConnected(connection);
Map<String, Object> headers = frame.getHeaders();
@@ -533,9 +502,8 @@
}
else
{
- stompSession = transactedSessions.get(txID);
+ stompSession = getTransactedSession(connection, txID);
}
- server.getStorageManager().setContext(stompSession.getContext());
if (stompSession.isNoLocal())
{
message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
@@ -544,7 +512,7 @@
return null;
}
- private StompFrame onConnect(StompFrame frame, HornetQServer server, final
StompConnection connection) throws Exception
+ private StompFrame onConnect(StompFrame frame, final StompConnection connection)
throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -637,7 +605,6 @@
private void doSend(final StompConnection connection, final StompFrame frame)
{
- System.out.println("sent " + frame);
if (log.isTraceEnabled())
{
log.trace("sent " + frame);
@@ -647,7 +614,9 @@
if (connection.isDestroyed() || !connection.isValid())
{
log.warn("Connection closed " + connection);
+ return;
}
+
try
{
byte[] bytes = marshaller.marshal(frame);
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-15 20:45:11 UTC (rev 8877)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-16 09:50:10 UTC (rev 8878)
@@ -665,11 +665,11 @@
Assert.assertTrue(message.getJMSRedelivered());
}
- public void
_testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception {
+ public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
}
- public void
_testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception {
+ public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
}
@@ -1174,7 +1174,8 @@
"\n\n" +
Stomp.NULL;
sendFrame(disconnectFrame);
-
+ stompSocket.close();
+
// send the message when the durable subscriber is disconnected
sendMessage(getName(), topic);