Author: borges
Date: 2011-07-01 12:00:03 -0400 (Fri, 01 Jul 2011)
New Revision: 10911
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
Log:
HORNETQ-720 pass CoreRemotingConnection instead of channels, as it
allows us to use the failureListener.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01
13:31:52 UTC (rev 10910)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-01
16:00:03 UTC (rev 10911)
@@ -158,14 +158,10 @@
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
System.out.println("HA_BACKUP_REGISTRATION: " + msg + "
connector=" + msg.getConnector());
- long channelID = msg.getChannelID();
- Channel channelX = rc.getChannel(CHANNEL_ID.SESSION.id, -1);
- Channel replicationChannel = rc.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
- System.out.println("msg channelID: " + channelID);
System.out.println("HA_BR: " + server.getIdentity() + ",
toString=" + server);
try
{
- server.addHaBackup(channelX, replicationChannel);
+ server.addHaBackup(rc);
}
catch (Exception e)
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01
13:31:52 UTC (rev 10910)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2011-07-01
16:00:03 UTC (rev 10911)
@@ -25,12 +25,10 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
-import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionMessage;
@@ -115,25 +113,6 @@
break;
}
- case PacketImpl.HA_BACKUP_REGISTRATION:
- {
- HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
- System.out.println("HA_BACKUP_REGISTRATION: " + msg + "
connector=" + msg.getConnector());
- long channelID = msg.getChannelID();
- Channel channelX = connection.getChannel(CHANNEL_ID.SESSION.id, -1);
- Channel replicationChannel = connection.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
- System.out.println("HA_BR: " + server.getIdentity() + ",
toString=" + server);
- try
- {
- server.addHaBackup(channelX, replicationChannel);
- }
- catch (Exception e)
- {
- // XXX This is not what we want
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
default:
{
HornetQPacketHandler.log.error("Invalid packet " + packet);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01
13:31:52 UTC (rev 10910)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-01
16:00:03 UTC (rev 10911)
@@ -83,7 +83,7 @@
private SessionFailureListener failureListener;
- private final Channel systemChannel;
+ private CoreRemotingConnection remotingConnection;
// Static --------------------------------------------------------
@@ -95,21 +95,16 @@
this.executorFactory = executorFactory;
CoreRemotingConnection conn = sessionFactory.getConnection();
- systemChannel = conn.getChannel(CHANNEL_ID.SESSION.id, -1);
replicatingChannel = conn.getChannel(CHANNEL_ID.REPLICATION.id, -1);
}
- // Public --------------------------------------------------------
-
/**
- * @param systemChannel
- * @param replicatingChannel
+ * @param remotingConnection
*/
- public ReplicationManagerImpl(Channel systemChannel, Channel replicatingChannel)
+ public ReplicationManagerImpl(CoreRemotingConnection remotingConnection)
{
- super();
- this.systemChannel = systemChannel;
- this.replicatingChannel = replicatingChannel;
+ replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ this.remotingConnection = remotingConnection;
}
public void appendAddRecord(final byte journalID, final long id, final byte
recordType, final EncodingSupport record)
@@ -314,20 +309,13 @@
throw new IllegalStateException("ReplicationManager is already
started");
}
-// replicatingConnection = sessionFactory.getConnection();
-//
-// if (replicatingConnection == null)
-// {
-// ReplicationManagerImpl.log.warn("Backup server MUST be started before
live server. Initialisation will not proceed.");
-// throw new HornetQException(HornetQException.ILLEGAL_STATE,
-// "Backup server MUST be started before live
server. Initialisation will not proceed.");
-// }
-
replicatingChannel.setHandler(responseHandler);
CreateReplicationSessionMessage replicationStartPackage =
new CreateReplicationSessionMessage(replicatingChannel.getID());
+ Channel systemChannel = remotingConnection.getChannel(CHANNEL_ID.SESSION.id, -1);
+
systemChannel.sendBlocking(replicationStartPackage);
failureListener = new SessionFailureListener()
@@ -358,10 +346,9 @@
{
}
};
- // sessionFactory.addFailureListener(failureListener);
+ remotingConnection.addFailureListener(failureListener);
started = true;
-
enabled = true;
}
@@ -397,15 +384,14 @@
replicatingChannel.close();
}
+ remotingConnection.removeFailureListener(failureListener);
// sessionFactory.causeExit();
-// sessionFactory.removeFailureListener(failureListener);
// if (replicatingConnection != null)
// {
// replicatingConnection.destroy();
// }
//
-// replicatingConnection = null;
-
+ remotingConnection = null;
started = false;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01
13:31:52 UTC (rev 10910)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-07-01
16:00:03 UTC (rev 10911)
@@ -28,6 +28,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
@@ -176,5 +177,5 @@
void stop(boolean failoverOnServerShutdown) throws Exception;
- void addHaBackup(Channel channelX, Channel replicationChannel) throws Exception;
+ void addHaBackup(CoreRemotingConnection rc) throws Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01
13:31:52 UTC (rev 10910)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-01
16:00:03 UTC (rev 10911)
@@ -81,6 +81,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.remoting.server.RemotingService;
@@ -547,7 +548,7 @@
}
log.info("announce backup to live-server (id=" + liveConnectorName
+ ")");
liveServerSessionFactory.getConnection()
- .getChannel(CHANNEL_ID.SESSION.id, -1)
+ .getChannel(CHANNEL_ID.PING.id, -1)
.send(new
HaBackupRegistrationMessage(getNodeID().toString(), config));
log.info("backup registered");
@@ -1945,19 +1946,15 @@
}
@Override
- public void addHaBackup(Channel systemChannel, Channel replicatingChannel) throws
Exception
+ public void addHaBackup(CoreRemotingConnection rc) throws Exception
{
if (!(storageManager instanceof JournalStorageManager))
return;
JournalStorageManager journalStorageManager =
(JournalStorageManager)storageManager;
System.out.println(HornetQServerImpl.class.getName() + " " +
this.getIdentity() +
- ": create a ReplicationManagerImpl. Using ChannelID=" +
systemChannel);
- // XXX not sure this is the right call to use
-// final ServerLocatorInternal serverLocator =
-//
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(connector);
-// ClientSessionFactoryInternal sessionFactory =
(ClientSessionFactoryInternal)serverLocator.createSessionFactory();
- replicationManager = new ReplicationManagerImpl(systemChannel,
replicatingChannel);
+ ": create a ReplicationManagerImpl");
+ replicationManager = new ReplicationManagerImpl(rc);
System.out.println("rep.start()");
replicationManager.start();
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java 2011-07-01
13:31:52 UTC (rev 10910)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/spi/core/protocol/RemotingConnection.java 2011-07-01
16:00:03 UTC (rev 10911)
@@ -43,17 +43,15 @@
long getCreationTime();
/**
- * returns a string representation of the remote address of this connection
- *
+ * Returns a string representation of the remote address of this connection.
* @return the remote address
*/
String getRemoteAddress();
/**
- * add a failure listener.
+ * Adds a failure listener.
* <p/>
* The listener will be called in the event of connection failure.
- *
* @param listener the listener
*/
void addFailureListener(FailureListener listener);
@@ -82,27 +80,27 @@
* @return true if removed
*/
boolean removeCloseListener(CloseListener listener);
-
+
List<CloseListener> removeCloseListeners();
-
+
void setCloseListeners(List<CloseListener> listeners);
-
-
+
+
/**
* return all the failure listeners
*
* @return the listeners
*/
List<FailureListener> getFailureListeners();
-
+
List<FailureListener> removeFailureListeners();
/**
- * set the failure listeners.
+ * Sets the failure listeners.
* <p/>
- * These will be called in the event of the connection being closed. Any previosuly
added listeners will be removed.
- *
+ * These will be called in the event of the connection being closed. Any previously
added
+ * listeners will be removed.
* @param listeners the listeners to add.
*/
void setFailureListeners(List<FailureListener> listeners);
@@ -146,20 +144,20 @@
*
* @return true if destroyed, otherwise false
*/
- boolean isDestroyed();
-
+ boolean isDestroyed();
+
/**
* Disconnect the connection, closing all channels
*/
void disconnect();
-
+
/**
* returns true if any data has been received since the last time this method was
called.
*
* @return true if data has been received.
*/
boolean checkDataReceived();
-
+
/**
* flush all outstanding data from the connection.
*/