Author: borges
Date: 2011-07-06 11:48:32 -0400 (Wed, 06 Jul 2011)
New Revision: 10932
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
Clean up
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-07-06
10:21:27 UTC (rev 10931)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2011-07-06
15:48:32 UTC (rev 10932)
@@ -54,7 +54,6 @@
import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -121,7 +120,7 @@
private final Channel channel;
private volatile CoreRemotingConnection remotingConnection;
-
+
private final boolean direct;
public ServerSessionPacketHandler(final ServerSession session,
@@ -135,10 +134,10 @@
this.channel = channel;
this.remotingConnection = channel.getConnection();
-
+
//TODO think of a better way of doing this
Connection conn = remotingConnection.getTransportConnection();
-
+
if (conn instanceof NettyConnection)
{
direct = ((NettyConnection)conn).isDirectDeliver();
@@ -188,7 +187,7 @@
{
return channel;
}
-
+
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
@@ -199,7 +198,7 @@
boolean flush = false;
boolean closeChannel = false;
boolean requiresResponse = false;
-
+
try
{
try
@@ -207,8 +206,8 @@
switch (type)
{
case SESS_CREATECONSUMER:
- {
- SessionCreateConsumerMessage request =
(SessionCreateConsumerMessage)packet;
+ {
+ SessionCreateConsumerMessage request =
(SessionCreateConsumerMessage)packet;
requiresResponse = request.isRequiresResponse();
session.createConsumer(request.getID(),
request.getQueueName(),
@@ -217,7 +216,7 @@
if (requiresResponse)
{
// We send back queue information on the queue as a response- this
allows the queue to
- // be automaticall recreated on failover
+ // be automatically recreated on failover
response = new
SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName()));
}
@@ -239,7 +238,7 @@
break;
}
case DELETE_QUEUE:
- {
+ {
requiresResponse = true;
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
session.deleteQueue(request.getQueueName());
@@ -507,7 +506,7 @@
{
if (requiresResponse)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ response = new HornetQExceptionMessage(e);
}
else
{
@@ -586,11 +585,11 @@
channel.close();
}
}
-
+
public void closeListeners()
{
List<CloseListener> listeners = remotingConnection.removeCloseListeners();
-
+
for (CloseListener closeListener: listeners)
{
closeListener.connectionClosed();
@@ -600,7 +599,7 @@
}
}
}
-
+
public int transferConnection(final CoreRemotingConnection newConnection, final int
lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is
occurring- otherwise packets might get
@@ -611,10 +610,10 @@
// might be executed
// before we have transferred the connection, leaving it in a started state
session.setTransferring(true);
-
+
List<CloseListener> closeListeners =
remotingConnection.removeCloseListeners();
List<FailureListener> failureListeners =
remotingConnection.removeFailureListeners();
-
+
// Note. We do not destroy the replicating connection here. In the case the live
server has really crashed
// then the connection will get cleaned up anyway when the server ping timeout
kicks in.
// In the case the live server is really still up, i.e. a split brain situation (or
in tests), then closing
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06
10:21:27 UTC (rev 10931)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06
15:48:32 UTC (rev 10932)
@@ -149,8 +149,6 @@
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
- System.out.println("HA_BACKUP_REGISTRATION: " + msg + "
connector=" + msg.getConnector());
- System.out.println("HA_BR: " + server.getIdentity() + ",
toString=" + server);
try
{
server.addHaBackup(rc);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06
10:21:27 UTC (rev 10931)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06
15:48:32 UTC (rev 10932)
@@ -309,17 +309,14 @@
response = new NullResponseMessage();
}
+ catch (HornetQException e)
+ {
+ response = new HornetQExceptionMessage(e);
+ }
catch (Exception e)
{
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- HornetQPacketHandler.log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ HornetQPacketHandler.log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage(new
HornetQException(HornetQException.INTERNAL_ERROR));
}
channel1.send(response);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06
10:21:27 UTC (rev 10931)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06
15:48:32 UTC (rev 10932)
@@ -40,7 +40,7 @@
private boolean durable;
private boolean temporary;
-
+
private boolean requiresResponse;
// Static --------------------------------------------------------
@@ -108,7 +108,7 @@
{
return temporary;
}
-
+
public boolean isRequiresResponse()
{
return requiresResponse;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06
10:21:27 UTC (rev 10931)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06
15:48:32 UTC (rev 10932)
@@ -76,7 +76,7 @@
private final HornetQServer server;
private Channel channel;
-
+
private Journal[] journals;
private JournalStorageManager storage;
@@ -99,14 +99,14 @@
}
// Public --------------------------------------------------------
-
+
public void registerJournal(final byte id, final Journal journal)
{
if (journals == null || id >= journals.length)
{
Journal[] oldJournals = journals;
journals = new Journal[id + 1];
-
+
if (oldJournals != null)
{
for (int i = 0 ; i < oldJournals.length; i++)
@@ -115,11 +115,11 @@
}
}
}
-
+
journals[id] = journal;
}
-
- /*
+
+ /*
* (non-Javadoc)
* @see
org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
*/
@@ -266,7 +266,7 @@
}
largeMessages.clear();
-
+
pageManager.stop();
}