Author: jmesnil
Date: 2010-01-21 09:46:21 -0500 (Thu, 21 Jan 2010)
New Revision: 8827
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* sync with the trunk: svn merge -r 8806:8821
https://svn.jboss.org/repos/hornetq/trunk
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java
(from rev 8819, trunk/src/main/org/hornetq/core/server/BindingQueryResult.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/BindingQueryResult.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ *
+ * A BindingQueryResult
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class BindingQueryResult
+{
+ private boolean exists;
+
+ private List<SimpleString> queueNames;
+
+ public BindingQueryResult(final boolean exists, final List<SimpleString>
queueNames)
+ {
+ this.exists = exists;
+
+ this.queueNames = queueNames;
+ }
+
+ public boolean isExists()
+ {
+ return exists;
+ }
+
+ public List<SimpleString> getQueueNames()
+ {
+ return queueNames;
+ }
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -23,10 +23,8 @@
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.security.Role;
@@ -70,24 +68,19 @@
void unregisterActivateCallback(ActivateCallback callback);
- ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String
name, int lastConfirmedCommandID) throws Exception;
-
/** The journal at the backup server has to be equivalent as the journal used on the
live node.
* Or else the backup node is out of sync. */
ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
- CreateSessionResponseMessage createSession(String name,
- long channelID,
- String username,
- String password,
- int minLargeMessageSize,
- int incrementingVersion,
- RemotingConnection remotingConnection,
- boolean autoCommitSends,
- boolean autoCommitAcks,
- boolean preAcknowledge,
- boolean xa,
- int confirmationWindowSize) throws
Exception;
+ ServerSession createSession(String name,
+ String username,
+ String password,
+ int minLargeMessageSize,
+ CoreRemotingConnection remotingConnection,
+ boolean autoCommitSends,
+ boolean autoCommitAcks,
+ boolean preAcknowledge,
+ boolean xa) throws Exception;
void removeSession(String name) throws Exception;
@@ -136,4 +129,6 @@
void setGroupingHandler(GroupingHandler groupingHandler);
GroupingHandler getGroupingHandler();
+
+ boolean checkActivate() throws Exception;
}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java
(from rev 8819, trunk/src/main/org/hornetq/core/server/QueueQueryResult.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/QueueQueryResult.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ *
+ * A QueueQueryResult
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class QueueQueryResult
+{
+ private SimpleString name;
+
+ private boolean exists;
+
+ private boolean durable;
+
+ private int consumerCount;
+
+ private int messageCount;
+
+ private SimpleString filterString;
+
+ private SimpleString address;
+
+ private boolean temporary;
+
+ public QueueQueryResult(final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final int messageCount)
+ {
+ this(name, address, durable, temporary, filterString, consumerCount, messageCount,
true);
+ }
+
+ public QueueQueryResult()
+ {
+ this(null, null, false, false, null, 0, 0, false);
+ }
+
+ private QueueQueryResult(final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final int messageCount,
+ final boolean exists)
+ {
+ this.durable = durable;
+
+ this.temporary = temporary;
+
+ this.consumerCount = consumerCount;
+
+ this.messageCount = messageCount;
+
+ this.filterString = filterString;
+
+ this.address = address;
+
+ this.name = name;
+
+ this.exists = exists;
+ }
+
+ public boolean isExists()
+ {
+ return exists;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public int getConsumerCount()
+ {
+ return consumerCount;
+ }
+
+ public int getMessageCount()
+ {
+ return messageCount;
+ }
+
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ public boolean isTemporary()
+ {
+ return temporary;
+ }
+
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -13,35 +13,12 @@
package org.hornetq.core.server;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.hornetq.core.server.impl.ServerSessionPacketHandler;
+import java.util.List;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.SimpleString;
+
/**
*
* A ServerSession
@@ -54,8 +31,6 @@
{
String getName();
- long getID();
-
String getUsername();
String getPassword();
@@ -66,75 +41,75 @@
void removeConsumer(ServerConsumer consumer) throws Exception;
- void close() throws Exception;
+ void acknowledge(long consumerID, long messageID) throws Exception;
- void handleAcknowledge(final SessionAcknowledgeMessage packet);
+ void expire(long consumerID, long messageID) throws Exception;
- void handleExpired(final SessionExpiredMessage packet);
+ void rollback(boolean considerLastMessageAsDelivered) throws Exception;
- void handleRollback(RollbackMessage packet);
+ void commit() throws Exception;
- void handleCommit(Packet packet);
+ void xaCommit(Xid xid, boolean onePhase) throws Exception;
- void handleXACommit(SessionXACommitMessage packet);
+ void xaEnd(Xid xid) throws Exception;
- void handleXAEnd(SessionXAEndMessage packet);
+ void xaForget(Xid xid) throws Exception;
- void handleXAForget(SessionXAForgetMessage packet);
+ void xaJoin(Xid xid) throws Exception;
- void handleXAJoin(SessionXAJoinMessage packet);
+ void xaPrepare(Xid xid) throws Exception;
- void handleXAPrepare(SessionXAPrepareMessage packet);
+ void xaResume(Xid xid) throws Exception;
- void handleXAResume(SessionXAResumeMessage packet);
+ void xaRollback(Xid xid) throws Exception;
- void handleXARollback(SessionXARollbackMessage packet);
+ void xaStart(Xid xid) throws Exception;
- void handleXAStart(SessionXAStartMessage packet);
+ void xaSuspend() throws Exception;
- void handleXASuspend(Packet packet);
+ List<Xid> xaGetInDoubtXids();
- void handleGetInDoubtXids(Packet packet);
+ int xaGetTimeout();
- void handleGetXATimeout(Packet packet);
+ void xaSetTimeout(int timeout);
- void handleSetXATimeout(SessionXASetTimeoutMessage packet);
+ void start();
- void handleStart(Packet packet);
+ void stop();
- void handleStop(Packet packet);
+ void createQueue(SimpleString address,
+ SimpleString name,
+ SimpleString filterString,
+ boolean temporary,
+ boolean durable) throws Exception;
- void handleCreateQueue(CreateQueueMessage packet);
+ void deleteQueue(SimpleString name) throws Exception;
- void handleDeleteQueue(SessionDeleteQueueMessage packet);
+ void createConsumer(long consumerID, SimpleString name, SimpleString filterString,
boolean browseOnly) throws Exception;
- void handleCreateConsumer(SessionCreateConsumerMessage packet);
+ QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
- void handleExecuteQueueQuery(SessionQueueQueryMessage packet);
+ BindingQueryResult executeBindingQuery(SimpleString address);
- void handleExecuteBindingQuery(SessionBindingQueryMessage packet);
+ void closeConsumer(long consumerID) throws Exception;
- void handleCloseConsumer(SessionConsumerCloseMessage packet);
+ void receiveConsumerCredits(long consumerID, int credits) throws Exception;
- void handleReceiveConsumerCredits(SessionConsumerFlowCreditMessage packet);
+ void sendContinuations(int packetSize, byte[] body, boolean continues) throws
Exception;
- void handleSendContinuations(SessionSendContinuationMessage packet);
+ void send(ServerMessage message) throws Exception;
- void handleSend(SessionSendMessage packet);
+ void sendLarge(byte[] largeMessageHeader) throws Exception;
- void handleSendLargeMessage(SessionSendLargeMessage packet);
+ void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
- void handleForceConsumerDelivery(SessionForceConsumerDelivery message);
+ void requestProducerCredits(SimpleString address, int credits) throws Exception;
- void handleRequestProducerCredits(SessionRequestProducerCreditsMessage message) throws
Exception;
+ void close() throws Exception;
- void handleClose(Packet packet);
-
- int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
-
- Channel getChannel();
-
- ServerSessionPacketHandler getHandler();
-
- void setHandler(ServerSessionPacketHandler handler);
+ void setTransferring(boolean transferring);
+
+ void runConnectionFailureRunners();
+
+ void setCallback(SessionCallback callback);
}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java
(from rev 8819, trunk/src/main/org/hornetq/core/server/SessionCallback.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/SessionCallback.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * A SessionCallback
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface SessionCallback
+{
+ void sendProducerCreditsMessage(int credits, SimpleString address, int offset);
+
+ int sendMessage(ServerMessage message, long consumerID, int deliveryCount);
+
+ int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int
deliveryCount);
+
+ int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues,
boolean requiresResponse);
+
+ void closed();
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -1,234 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
-
-/**
- * A packet handler for all packets that need to be handled at the server level
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
- */
-public class HornetQPacketHandler implements ChannelHandler
-{
- private static final Logger log = Logger.getLogger(HornetQPacketHandler.class);
-
- private final HornetQServer server;
-
- private final Channel channel1;
-
- private final RemotingConnection connection;
-
- public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final
RemotingConnection connection)
- {
- this.server = server;
-
- this.channel1 = channel1;
-
- this.connection = connection;
- }
-
- public void handlePacket(final Packet packet)
- {
- byte type = packet.getType();
-
- // All these operations need to be idempotent since they are outside of the
session
- // reliability replay functionality
- switch (type)
- {
- case CREATESESSION:
- {
- CreateSessionMessage request = (CreateSessionMessage)packet;
-
- handleCreateSession(request);
-
- break;
- }
- case REATTACH_SESSION:
- {
- ReattachSessionMessage request = (ReattachSessionMessage)packet;
-
- handleReattachSession(request);
-
- break;
- }
- case CREATE_QUEUE:
- {
- // Create queue can also be fielded here in the case of a replicated store
and forward queue creation
-
- CreateQueueMessage request = (CreateQueueMessage)packet;
-
- handleCreateQueue(request);
-
- break;
- }
- case CREATE_REPLICATION:
- {
- // Create queue can also be fielded here in the case of a replicated store
and forward queue creation
-
- CreateReplicationSessionMessage request =
(CreateReplicationSessionMessage)packet;
-
- handleCreateReplication(request);
-
- break;
- }
- default:
- {
- HornetQPacketHandler.log.error("Invalid packet " + packet);
- }
- }
- }
-
- private void handleCreateSession(final CreateSessionMessage request)
- {
- boolean incompatibleVersion = false;
- Packet response;
- try
- {
- response = server.createSession(request.getName(),
- request.getSessionChannelID(),
- request.getUsername(),
- request.getPassword(),
- request.getMinLargeMessageSize(),
- request.getVersion(),
- connection,
- request.isAutoCommitSends(),
- request.isAutoCommitAcks(),
- request.isPreAcknowledge(),
- request.isXA(),
- request.getWindowSize());
- }
- catch (Exception e)
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
-
- if (((HornetQException)e).getCode() ==
HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS)
- {
- incompatibleVersion = true;
- }
- }
- else
- {
- HornetQPacketHandler.log.error("Failed to create session", e);
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- // send the exception to the client and destroy
- // the connection if the client and server versions
- // are not compatible
- if (incompatibleVersion)
- {
- channel1.sendAndFlush(response);
- }
- else
- {
- channel1.send(response);
- }
- }
-
- private void handleReattachSession(final ReattachSessionMessage request)
- {
- Packet response;
-
- try
- {
- response = server.reattachSession(connection, request.getName(),
request.getLastConfirmedCommandID());
- }
- catch (Exception e)
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- HornetQPacketHandler.log.error("Failed to reattach session", e);
-
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- channel1.send(response);
- }
-
- private void handleCreateQueue(final CreateQueueMessage request)
- {
- try
- {
- server.createQueue(request.getAddress(),
- request.getQueueName(),
- request.getFilterString(),
- request.isDurable(),
- request.isTemporary());
- }
- catch (Exception e)
- {
- HornetQPacketHandler.log.error("Failed to handle create queue", e);
- }
- }
-
- private void handleCreateReplication(final CreateReplicationSessionMessage request)
- {
- Packet response;
-
- try
- {
- Channel channel = connection.getChannel(request.getSessionChannelID(), -1);
-
- ReplicationEndpoint endpoint = server.connectToReplicationEndpoint(channel);
-
- channel.setHandler(endpoint);
-
- response = new NullResponseMessage();
- }
- catch (Exception e)
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- HornetQPacketHandler.log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- channel1.send(response);
- }
-
-}
\ No newline at end of file
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -22,7 +22,6 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -69,10 +68,8 @@
import org.hornetq.core.postoffice.impl.DivertBinding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -342,12 +339,12 @@
managementService.removeNotificationListener(groupingHandler);
groupingHandler = null;
}
- // Need to flush all sessions to make sure all confirmations get sent back to
client
-
- for (ServerSession session : sessions.values())
- {
- session.getChannel().flushConfirmations();
- }
+ // // Need to flush all sessions to make sure all confirmations get sent back to
client
+ //
+ // for (ServerSession session : sessions.values())
+ // {
+ // session.getChannel().flushConfirmations();
+ // }
}
// we stop the remoting service outside a lock
@@ -537,102 +534,21 @@
return clusterManager;
}
- public ReattachSessionResponseMessage reattachSession(final RemotingConnection
connection,
- final String name,
- final int
lastConfirmedCommandID) throws Exception
+ public ServerSession createSession(final String name,
+ final String username,
+ final String password,
+ final int minLargeMessageSize,
+ final CoreRemotingConnection connection,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final boolean xa) throws Exception
{
- if (!started)
- {
- return new ReattachSessionResponseMessage(-1, false);
- }
-
- ServerSession session = sessions.get(name);
-
- if (!checkActivate())
- {
- return new ReattachSessionResponseMessage(-1, false);
- }
-
- if (session == null)
- {
- return new ReattachSessionResponseMessage(-1, false);
- }
- else
- {
- if (session.getChannel().getConfirmationWindowSize() == -1)
- {
- // Even though session exists, we can't reattach since confi window size
== -1,
- // i.e. we don't have a resend cache for commands, so we just close the
old session
- // and let the client recreate
-
- try
- {
- session.close();
- }
- catch (Exception e)
- {
- HornetQServerImpl.log.error("Failed to close session", e);
- }
-
- sessions.remove(name);
-
- return new ReattachSessionResponseMessage(-1, false);
- }
- else
- {
- // Reconnect the channel to the new connection
- int serverLastConfirmedCommandID = session.transferConnection(connection,
lastConfirmedCommandID);
-
- return new ReattachSessionResponseMessage(serverLastConfirmedCommandID,
true);
- }
- }
- }
-
- public CreateSessionResponseMessage createSession(final String name,
- final long channelID,
- final String username,
- final String password,
- final int minLargeMessageSize,
- final int incrementingVersion,
- final RemotingConnection
connection,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final boolean xa,
- final int sendWindowSize) throws
Exception
- {
- if (!started)
- {
- throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
"Server not started");
- }
-
- if (version.getIncrementingVersion() != incrementingVersion)
- {
- HornetQServerImpl.log.warn("Client with version " +
incrementingVersion +
- " and address " +
- connection.getRemoteAddress() +
- " is not compatible with server version "
+
- version.getFullVersion() +
- ". " +
- "Please ensure all clients and servers are
upgraded to the same version for them to " +
- "interoperate properly");
- throw new
HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
- "Server and client versions
incompatible");
- }
-
- if (!checkActivate())
- {
- throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
- "Server will not accept create session
requests");
- }
-
if (securityStore != null)
{
securityStore.authenticate(username, password);
}
- Channel channel = connection.getChannel(channelID, sendWindowSize);
-
final ServerSessionImpl session = new ServerSessionImpl(name,
username,
password,
@@ -647,22 +563,13 @@
postOffice,
resourceManager,
securityStore,
- channel,
managementService,
this,
configuration.getManagementAddress());
sessions.put(name, session);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
-
storageManager.newContext(executorFactory.getExecutor()),
-
storageManager);
-
- session.setHandler(handler);
-
- channel.setHandler(handler);
-
- return new CreateSessionResponseMessage(version.getIncrementingVersion());
+ return session;
}
public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel
channel) throws Exception
@@ -937,7 +844,7 @@
}
}
- private synchronized boolean checkActivate() throws Exception
+ public synchronized boolean checkActivate() throws Exception
{
if (configuration.isBackup())
{
@@ -1025,8 +932,7 @@
if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser())
&&
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
{
- log.warn("Security risk! It has been detected that the cluster admin user
and password "
- + "have not been changed from the installation default. "
+ log.warn("Security risk! It has been detected that the cluster admin user
and password " + "have not been changed from the installation default. "
+ "Please see the HornetQ user guide, cluster chapter, for
instructions on how to do this.");
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -245,12 +245,12 @@
executor.execute(deliverRunner);
}
}
-
+
public Executor getExecutor()
{
return executor;
}
-
+
public synchronized void deliverNow()
{
deliverRunner.run();
@@ -743,7 +743,7 @@
}
return false;
}
-
+
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
{
int count = 0;
@@ -757,7 +757,7 @@
deliveringCount.incrementAndGet();
sendToDeadLetterAddress(ref);
iter.remove();
- count ++;
+ count++;
}
}
return count;
@@ -832,7 +832,7 @@
return false;
}
-
+
public int changeReferencesPriority(final Filter filter, final byte newPriority)
throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -843,7 +843,7 @@
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage()))
{
- count ++;
+ count++;
iter.remove();
ref.getMessage().setPriority(newPriority);
addLast(ref);
@@ -1404,7 +1404,8 @@
}
catch (Exception e)
{
- QueueImpl.log.warn("Unable to remove message id = " +
message.getMessageID() + " please remove manually");
+ QueueImpl.log.warn("Unable to remove message id = " +
message.getMessageID() + " please remove manually",
+ e);
}
}
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -23,7 +23,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -33,10 +32,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -44,6 +39,7 @@
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.transaction.Transaction;
@@ -112,7 +108,7 @@
private final java.util.Queue<MessageReference> deliveringRefs = new
ConcurrentLinkedQueue<MessageReference>();
- private final Channel channel;
+ private final SessionCallback callback;
private volatile boolean closed;
@@ -133,12 +129,11 @@
final boolean started,
final boolean browseOnly,
final StorageManager storageManager,
- final Channel channel,
+ final SessionCallback callback,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final ManagementService managementService) throws Exception
{
-
this.id = id;
this.filter = filter;
@@ -157,7 +152,7 @@
this.storageManager = storageManager;
- this.channel = channel;
+ this.callback = callback;
this.preAcknowledge = preAcknowledge;
@@ -361,9 +356,7 @@
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName());
- final SessionReceiveMessage packet = new SessionReceiveMessage(id,
forcedDeliveryMessage, 0);
-
- channel.send(packet);
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
}
catch (Exception e)
{
@@ -634,15 +627,12 @@
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage
message)
{
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, message,
ref.getDeliveryCount());
+ int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
- channel.send(packet);
-
if (availableCredits != null)
{
- availableCredits.addAndGet(-packet.getPacketSize());
+ availableCredits.addAndGet(-packetSize);
}
-
}
// Inner classes
@@ -732,21 +722,18 @@
sizePendingLargeMessage = context.getLargeBodySize();
- SessionReceiveLargeMessage initialPacket = new
SessionReceiveLargeMessage(id,
-
headerBuffer.toByteBuffer()
-
.array(),
-
context.getLargeBodySize(),
-
ref.getDeliveryCount());
-
context.open();
sentInitialPacket = true;
- channel.send(initialPacket);
+ int packetSize = callback.sendLargeMessage(id,
+
headerBuffer.toByteBuffer().array(),
+ context.getLargeBodySize(),
+ ref.getDeliveryCount());
if (availableCredits != null)
{
- availableCredits.addAndGet(-initialPacket.getPacketSize());
+ availableCredits.addAndGet(-packetSize);
}
// Execute the rest of the large message on a different thread so as not
to tie up the delivery thread
@@ -768,22 +755,33 @@
return false;
}
- SessionReceiveContinuationMessage chunk = createChunkSend(context);
+ int localChunkLen = 0;
- int chunkLen = chunk.getBody().length;
+ localChunkLen = (int)Math.min(sizePendingLargeMessage -
positionPendingLargeMessage, minLargeMessageSize);
- channel.send(chunk);
+ HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
+ context.encode(bodyBuffer, localChunkLen);
+
+ byte[] body = bodyBuffer.toByteBuffer().array();
+
+ int packetSize = callback.sendLargeMessageContinuation(id,
+ body,
+
positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
+ false);
+
+ int chunkLen = body.length;
+
if (ServerConsumerImpl.trace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Sending " +
chunk.getPacketSize() +
+ ServerConsumerImpl.trace("deliverLargeMessage: Sending " +
packetSize +
" availableCredits now is " +
availableCredits);
}
if (availableCredits != null)
{
- availableCredits.addAndGet(-chunk.getPacketSize());
+ availableCredits.addAndGet(-packetSize);
}
positionPendingLargeMessage += chunkLen;
@@ -846,26 +844,6 @@
lock.unlock();
}
}
-
- private SessionReceiveContinuationMessage createChunkSend(final BodyEncoder
context) throws HornetQException
- {
- SessionReceiveContinuationMessage chunk;
-
- int localChunkLen = 0;
-
- localChunkLen = (int)Math.min(sizePendingLargeMessage -
positionPendingLargeMessage, minLargeMessageSize);
-
- HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
-
- context.encode(bodyBuffer, localChunkLen);
-
- chunk = new SessionReceiveContinuationMessage(id,
-
bodyBuffer.toByteBuffer().array(),
- positionPendingLargeMessage +
localChunkLen < sizePendingLargeMessage,
- false);
-
- return chunk;
- }
}
private class BrowserDeliverer implements Runnable
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -21,7 +21,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.protocol.core.PacketImpl;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -24,13 +24,13 @@
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.journal.IOAsyncTask;
@@ -42,55 +42,22 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.SecurityStore;
+import org.hornetq.core.server.BindingQueryResult;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.transaction.ResourceManager;
@@ -113,10 +80,10 @@
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
// Static
-------------------------------------------------------------------------------
-
+
// Attributes
----------------------------------------------------------------------------
- private final long id;
+ // private final long id;
private final String username;
@@ -132,7 +99,7 @@
private final boolean strictUpdateDeliveryCount;
- private RemotingConnection remotingConnection;
+ private CoreRemotingConnection remotingConnection;
private final Map<Long, ServerConsumer> consumers = new
ConcurrentHashMap<Long, ServerConsumer>();
@@ -146,8 +113,6 @@
private final SecurityStore securityStore;
- private final Channel channel;
-
private final ManagementService managementService;
private volatile boolean started = false;
@@ -163,14 +128,14 @@
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
- private ServerSessionPacketHandler handler;
-
private boolean closed;
private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new
HashMap<SimpleString, CreditManagerHolder>();
private final RoutingContext routingContext = new RoutingContextImpl(null);
+ private SessionCallback callback;
+
// Constructors
---------------------------------------------------------------------------------
public ServerSessionImpl(final String name,
@@ -182,18 +147,15 @@
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final boolean xa,
- final RemotingConnection remotingConnection,
+ final CoreRemotingConnection remotingConnection,
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
- final Channel channel,
final ManagementService managementService,
final HornetQServer server,
final SimpleString managementAddress) throws Exception
{
- id = channel.getID();
-
this.username = username;
this.password = password;
@@ -223,8 +185,6 @@
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
- this.channel = channel;
-
this.managementService = managementService;
this.name = name;
@@ -240,16 +200,11 @@
// ServerSession implementation
----------------------------------------------------------------------------
- public ServerSessionPacketHandler getHandler()
+ public void setCallback(final SessionCallback callback)
{
- return handler;
+ this.callback = callback;
}
- public void setHandler(final ServerSessionPacketHandler handler)
- {
- this.handler = handler;
- }
-
public String getUsername()
{
return username;
@@ -265,11 +220,6 @@
return minLargeMessageSize;
}
- public long getID()
- {
- return id;
- }
-
public String getName()
{
return name;
@@ -288,7 +238,7 @@
}
}
- public synchronized void close() throws Exception
+ private synchronized void doClose() throws Exception
{
if (tx != null && tx.getXid() == null)
{
@@ -330,579 +280,346 @@
{
holder.store.returnProducerCredits(holder.outstandingCredits);
}
+
+ callback.closed();
}
- public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
+ public void createConsumer(final long consumerID,
+ final SimpleString name,
+ final SimpleString filterString,
+ final boolean browseOnly) throws Exception
{
- SimpleString name = packet.getQueueName();
-
- SimpleString filterString = packet.getFilterString();
+ Binding binding = postOffice.getBinding(name);
- boolean browseOnly = packet.isBrowseOnly();
-
- Packet response = null;
-
- try
+ if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
{
- Binding binding = postOffice.getBinding(name);
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue
" + name + " does not exist");
+ }
- if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
- {
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue
" + name + " does not exist");
- }
+ securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
- securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
+ Filter filter = FilterImpl.createFilter(filterString);
- Filter filter = FilterImpl.createFilter(filterString);;
+ ServerConsumer consumer = new ServerConsumerImpl(consumerID,
+ this,
+ (QueueBinding)binding,
+ filter,
+ started,
+ browseOnly,
+ storageManager,
+ callback,
+ preAcknowledge,
+ strictUpdateDeliveryCount,
+ managementService);
- ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
- this,
- (QueueBinding)binding,
- filter,
- started,
- browseOnly,
- storageManager,
- channel,
- preAcknowledge,
- strictUpdateDeliveryCount,
- managementService);
+ consumers.put(consumer.getID(), consumer);
- consumers.put(consumer.getID(), consumer);
+ if (!browseOnly)
+ {
+ TypedProperties props = new TypedProperties();
- if (!browseOnly)
- {
- TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS,
binding.getAddress());
- props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS,
binding.getAddress());
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME,
binding.getClusterName());
- props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME,
binding.getClusterName());
+ props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME,
binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME,
binding.getRoutingName());
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
+ Queue theQueue = (Queue)binding.getBindable();
- Queue theQueue = (Queue)binding.getBindable();
+ props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT,
theQueue.getConsumerCount());
- props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT,
theQueue.getConsumerCount());
+ if (filterString != null)
+ {
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
filterString);
+ }
- if (filterString != null)
- {
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
filterString);
- }
+ Notification notification = new Notification(null, CONSUMER_CREATED, props);
- Notification notification = new Notification(null, CONSUMER_CREATED, props);
+ managementService.sendNotification(notification);
+ }
+ }
- managementService.sendNotification(notification);
- }
-
- //We send back queue information on the queue as a response- this allows the
queue to
- //be automaticall recreated on failover
-
- if (packet.isRequiresResponse())
- {
- response = doExecuteQueueQuery(name);
- }
- else
- {
- response = null;
- }
+ public void createQueue(final SimpleString address,
+ final SimpleString name,
+ final SimpleString filterString,
+ final boolean temporary,
+ final boolean durable) throws Exception
+ {
+ if (durable)
+ {
+ // make sure the user has privileges to create this queue
+ securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
}
- catch (Exception e)
+ else
{
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to create consumer", e);
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
}
-
- sendResponse(packet, response, false, false);
- }
- public void handleCreateQueue(final CreateQueueMessage packet)
- {
- SimpleString address = packet.getAddress();
+ server.createQueue(address, name, filterString, durable, temporary);
- final SimpleString name = packet.getQueueName();
-
- SimpleString filterString = packet.getFilterString();
-
- boolean temporary = packet.isTemporary();
-
- boolean durable = packet.isDurable();
-
- Packet response = null;
-
- try
+ if (temporary)
{
- if (durable)
- {
- // make sure the user has privileges to create this queue
- securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
- }
- else
- {
- securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
- }
+ // Temporary queue in core simply means the queue will be deleted if
+ // the remoting connection
+ // dies. It does not mean it will get deleted automatically when the
+ // session is closed.
+ // It is up to the user to delete the queue when finished with it
- server.createQueue(address, name, filterString, durable, temporary);
-
- if (temporary)
+ failureRunners.put(name, new Runnable()
{
- // Temporary queue in core simply means the queue will be deleted if
- // the remoting connection
- // dies. It does not mean it will get deleted automatically when the
- // session is closed.
- // It is up to the user to delete the queue when finished with it
-
- failureRunners.put(name, new Runnable()
+ public void run()
{
- public void run()
+ try
{
- try
+ if (postOffice.getBinding(name) != null)
{
- if (postOffice.getBinding(name) != null)
+ postOffice.removeBinding(name);
+
+ if (postOffice.getBindingsForAddress(name).getBindings().size() ==
0)
{
- postOffice.removeBinding(name);
-
- if (postOffice.getBindingsForAddress(name).getBindings().size()
== 0)
- {
- creditManagerHolders.remove(name);
- }
+ creditManagerHolders.remove(name);
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to remove temporary queue
" + name);
- }
}
- });
- }
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
- else
- {
- response = null;
- }
+ catch (Exception e)
+ {
+ ServerSessionImpl.log.error("Failed to remove temporary queue
" + name);
+ }
+ }
+ });
}
- catch (Exception e)
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to create queue", e);
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
+ public void deleteQueue(final SimpleString name) throws Exception
{
- SimpleString name = packet.getQueueName();
+ Binding binding = postOffice.getBinding(name);
- Packet response = null;
-
- try
+ if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
{
- Binding binding = postOffice.getBinding(name);
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
+ }
- if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
- {
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
- }
+ server.destroyQueue(name, this);
- server.destroyQueue(name, this);
+ failureRunners.remove(name);
- failureRunners.remove(name);
-
- if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
- {
- creditManagerHolders.remove(name);
- }
-
- response = new NullResponseMessage();
- }
- catch (Exception e)
+ if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
{
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to delete queue", e);
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ creditManagerHolders.remove(name);
}
-
- sendResponse(packet, response, false, false);
}
-
- public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
- {
- SimpleString name = packet.getQueueName();
- Packet response = null;
-
- try
+ public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
+ {
+ if (name == null)
{
- response = doExecuteQueueQuery(name);
+ throw new IllegalArgumentException("Queue name is null");
}
- catch (Exception e)
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to execute queue query", e);
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
+ QueueQueryResult response;
- sendResponse(packet, response, false, false);
- }
+ Binding binding = postOffice.getBinding(name);
- public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
- {
- SimpleString address = packet.getAddress();
-
- Packet response = null;
-
- try
+ if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
{
- if (address == null)
- {
- throw new IllegalArgumentException("Address is null");
- }
+ Queue queue = (Queue)binding.getBindable();
- List<SimpleString> names = new ArrayList<SimpleString>();
+ Filter filter = queue.getFilter();
- Bindings bindings = postOffice.getMatchingBindings(address);
+ SimpleString filterString = filter == null ? null : filter.getFilterString();
- for (Binding binding : bindings.getBindings())
- {
- if (binding.getType() == BindingType.LOCAL_QUEUE)
- {
- names.add(binding.getUniqueName());
- }
- }
-
- response = new SessionBindingQueryResponseMessage(!names.isEmpty(), names);
+ response = new QueueQueryResult(name,
+ binding.getAddress(),
+ queue.isDurable(),
+ queue.isTemporary(),
+ filterString,
+ queue.getConsumerCount(),
+ queue.getMessageCount());
}
- catch (Exception e)
+ // make an exception for the management address (see HORNETQ-29)
+ else if (name.equals(managementAddress))
{
- ServerSessionImpl.log.error("Failed to execute binding query", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ response = new QueueQueryResult(name, managementAddress, true, false, null, -1,
-1);
}
+ else
+ {
+ response = new QueueQueryResult();
+ }
- sendResponse(packet, response, false, false);
+ return response;
}
- public void handleForceConsumerDelivery(final SessionForceConsumerDelivery message)
+ public BindingQueryResult executeBindingQuery(final SimpleString address)
{
- try
+ if (address == null)
{
- ServerConsumer consumer = consumers.get(message.getConsumerID());
-
- consumer.forceDelivery(message.getSequence());
+ throw new IllegalArgumentException("Address is null");
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to query consumer deliveries",
e);
- }
- sendResponse(message, null, false, false);
- }
+ List<SimpleString> names = new ArrayList<SimpleString>();
- public void handleAcknowledge(final SessionAcknowledgeMessage packet)
- {
- Packet response = null;
+ Bindings bindings = postOffice.getMatchingBindings(address);
- try
+ for (Binding binding : bindings.getBindings())
{
- ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
- consumer.acknowledge(autoCommitAcks, tx, packet.getMessageID());
-
- if (packet.isRequiresResponse())
+ if (binding.getType() == BindingType.LOCAL_QUEUE)
{
- response = new NullResponseMessage();
+ names.add(binding.getUniqueName());
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to acknowledge", e);
- if (packet.isRequiresResponse())
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
- }
+ return new BindingQueryResult(!names.isEmpty(), names);
+ }
- sendResponse(packet, response, false, false);
+ public void forceConsumerDelivery(final long consumerID, final long sequence) throws
Exception
+ {
+ ServerConsumer consumer = consumers.get(consumerID);
+
+ consumer.forceDelivery(sequence);
}
- public void handleExpired(final SessionExpiredMessage packet)
+ public void acknowledge(final long consumerID, final long messageID) throws Exception
{
- try
- {
- MessageReference ref =
consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
+ ServerConsumer consumer = consumers.get(consumerID);
- if (ref != null)
- {
- ref.getQueue().expire(ref);
- }
- }
- catch (Exception e)
+ consumer.acknowledge(autoCommitAcks, tx, messageID);
+ }
+
+ public void expire(final long consumerID, final long messageID) throws Exception
+ {
+ MessageReference ref = consumers.get(consumerID).getExpired(messageID);
+
+ if (ref != null)
{
- ServerSessionImpl.log.error("Failed to acknowledge", e);
+ ref.getQueue().expire(ref);
}
-
- sendResponse(packet, null, false, false);
}
- public void handleCommit(final Packet packet)
+ public void commit() throws Exception
{
- Packet response = null;
-
try
{
tx.commit();
-
- response = new NullResponseMessage();
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to commit", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
finally
{
tx = new TransactionImpl(storageManager);
}
-
- sendResponse(packet, response, false, false);
}
- public void handleRollback(final RollbackMessage packet)
+ public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
{
- Packet response = null;
-
- try
+ if (tx == null)
{
- rollback(packet.isConsiderLastMessageAsDelivered());
+ // Might be null if XA
- response = new NullResponseMessage();
+ tx = new TransactionImpl(storageManager);
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to rollback", e);
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
+ doRollback(considerLastMessageAsDelivered, tx);
- sendResponse(packet, response, false, false);
+ tx = new TransactionImpl(storageManager);
}
- public void handleXACommit(final SessionXACommitMessage packet)
+ public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
{
- Packet response = null;
+ if (tx != null)
+ {
+ final String msg = "Cannot commit, session is currently doing work in
transaction " + tx.getXid();
- Xid xid = packet.getXid();
-
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx != null)
- {
- final String msg = "Cannot commit, session is currently doing work in
transaction " + tx.getXid();
+ Transaction theTx = resourceManager.removeTransaction(xid);
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ if (theTx == null)
+ {
+ // checked heuristic committed transactions
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+ {
+ throw new HornetQXAException(XAException.XA_HEURCOM,
+ "transaction has been heuristically
committed: " + xid);
+ }
+ // checked heuristic rolled back transactions
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ throw new HornetQXAException(XAException.XA_HEURRB,
+ "transaction has been heuristically
rolled back: " + xid);
+ }
+ else
+ {
+ throw new HornetQXAException(XAException.XAER_NOTA, "Cannot find xid
in resource manager: " + xid);
+ }
}
else
{
- Transaction theTx = resourceManager.removeTransaction(xid);
-
- if (theTx == null)
+ if (theTx.getState() == Transaction.State.SUSPENDED)
{
- // checked heuristic committed transactions
- if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURCOM,
- "transaction has been
heuristically committed: " + xid);
- }
- // checked heuristic rolled back transactions
- else if
(resourceManager.getHeuristicRolledbackTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURRB,
- "transaction has been
heuristically rolled back: " + xid);
- }
- else
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_NOTA,
- "Cannot find xid in
resource manager: " + xid);
- }
+ // Put it back
+ resourceManager.putTransaction(xid, tx);
+
+ throw new HornetQXAException(XAException.XAER_PROTO, "Cannot commit
transaction, it is suspended " + xid);
}
else
{
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- // Put it back
- resourceManager.putTransaction(xid, tx);
-
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot commit
transaction, it is suspended " + xid);
- }
- else
- {
- theTx.commit(packet.isOnePhase());
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
- }
+ theTx.commit(onePhase);
}
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa commit", e);
+ }
- if (e instanceof HornetQException)
+ public void xaEnd(final Xid xid) throws Exception
+ {
+ if (tx != null && tx.getXid().equals(xid))
+ {
+ if (tx.getState() == Transaction.State.SUSPENDED)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ final String msg = "Cannot end, transaction is suspended";
+
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
}
else
{
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
+ tx = null;
}
}
-
- sendResponse(packet, response, false, false);
- }
-
- public void handleXAEnd(final SessionXAEndMessage packet)
- {
- Packet response = null;
-
- Xid xid = packet.getXid();
-
- try
+ else
{
- if (tx != null && tx.getXid().equals(xid))
+ // It's also legal for the TM to call end for a Xid in the suspended
+ // state
+ // See JTA 1.1 spec 3.4.4 - state diagram
+ // Although in practice TMs rarely do this.
+ Transaction theTx = resourceManager.getTransaction(xid);
+
+ if (theTx == null)
{
- if (tx.getState() == Transaction.State.SUSPENDED)
- {
- final String msg = "Cannot end, transaction is suspended";
+ final String msg = "Cannot find suspended transaction to end " +
xid;
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO,
msg);
- }
- else
- {
- tx = null;
- }
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
else
{
- // It's also legal for the TM to call end for a Xid in the suspended
- // state
- // See JTA 1.1 spec 3.4.4 - state diagram
- // Although in practice TMs rarely do this.
- Transaction theTx = resourceManager.getTransaction(xid);
-
- if (theTx == null)
+ if (theTx.getState() != Transaction.State.SUSPENDED)
{
- final String msg = "Cannot find suspended transaction to end " +
xid;
+ final String msg = "Transaction is not suspended " + xid;
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA,
msg);
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
}
else
{
- if (theTx.getState() != Transaction.State.SUSPENDED)
- {
- final String msg = "Transaction is not suspended " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO,
msg);
- }
- else
- {
- theTx.resume();
- }
+ theTx.resume();
}
}
-
- if (response == null)
- {
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa end", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleXAForget(final SessionXAForgetMessage packet)
+ public void xaForget(final Xid xid) throws Exception
{
- long id = resourceManager.removeHeuristicCompletion(packet.getXid());
- int code = XAResource.XA_OK;
+ long id = resourceManager.removeHeuristicCompletion(xid);
+
if (id != -1)
{
try
@@ -912,402 +629,236 @@
catch (Exception e)
{
e.printStackTrace();
- code = XAException.XAER_RMERR;
+
+ throw new HornetQXAException(XAException.XAER_RMERR);
}
}
else
{
- code = XAException.XAER_NOTA;
+ throw new HornetQXAException(XAException.XAER_NOTA);
}
-
- Packet response = new SessionXAResponseMessage((code != XAResource.XA_OK), code,
null);
-
- sendResponse(packet, response, false, false);
}
- public void handleXAJoin(final SessionXAJoinMessage packet)
+ public void xaJoin(final Xid xid) throws Exception
{
- Packet response = null;
+ Transaction theTx = resourceManager.getTransaction(xid);
- Xid xid = packet.getXid();
-
- try
+ if (theTx == null)
{
- Transaction theTx = resourceManager.getTransaction(xid);
+ final String msg = "Cannot find xid in resource manager: " + xid;
- if (theTx == null)
- {
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
- }
- else
- {
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot join tx, it is
suspended " + xid);
- }
- else
- {
- tx = theTx;
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
- }
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
- catch (Exception e)
+ else
{
- ServerSessionImpl.log.error("Failed to xa join", e);
-
- if (e instanceof HornetQException)
+ if (theTx.getState() == Transaction.State.SUSPENDED)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ throw new HornetQXAException(XAException.XAER_PROTO, "Cannot join tx, it
is suspended " + xid);
}
else
{
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
+ tx = theTx;
}
}
-
- sendResponse(packet, response, false, false);
}
- public void handleXAResume(final SessionXAResumeMessage packet)
+ public void xaResume(final Xid xid) throws Exception
{
- Packet response = null;
+ if (tx != null)
+ {
+ final String msg = "Cannot resume, session is currently doing work in a
transaction " + tx.getXid();
- Xid xid = packet.getXid();
-
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx != null)
+ Transaction theTx = resourceManager.getTransaction(xid);
+
+ if (theTx == null)
{
- final String msg = "Cannot resume, session is currently doing work in a
transaction " + tx.getXid();
+ final String msg = "Cannot find xid in resource manager: " + xid;
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
else
{
- Transaction theTx = resourceManager.getTransaction(xid);
-
- if (theTx == null)
+ if (theTx.getState() != Transaction.State.SUSPENDED)
{
- final String msg = "Cannot find xid in resource manager: " +
xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA,
msg);
+ throw new HornetQXAException(XAException.XAER_PROTO,
+ "Cannot resume transaction, it is not
suspended " + xid);
}
else
{
- if (theTx.getState() != Transaction.State.SUSPENDED)
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot resume
transaction, it is not suspended " + xid);
- }
- else
- {
- tx = theTx;
+ tx = theTx;
- tx.resume();
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
- }
+ tx.resume();
}
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa resume", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleXARollback(final SessionXARollbackMessage packet)
+ public void xaRollback(final Xid xid) throws Exception
{
- Packet response = null;
+ if (tx != null)
+ {
+ final String msg = "Cannot roll back, session is currently doing work in a
transaction " + tx.getXid();
- Xid xid = packet.getXid();
-
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx != null)
- {
- final String msg = "Cannot roll back, session is currently doing work in
a transaction " + tx.getXid();
+ Transaction theTx = resourceManager.removeTransaction(xid);
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
- else
+ if (theTx == null)
{
- Transaction theTx = resourceManager.removeTransaction(xid);
-
- if (theTx == null)
+ // checked heuristic committed transactions
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
{
- // checked heuristic committed transactions
- if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURCOM,
- "transaction has ben
heuristically committed: " + xid);
- }
- // checked heuristic rolled back transactions
- else if
(resourceManager.getHeuristicRolledbackTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURRB,
- "transaction has ben
heuristically rolled back: " + xid);
- }
- else
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_NOTA,
- "Cannot find xid in
resource manager: " + xid);
- }
+ throw new HornetQXAException(XAException.XA_HEURCOM,
+ "transaction has ben heuristically
committed: " + xid);
}
+ // checked heuristic rolled back transactions
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ throw new HornetQXAException(XAException.XA_HEURRB,
+ "transaction has ben heuristically
rolled back: " + xid);
+ }
else
{
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- // Put it back
- resourceManager.putTransaction(xid, tx);
-
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot rollback
transaction, it is suspended " + xid);
- }
- else
- {
- doRollback(false, theTx);
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
- }
+ throw new HornetQXAException(XAException.XAER_NOTA, "Cannot find xid
in resource manager: " + xid);
}
}
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa rollback", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
else
{
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
- }
-
- public void handleXAStart(final SessionXAStartMessage packet)
- {
- Packet response = null;
-
- Xid xid = packet.getXid();
-
- try
- {
- if (tx != null)
- {
- final String msg = "Cannot start, session is already doing work in a
transaction " + tx.getXid();
-
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
- else
- {
- tx = new TransactionImpl(xid, storageManager, postOffice);
-
- boolean added = resourceManager.putTransaction(xid, tx);
-
- if (!added)
+ if (theTx.getState() == Transaction.State.SUSPENDED)
{
- final String msg = "Cannot start, there is already a xid " +
tx.getXid();
+ // Put it back
+ resourceManager.putTransaction(xid, tx);
- response = new SessionXAResponseMessage(true, XAException.XAER_DUPID,
msg);
+ throw new HornetQXAException(XAException.XAER_PROTO,
+ "Cannot rollback transaction, it is
suspended " + xid);
}
else
{
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ doRollback(false, theTx);
}
}
}
- catch (Exception e)
+ }
+
+ public void xaStart(final Xid xid) throws Exception
+ {
+ if (tx != null)
{
- ServerSessionImpl.log.error("Failed to xa start", e);
+ final String msg = "Cannot start, session is already doing work in a
transaction " + tx.getXid();
- if (e instanceof HornetQException)
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ tx = new TransactionImpl(xid, storageManager, postOffice);
+
+ boolean added = resourceManager.putTransaction(xid, tx);
+
+ if (!added)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ final String msg = "Cannot start, there is already a xid " +
tx.getXid();
+
+ throw new HornetQXAException(XAException.XAER_DUPID, msg);
}
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
}
-
- sendResponse(packet, response, false, false);
}
- public void handleXASuspend(final Packet packet)
+ public void xaSuspend() throws Exception
{
- Packet response = null;
+ if (tx == null)
+ {
+ final String msg = "Cannot suspend, session is not doing work in a
transaction ";
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx == null)
+ if (tx.getState() == Transaction.State.SUSPENDED)
{
- final String msg = "Cannot suspend, session is not doing work in a
transaction ";
+ final String msg = "Cannot suspend, transaction is already suspended
" + tx.getXid();
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
}
else
{
- if (tx.getState() == Transaction.State.SUSPENDED)
- {
- final String msg = "Cannot suspend, transaction is already suspended
" + tx.getXid();
+ tx.suspend();
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO,
msg);
- }
- else
- {
- tx.suspend();
-
- tx = null;
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
+ tx = null;
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa suspend", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleXAPrepare(final SessionXAPrepareMessage packet)
+ public void xaPrepare(final Xid xid) throws Exception
{
- Packet response = null;
+ if (tx != null)
+ {
+ final String msg = "Cannot commit, session is currently doing work in a
transaction " + tx.getXid();
- Xid xid = packet.getXid();
-
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx != null)
+ Transaction theTx = resourceManager.getTransaction(xid);
+
+ if (theTx == null)
{
- final String msg = "Cannot commit, session is currently doing work in a
transaction " + tx.getXid();
+ final String msg = "Cannot find xid in resource manager: " + xid;
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
else
{
- Transaction theTx = resourceManager.getTransaction(xid);
-
- if (theTx == null)
+ if (theTx.getState() == Transaction.State.SUSPENDED)
{
- final String msg = "Cannot find xid in resource manager: " +
xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA,
msg);
+ throw new HornetQXAException(XAException.XAER_PROTO,
+ "Cannot prepare transaction, it is
suspended " + xid);
}
else
{
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot prepare
transaction, it is suspended " + xid);
- }
- else
- {
- theTx.prepare();
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
- }
+ theTx.prepare();
}
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa prepare", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleGetInDoubtXids(final Packet packet)
+ public List<Xid> xaGetInDoubtXids()
{
- List<Xid> indoubtsXids = new ArrayList<Xid>();
- indoubtsXids.addAll(resourceManager.getPreparedTransactions());
- indoubtsXids.addAll(resourceManager.getHeuristicCommittedTransactions());
- indoubtsXids.addAll(resourceManager.getHeuristicRolledbackTransactions());
- Packet response = new SessionXAGetInDoubtXidsResponseMessage(indoubtsXids);
+ List<Xid> xids = new ArrayList<Xid>();
- sendResponse(packet, response, false, false);
+ xids.addAll(resourceManager.getPreparedTransactions());
+ xids.addAll(resourceManager.getHeuristicCommittedTransactions());
+ xids.addAll(resourceManager.getHeuristicRolledbackTransactions());
+
+ return xids;
}
- public void handleGetXATimeout(final Packet packet)
+ public int xaGetTimeout()
{
- Packet response = new
SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
-
- sendResponse(packet, response, false, false);
+ return resourceManager.getTimeoutSeconds();
}
- public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
+ public void xaSetTimeout(final int timeout)
{
- Packet response = new
SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
-
- sendResponse(packet, response, false, false);
+ resourceManager.setTimeoutSeconds(timeout);
}
- public void handleStart(final Packet packet)
+ public void start()
{
setStarted(true);
-
- sendResponse(packet, null, false, false);
}
- public void handleStop(final Packet packet)
+ public void stop()
{
- final Packet response = new NullResponseMessage();
-
setStarted(false);
-
- sendResponse(packet, response, false, false);
}
- public void handleClose(final Packet packet)
+ public void close()
{
storageManager.afterCompleteOperations(new IOAsyncTask()
{
@@ -1317,97 +868,63 @@
public void done()
{
- doClose(packet);
+ try
+ {
+ doClose();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
}
});
}
- public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
+ public void closeConsumer(final long consumerID) throws Exception
{
- final ServerConsumer consumer = consumers.get(packet.getConsumerID());
+ final ServerConsumer consumer = consumers.get(consumerID);
- Packet response;
-
- try
+ if (consumer != null)
{
- if (consumer != null)
- {
- consumer.close();
- }
- else
- {
- ServerSessionImpl.log.error("Cannot find consumer with id " +
packet.getConsumerID());
- }
-
- response = new NullResponseMessage();
+ consumer.close();
}
- catch (Exception e)
+ else
{
- ServerSessionImpl.log.error("Failed to close consumer", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ ServerSessionImpl.log.error("Cannot find consumer with id " +
consumerID);
}
-
- sendResponse(packet, response, false, false);
}
- public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage
packet)
+ public void receiveConsumerCredits(final long consumerID, final int credits) throws
Exception
{
- ServerConsumer consumer = consumers.get(packet.getConsumerID());
+ ServerConsumer consumer = consumers.get(consumerID);
if (consumer == null)
{
- ServerSessionImpl.log.error("There is no consumer with id " +
packet.getConsumerID());
-
+ ServerSessionImpl.log.error("There is no consumer with id " +
consumerID);
+
return;
}
- try
- {
- consumer.receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to receive credits " +
server.getConfiguration().isBackup(), e);
- }
-
- sendResponse(packet, null, false, false);
+ consumer.receiveCredits(credits);
}
- public void handleSendLargeMessage(final SessionSendLargeMessage packet)
+ public void sendLarge(final byte[] largeMessageHeader) throws Exception
{
-
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
- LargeServerMessage msg = doCreateLargeMessage(id, packet);
+ LargeServerMessage msg = storageManager.createLargeMessage(id,
largeMessageHeader);
- if (msg != null)
+ if (currentLargeMessage != null)
{
- if (currentLargeMessage != null)
- {
- ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with
ID=" + currentLargeMessage.getMessageID());
- }
-
- currentLargeMessage = msg;
-
- sendResponse(packet, null, false, false);
+ ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with
ID=" + currentLargeMessage.getMessageID());
}
+
+ currentLargeMessage = msg;
}
- public void handleSend(final SessionSendMessage packet)
+ public void send(final ServerMessage message) throws Exception
{
- Packet response = null;
-
- ServerMessage message = (ServerMessage)packet.getMessage();
-
try
{
long id = storageManager.generateUniqueID();
@@ -1423,30 +940,9 @@
}
else
{
- send(message);
+ doSend(message);
}
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to send message", e);
-
- if (packet.isRequiresResponse())
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
- }
finally
{
try
@@ -1458,74 +954,40 @@
ServerSessionImpl.log.error("Failed to release outstanding
credits", e);
}
}
-
- sendResponse(packet, response, false, false);
}
- public void handleSendContinuations(final SessionSendContinuationMessage packet)
+ public void sendContinuations(final int packetSize, final byte[] body, final boolean
continues) throws Exception
{
- Packet response = null;
-
- try
+ if (currentLargeMessage == null)
{
- if (currentLargeMessage == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
"large-message not initialized on server");
- }
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message
not initialized on server");
+ }
- // Immediately release the credits for the continuations- these don't
contribute to the in-memory size
- // of the message
+ // Immediately release the credits for the continuations- these don't
contribute to the in-memory size
+ // of the message
- releaseOutStanding(currentLargeMessage, packet.getPacketSize());
+ releaseOutStanding(currentLargeMessage, packetSize);
- currentLargeMessage.addBytes(packet.getBody());
+ currentLargeMessage.addBytes(body);
- if (!packet.isContinues())
- {
- currentLargeMessage.releaseResources();
+ if (!continues)
+ {
+ currentLargeMessage.releaseResources();
- send(currentLargeMessage);
+ doSend(currentLargeMessage);
- releaseOutStanding(currentLargeMessage,
currentLargeMessage.getEncodeSize());
+ releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
- currentLargeMessage = null;
- }
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
+ currentLargeMessage = null;
}
- catch (Exception e)
- {
- if (packet.isRequiresResponse())
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to send message", e);
-
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleRequestProducerCredits(final SessionRequestProducerCreditsMessage
packet) throws Exception
+ public void requestProducerCredits(final SimpleString address, final int credits)
throws Exception
{
- final SimpleString address = packet.getAddress();
-
final CreditManagerHolder holder = getCreditManagerHolder(address);
- int credits = packet.getCredits();
-
- //Requesting -ve credits means returning them
-
+ // Requesting -ve credits means returning them
+
if (credits < 0)
{
releaseOutStanding(address, -credits);
@@ -1541,7 +1003,7 @@
if (!closed)
{
sendProducerCredits(holder, credits, address);
-
+
return true;
}
else
@@ -1551,58 +1013,37 @@
}
}
});
-
+
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
}
}
-
- sendResponse(packet, null, false, false);
}
- public int transferConnection(final RemotingConnection newConnection, final int
lastReceivedCommandID)
- {
- //We need to disable delivery on all the consumers while the transfer is occurring-
otherwise packets might get delivered
- //after the channel has transferred but *before* packets have been replayed - this
will give the client the wrong
- //sequence of packets.
- //It is not sufficient to just stop the session, since right after stopping the
session, another session start might be executed
- //before we have transferred the connection, leaving it in a started state
- setTransferring(true);
+ public void setTransferring(final boolean transferring)
+ {
+ Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
- remotingConnection.removeFailureListener(this);
- remotingConnection.removeCloseListener(this);
-
- // Note. We do not destroy the replicating connection here. In the case the live
server has really crashed
- // then the connection will get cleaned up anyway when the server ping timeout
kicks in.
- // In the case the live server is really still up, i.e. a split brain situation (or
in tests), then closing
- // the replicating connection will cause the outstanding responses to be be
replayed on the live server,
- // if these reach the client who then subsequently fails over, on reconnection to
backup, it will have
- // received responses that the backup did not know about.
-
- channel.transferConnection(newConnection);
-
-
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
- remotingConnection = newConnection;
-
- remotingConnection.addFailureListener(this);
- remotingConnection.addCloseListener(this);
-
- int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
-
- channel.replayCommands(lastReceivedCommandID, id);
-
- channel.setTransferring(false);
-
- setTransferring(false);
-
- return serverLastReceivedCommandID;
+ for (ServerConsumer consumer : consumersClone)
+ {
+ consumer.setTransferring(transferring);
+ }
}
- public Channel getChannel()
+ public void runConnectionFailureRunners()
{
- return channel;
+ for (Runnable runner : failureRunners.values())
+ {
+ try
+ {
+ runner.run();
+ }
+ catch (Throwable t)
+ {
+ ServerSessionImpl.log.error("Failed to execute failure runner",
t);
+ }
+ }
}
// FailureListener implementation
@@ -1626,7 +1067,7 @@
}
}
- handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
+ close();
ServerSessionImpl.log.warn("Cleared up resources for session " +
name);
}
@@ -1654,7 +1095,7 @@
}
catch (Throwable t)
{
- ServerSessionImpl.log.error("Failed fire listeners " + this);
+ ServerSessionImpl.log.error("Failed to fire listeners " + this);
}
}
@@ -1665,129 +1106,6 @@
// Private
// ----------------------------------------------------------------------------
- private SessionQueueQueryResponseMessage doExecuteQueueQuery(final SimpleString name)
throws Exception
- {
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name is null");
- }
-
- SessionQueueQueryResponseMessage response;
-
- Binding binding = postOffice.getBinding(name);
-
- if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
- {
- Queue queue = (Queue)binding.getBindable();
-
- Filter filter = queue.getFilter();
-
- SimpleString filterString = filter == null ? null : filter.getFilterString();
-
- response = new SessionQueueQueryResponseMessage(name,
- binding.getAddress(),
- queue.isDurable(),
- queue.isTemporary(),
- filterString,
- queue.getConsumerCount(),
- queue.getMessageCount());
- }
- // make an exception for the management address (see HORNETQ-29)
- else if (name.equals(managementAddress))
- {
- response = new SessionQueueQueryResponseMessage(name, managementAddress, true,
false, null, -1, -1);
- }
- else
- {
- response = new SessionQueueQueryResponseMessage();
- }
-
- return response;
- }
-
- private void sendResponse(final Packet confirmPacket,
- final Packet response,
- final boolean flush,
- final boolean closeChannel)
- {
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
- public void onError(final int errorCode, final String errorMessage)
- {
- ServerSessionImpl.log.warn("Error processing IOCallback code = " +
errorCode + " message = " + errorMessage);
-
- HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new
HornetQException(errorCode,
-
errorMessage));
-
- doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
- }
-
- public void done()
- {
- doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
- }
- });
- }
-
- /**
- * @param confirmPacket
- * @param response
- * @param flush
- * @param closeChannel
- */
- private void doConfirmAndResponse(final Packet confirmPacket,
- final Packet response,
- final boolean flush,
- final boolean closeChannel)
- {
- if (confirmPacket != null)
- {
- channel.confirm(confirmPacket);
-
- if (flush)
- {
- channel.flushConfirmations();
- }
- }
-
- if (response != null)
- {
- channel.send(response);
- }
-
- if (closeChannel)
- {
- channel.close();
- }
- }
-
- private void doClose(final Packet packet)
- {
- Packet response = null;
-
- try
- {
- close();
-
- response = new NullResponseMessage();
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to close", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, true, true);
- }
-
private void setStarted(final boolean s)
{
Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
@@ -1799,45 +1117,7 @@
started = s;
}
-
- private void setTransferring(final boolean transferring)
- {
- Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
- for (ServerConsumer consumer : consumersClone)
- {
- consumer.setTransferring(transferring);
- }
- }
-
- /**
- * We need to create the LargeMessage before replicating the packet, or else we
won't know how to extract the destination,
- * which is stored on the header
- * @param packet
- * @throws Exception
- */
- private LargeServerMessage doCreateLargeMessage(final long id, final
SessionSendLargeMessage packet)
- {
- try
- {
- LargeServerMessage msg = createLargeMessageStorage(id,
packet.getLargeMessageHeader());
-
- return msg;
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to create large message", e);
- Packet response = null;
-
- channel.confirm(packet);
- if (response != null)
- {
- channel.send(response);
- }
- return null;
- }
- }
-
private void handleManagementMessage(final ServerMessage message) throws Exception
{
try
@@ -1861,15 +1141,10 @@
{
reply.setAddress(replyTo);
- send(reply);
+ doSend(reply);
}
}
- private LargeServerMessage createLargeMessageStorage(final long id, final byte[]
header) throws Exception
- {
- return storageManager.createLargeMessage(id, header);
- }
-
private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx)
throws Exception
{
boolean wasStarted = started;
@@ -1902,20 +1177,6 @@
}
}
- private void rollback(final boolean lastMessageAsDelived) throws Exception
- {
- if (tx == null)
- {
- // Might be null if XA
-
- tx = new TransactionImpl(storageManager);
- }
-
- doRollback(lastMessageAsDelived, tx);
-
- tx = new TransactionImpl(storageManager);
- }
-
/*
* The way flow producer flow control works is as follows:
* The client can only send messages as long as it has credits. It requests credits
from the server
@@ -1931,7 +1192,7 @@
{
releaseOutStanding(message.getAddress(), credits);
}
-
+
private void releaseOutStanding(final SimpleString address, final int credits) throws
Exception
{
CreditManagerHolder holder = getCreditManagerHolder(address);
@@ -1961,12 +1222,10 @@
{
holder.outstandingCredits += credits;
- Packet packet = new SessionProducerCreditsMessage(credits, address, -1);
-
- channel.send(packet);
+ callback.sendProducerCreditsMessage(credits, address, -1);
}
- private void send(final ServerMessage msg) throws Exception
+ private void doSend(final ServerMessage msg) throws Exception
{
// Look up the paging store
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-21
14:46:21 UTC (rev 8827)
@@ -1,314 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.OperationContext;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.hornetq.core.server.ServerSession;
-
-/**
- * A ServerSessionPacketHandler
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
- * @author <a href="mailto:clebert.suconic@jboss.org>Clebert
Suconic</a>
- */
-public class ServerSessionPacketHandler implements ChannelHandler
-{
- private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
-
- private final ServerSession session;
-
- private final OperationContext sessionContext;
-
- // Storagemanager here is used to set the Context
- private final StorageManager storageManager;
-
- public ServerSessionPacketHandler(final ServerSession session,
- final OperationContext sessionContext,
- final StorageManager storageManager)
- {
- this.session = session;
-
- this.storageManager = storageManager;
-
- this.sessionContext = sessionContext;
- }
-
- public long getID()
- {
- return session.getID();
- }
-
- public void handlePacket(final Packet packet)
- {
- byte type = packet.getType();
-
- storageManager.setContext(sessionContext);
-
- try
- {
- switch (type)
- {
- case SESS_CREATECONSUMER:
- {
- SessionCreateConsumerMessage request =
(SessionCreateConsumerMessage)packet;
- session.handleCreateConsumer(request);
- break;
- }
- case CREATE_QUEUE:
- {
- CreateQueueMessage request = (CreateQueueMessage)packet;
- session.handleCreateQueue(request);
- break;
- }
- case DELETE_QUEUE:
- {
- SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
- session.handleDeleteQueue(request);
- break;
- }
- case SESS_QUEUEQUERY:
- {
- SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
- session.handleExecuteQueueQuery(request);
- break;
- }
- case SESS_BINDINGQUERY:
- {
- SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
- session.handleExecuteBindingQuery(request);
- break;
- }
- case SESS_ACKNOWLEDGE:
- {
- SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
- session.handleAcknowledge(message);
- break;
- }
- case SESS_EXPIRED:
- {
- SessionExpiredMessage message = (SessionExpiredMessage)packet;
- session.handleExpired(message);
- break;
- }
- case SESS_COMMIT:
- {
- session.handleCommit(packet);
- break;
- }
- case SESS_ROLLBACK:
- {
- session.handleRollback((RollbackMessage)packet);
- break;
- }
- case SESS_XA_COMMIT:
- {
- SessionXACommitMessage message = (SessionXACommitMessage)packet;
- session.handleXACommit(message);
- break;
- }
- case SESS_XA_END:
- {
- SessionXAEndMessage message = (SessionXAEndMessage)packet;
- session.handleXAEnd(message);
- break;
- }
- case SESS_XA_FORGET:
- {
- SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
- session.handleXAForget(message);
- break;
- }
- case SESS_XA_JOIN:
- {
- SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
- session.handleXAJoin(message);
- break;
- }
- case SESS_XA_RESUME:
- {
- SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
- session.handleXAResume(message);
- break;
- }
- case SESS_XA_ROLLBACK:
- {
- SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
- session.handleXARollback(message);
- break;
- }
- case SESS_XA_START:
- {
- SessionXAStartMessage message = (SessionXAStartMessage)packet;
- session.handleXAStart(message);
- break;
- }
- case SESS_XA_SUSPEND:
- {
- session.handleXASuspend(packet);
- break;
- }
- case SESS_XA_PREPARE:
- {
- SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
- session.handleXAPrepare(message);
- break;
- }
- case SESS_XA_INDOUBT_XIDS:
- {
- session.handleGetInDoubtXids(packet);
- break;
- }
- case SESS_XA_GET_TIMEOUT:
- {
- session.handleGetXATimeout(packet);
- break;
- }
- case SESS_XA_SET_TIMEOUT:
- {
- SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
- session.handleSetXATimeout(message);
- break;
- }
- case SESS_START:
- {
- session.handleStart(packet);
- break;
- }
- case SESS_STOP:
- {
- session.handleStop(packet);
- break;
- }
- case SESS_CLOSE:
- {
- session.handleClose(packet);
- break;
- }
- case SESS_CONSUMER_CLOSE:
- {
- SessionConsumerCloseMessage message =
(SessionConsumerCloseMessage)packet;
- session.handleCloseConsumer(message);
- break;
- }
- case SESS_FLOWTOKEN:
- {
- SessionConsumerFlowCreditMessage message =
(SessionConsumerFlowCreditMessage)packet;
- session.handleReceiveConsumerCredits(message);
- break;
- }
- case SESS_SEND:
- {
- SessionSendMessage message = (SessionSendMessage)packet;
- session.handleSend(message);
- break;
- }
- case SESS_SEND_LARGE:
- {
- SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
- session.handleSendLargeMessage(message);
- break;
- }
- case SESS_SEND_CONTINUATION:
- {
- SessionSendContinuationMessage message =
(SessionSendContinuationMessage)packet;
- session.handleSendContinuations(message);
- break;
- }
- case SESS_FORCE_CONSUMER_DELIVERY:
- {
- SessionForceConsumerDelivery message =
(SessionForceConsumerDelivery)packet;
- session.handleForceConsumerDelivery(message);
- break;
- }
- case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
- {
- SessionRequestProducerCreditsMessage message =
(SessionRequestProducerCreditsMessage)packet;
- session.handleRequestProducerCredits(message);
- break;
- }
- }
- }
- catch (Throwable t)
- {
- ServerSessionPacketHandler.log.error("Caught unexpected exception",
t);
- }
- finally
- {
- storageManager.completeOperations();
- storageManager.clearContext();
- }
- }
-}