[Jboss-cvs] JBossAS SVN: r56520 - trunk/cluster/src/main/org/jboss/ha/framework/server
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Sep 2 00:14:28 EDT 2006
Author: bstansberry at jboss.com
Date: 2006-09-02 00:14:27 -0400 (Sat, 02 Sep 2006)
New Revision: 56520
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
Log:
[JBAS-3540] Streamable state transfer
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-09-02 02:12:19 UTC (rev 56519)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/HAPartitionImpl.java 2006-09-02 04:14:27 UTC (rev 56520)
@@ -23,12 +23,16 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Vector;
import javax.naming.Context;
@@ -39,12 +43,15 @@
import javax.naming.StringRefAddr;
import javax.management.MBeanServer;
+import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
+import org.jgroups.MembershipListener;
import org.jgroups.MergeView;
import org.jgroups.View;
import org.jgroups.Message;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
@@ -67,17 +74,32 @@
* @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
* @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
* @author Scott.Stark at jboss.org
+ * @author brian.stansberry at jboss.com
* @version $Revision$
*/
public class HAPartitionImpl
- extends org.jgroups.blocks.RpcDispatcher
- implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
+ extends RpcDispatcher
+ implements ExtendedMessageListener, MembershipListener,
HAPartition, AsynchEventHandler.AsynchEventProcessor
{
+ private static final byte NULL_VALUE = 0;
+ private static final byte SERIALIZABLE_VALUE = 1;
+ // TODO add Streamable support
+ // private static final byte STREAMABLE_VALUE = 2;
+
+ /**
+ * Returned when an RPC call arrives for a service that isn't registered.
+ */
private static class NoHandlerForRPC implements Serializable
{
static final long serialVersionUID = -1263095408483622838L;
}
+
+ private static class StateStreamEnd implements Serializable
+ {
+ /** The serialVersionUID */
+ private static final long serialVersionUID = -3705345735451504946L;
+ }
// Constants -----------------------------------------------------
@@ -362,8 +384,8 @@
log.warn("Failed to stop asynchHandler", e);
}
- // Stop the DRM and DS services
- //
+ // Stop the DRM service
+ // TODO remove when DRM is independent
try
{
this.replicantManager.stop();
@@ -373,21 +395,11 @@
log.error("operation failed", e);
}
-// try
-// {
-// this.dsManager.stop();
-// }
-// catch (Exception e)
-// {
-// log.error("operation failed", e);
-// }
-
// NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
// add the destroyPartition() step
try
{
-// channel.close();
- channel.disconnect();
+ channel.disconnect();
}
catch (Exception e)
{
@@ -409,8 +421,7 @@
log.info("Partition " + partitionName + " closed.");
}
-
-// NR 200505 : [JBCLUSTER-38] destroy partition close the channel
+
public void destroyPartition() throws Exception
{
@@ -420,145 +431,250 @@
}
catch (Exception e)
{
- log.error("operation failed", e);
- }
-
-// try
-// {
-// this.dsManager.destroy();
-// }
-// catch (Exception e)
-// {
-// log.error("operation failed", e);
-// }
+ log.error("Destroying DRM failed", e);
+ }
-
try
{
channel.close();
}
catch (Exception e)
{
- log.error("operation failed", e);
+ log.error("Closing channel failed", e);
}
log.info("Partition " + partitionName + " destroyed.");
- }
+ }
+
// org.jgroups.MessageListener implementation ----------------------------------------------
- // MessageListener methods
- //
public byte[] getState()
{
logHistory ("getState called on partition");
- boolean debug = log.isDebugEnabled();
log.debug("getState called.");
try
{
- // we now get the sub-state of each HAPartitionStateTransfer subscribers and
- // build a "macro" state
- //
- HashMap state = new HashMap();
- Iterator keys = stateHandlers.keySet().iterator();
- while (keys.hasNext())
- {
- String key = (String)keys.next();
- HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
- if (debug)
- log.debug("getState for " + key);
- state.put(key, subscriber.getCurrentState());
- }
- return objectToByteBuffer(state);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+ getStateInternal(baos);
+ return baos.toByteArray();
}
catch (Exception ex)
{
log.error("getState failed", ex);
}
- return null;
+ return null; // This will cause the receiver to get a "false" on the channel.getState() call
}
+ public void getState(OutputStream stream)
+ {
+ logHistory ("getState called on partition");
+
+ log.debug("getState called.");
+ try
+ {
+ getStateInternal(stream);
+ }
+ catch (Exception ex)
+ {
+ log.error("getState failed", ex);
+ }
+
+ }
+
+ private void getStateInternal(OutputStream stream) throws IOException
+ {
+ MarshalledValueOutputStream mvos = null; // don't create until we know we need it
+
+ for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)keys.next();
+ HAPartition.HAPartitionStateTransfer subscriber =
+ (HAPartition.HAPartitionStateTransfer) entry.getValue();
+ log.debug("getState for " + entry.getKey());
+ Object state = subscriber.getCurrentState();
+ if (state != null)
+ {
+ if (mvos == null)
+ {
+ // This is our first write, so need to write the header first
+ stream.write(SERIALIZABLE_VALUE);
+
+ mvos = new MarshalledValueOutputStream(stream);
+ }
+
+ mvos.writeObject(entry.getKey());
+ mvos.writeObject(state);
+ }
+ }
+
+ if (mvos == null)
+ {
+ // We never wrote any state, so write the NULL header
+ stream.write(NULL_VALUE);
+ }
+ else
+ {
+ mvos.writeObject(new StateStreamEnd());
+ }
+
+ }
+
public void setState(byte[] obj)
{
logHistory ("setState called on partition");
try
{
- log.debug("setState called");
if (obj == null)
{
- log.debug("state is null");
- return;
+ log.debug("transferred state is null (may be first member in cluster)");
}
-
- long used_mem_before, used_mem_after;
- int state_size=obj != null? obj.length : 0;
- Runtime rt=Runtime.getRuntime();
- used_mem_before=rt.totalMemory() - rt.freeMemory();
-
- HashMap state = (HashMap)objectFromByteBuffer(obj);
- java.util.Iterator keys = state.keySet().iterator();
- while (keys.hasNext())
+ else
{
- String key = (String)keys.next();
- log.debug("setState for " + key);
- Object someState = state.get(key);
- HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
- if (subscriber != null)
- {
- try
- {
- subscriber.setCurrentState((java.io.Serializable)someState);
- }
- catch (Exception e)
- {
- // Don't let issues with one subscriber affect others
- // unless it is DRM, which is really an internal function
- // of the HAPartition
- // FIXME remove this once DRM is JBC-based
- if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
- {
- if (e instanceof RuntimeException)
- throw (RuntimeException) e;
- else
- throw new RuntimeException(e);
- }
- else
- {
- log.error("Caught exception setting state to " + subscriber, e);
- }
- }
- }
- else
- {
- log.debug("There is no stateHandler for: " + key);
- }
+ ByteArrayInputStream bais = new ByteArrayInputStream(obj);
+ setStateInternal(bais);
+ bais.close();
}
-
- used_mem_after=rt.totalMemory() - rt.freeMemory();
- log.debug("received a state of " + state_size + " bytes; expanded memory by " +
- (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
- ", used memory after: " + used_mem_after + ")");
isStateSet = true;
}
catch (Throwable t)
{
- log.error("failed setting state", t);
- if (t instanceof Exception)
- setStateException = (Exception) t;
+ recordSetStateFailure(t);
+ }
+ finally
+ {
+ notifyStateTransferCompleted();
+ }
+ }
+
+ public void setState(InputStream stream)
+ {
+ logHistory ("setState called on partition");
+ try
+ {
+ if (stream == null)
+ {
+ log.debug("transferred state is null (may be first member in cluster)");
+ }
else
- setStateException = new Exception(t);
+ {
+ setStateInternal(stream);
+ }
+
+ isStateSet = true;
}
+ catch (Throwable t)
+ {
+ recordSetStateFailure(t);
+ }
finally
{
- synchronized (stateLock)
+ notifyStateTransferCompleted();
+ }
+ }
+
+ private void setStateInternal(InputStream stream) throws IOException, ClassNotFoundException
+ {
+ byte type = (byte) stream.read();
+
+ if (type == NULL_VALUE)
+ {
+ log.debug("state is null");
+ return;
+ }
+
+ long used_mem_before, used_mem_after;
+ Runtime rt=Runtime.getRuntime();
+ used_mem_before=rt.totalMemory() - rt.freeMemory();
+
+ MarshalledValueInputStream mvis = new MarshalledValueInputStream(stream);
+
+ while (true)
+ {
+ Object obj = mvis.readObject();
+ if (obj instanceof StateStreamEnd)
+ break;
+
+ String key = (String) obj;
+ log.debug("setState for " + key);
+ Object someState = mvis.readObject();
+ HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
+ if (subscriber != null)
{
- // Notify wait that state has been set.
- stateLock.notifyAll();
+ try
+ {
+ subscriber.setCurrentState((Serializable)someState);
+ }
+ catch (Exception e)
+ {
+ // Don't let issues with one subscriber affect others
+ // unless it is DRM, which is really an internal function
+ // of the HAPartition
+ // FIXME remove this once DRM is JBC-based
+ if (DistributedReplicantManagerImpl.SERVICE_NAME.equals(key))
+ {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
+ }
+ else
+ {
+ log.error("Caught exception setting state to " + subscriber, e);
+ }
+ }
}
+ else
+ {
+ log.debug("There is no stateHandler for: " + key);
+ }
}
+
+ used_mem_after=rt.totalMemory() - rt.freeMemory();
+ log.debug("received state; expanded memory by " +
+ (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
+ ", used memory after: " + used_mem_after + ")");
}
+
+ private void recordSetStateFailure(Throwable t)
+ {
+ log.error("failed setting state", t);
+ if (t instanceof Exception)
+ setStateException = (Exception) t;
+ else
+ setStateException = new Exception(t);
+ }
+
+ private void notifyStateTransferCompleted()
+ {
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ public void getState(String state_id, OutputStream ostream)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
+ public byte[] getState(String state_id)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
+ public void setState(String state_id, byte[] state)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
+ public void setState(String state_id, InputStream istream)
+ {
+ throw new UnsupportedOperationException("Not implemented; see http://jira.jboss.com/jira/browse/JBAS-3594");
+ }
+
public void receive(org.jgroups.Message msg)
{ /* complete */}
More information about the jboss-cvs-commits
mailing list