Author: borges
Date: 2011-07-06 11:49:45 -0400 (Wed, 06 Jul 2011)
New Revision: 10934
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Clean up
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -142,9 +142,9 @@
else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
{
NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
-
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), msg.isBackup()),
- false, true);
+ final boolean backup = msg.isBackup();
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), backup), backup,
+ true);
}
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -15,7 +15,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATESESSION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.protocol.core.impl.PacketImpl.CREATE_REPLICATION;
import static org.hornetq.core.protocol.core.impl.PacketImpl.REATTACH_SESSION;
import org.hornetq.api.core.HornetQException;
@@ -103,7 +102,7 @@
break;
}
- case CREATE_REPLICATION:
+ case PacketImpl.CREATE_REPLICATION:
{
// Create queue can also be fielded here in the case of a replicated store and forward queue creation
@@ -115,7 +114,7 @@
}
default:
{
- HornetQPacketHandler.log.error("Invalid packet " + packet);
+ log.error("Invalid packet " + packet);
}
}
}
@@ -205,7 +204,7 @@
{
log.error("Failed to create session ", e);
- HornetQPacketHandler.log.error("Failed to create session", e);
+ log.error("Failed to create session", e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
@@ -271,7 +270,7 @@
}
catch (Exception e)
{
- HornetQPacketHandler.log.error("Failed to reattach session", e);
+ log.error("Failed to reattach session", e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
@@ -291,7 +290,7 @@
}
catch (Exception e)
{
- HornetQPacketHandler.log.error("Failed to handle create queue", e);
+ log.error("Failed to handle create queue", e);
}
}
@@ -311,11 +310,11 @@
}
catch (HornetQException e)
{
- response = new HornetQExceptionMessage(e);
+ response = new HornetQExceptionMessage(e);
}
catch (Exception e)
{
- HornetQPacketHandler.log.warn(e.getMessage(), e);
+ log.warn(e.getMessage(), e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/CreateQueueMessage.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -54,7 +54,7 @@
final boolean temporary,
final boolean requiresResponse)
{
- super(PacketImpl.CREATE_QUEUE);
+ this();
this.address = address;
this.queueName = queueName;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -126,54 +126,55 @@
public void handlePacket(final Packet packet)
{
PacketImpl response = new ReplicationResponseMessage();
+ final byte type=packet.getType();
try
{
- if (packet.getType() == PacketImpl.REPLICATION_APPEND)
+ if (type == PacketImpl.REPLICATION_APPEND)
{
handleAppendAddRecord((ReplicationAddMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_APPEND_TX)
+ else if (type == PacketImpl.REPLICATION_APPEND_TX)
{
handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_DELETE)
+ else if (type == PacketImpl.REPLICATION_DELETE)
{
handleAppendDelete((ReplicationDeleteMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_DELETE_TX)
+ else if (type == PacketImpl.REPLICATION_DELETE_TX)
{
handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_PREPARE)
+ else if (type == PacketImpl.REPLICATION_PREPARE)
{
handlePrepare((ReplicationPrepareMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
+ else if (type == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
{
handleCommitRollback((ReplicationCommitMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_PAGE_WRITE)
+ else if (type == PacketImpl.REPLICATION_PAGE_WRITE)
{
handlePageWrite((ReplicationPageWriteMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_PAGE_EVENT)
+ else if (type == PacketImpl.REPLICATION_PAGE_EVENT)
{
handlePageEvent((ReplicationPageEventMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN)
{
handleLargeMessageBegin((ReplicationLargeMessageBeingMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE)
{
handleLargeMessageWrite((ReplicationLargeMessageWriteMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
+ else if (type == PacketImpl.REPLICATION_LARGE_MESSAGE_END)
{
handleLargeMessageEnd((ReplicationLargemessageEndMessage)packet);
}
- else if (packet.getType() == PacketImpl.REPLICATION_COMPARE_DATA)
+ else if (type == PacketImpl.REPLICATION_COMPARE_DATA)
{
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
@@ -183,10 +184,15 @@
ReplicationEndpointImpl.log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
}
}
+ catch (HornetQException e)
+ {
+ log.warn(e.getMessage(), e);
+ response = new HornetQExceptionMessage(e);
+ }
catch (Exception e)
{
- ReplicationEndpointImpl.log.warn(e.getMessage(), e);
- response = new HornetQExceptionMessage((HornetQException)e);
+ log.warn(e.getMessage(), e);
+ throw new RuntimeException(e);
}
channel.send(response);
@@ -616,7 +622,4 @@
{
return this.journals[journalID];
}
-
- // Inner classes -------------------------------------------------
-
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-06 15:49:01 UTC (rev 10933)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-06 15:49:45 UTC (rev 10934)
@@ -144,11 +144,14 @@
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
- // JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter at this current version
- // as a way to keep its existence valid and TCK tests
- // That subscription needs an invalid filter, however paging needs to ignore any subscription with this filter.
- // For that reason, this filter needs to be rejected on paging or any other component on the system, and just be ignored for any purpose
- // It's declared here as this filter is considered a global ignore
+ /*
+ * JMS Topics (which are outside of the scope of the core API) will require a dumb subscription
+ * with a dummy-filter at this current version as a way to keep its existence valid and TCK
+ * tests. That subscription needs an invalid filter, however paging needs to ignore any
+ * subscription with this filter. For that reason, this filter needs to be rejected on paging or
+ * any other component on the system, and just be ignored for any purpose It's declared here as
+ * this filter is considered a global ignore
+ */
public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
// Static
@@ -551,9 +554,7 @@
liveChannel.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1).setHandler(replicationEndpoint);
- liveServerSessionFactory.getConnection()
- .getChannel(CHANNEL_ID.PING.id, -1)
- .send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
+ liveChannel.send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
started = true;
@@ -1197,15 +1198,6 @@
return connectorsService;
}
- // Public
- // ---------------------------------------------------------------------------------------
-
- // Package protected
- // ----------------------------------------------------------------------------
-
- // Protected
- // ------------------------------------------------------------------------------------
-
/**
* Protected so tests can change this behaviour
* @param backupConnector
@@ -1959,6 +1951,5 @@
replicationManager.start();
journalStorageManager.setReplicator(replicationManager);
- System.out.println("HornetQServerImpl: ReplicationManagerImpl is started & added to JournalStorageManager...");
}
}